Netty 入门笔记

Netty 入门笔记

一、 Netty简介

1. 是什么?

官方对于Netty的介绍:

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

本质来说,它是一个把java nio包封装得很好的网络应用框架,具有以下特点:

  • 高并发,低延迟
  • 高扩展,API使用简单,开发门槛低
  • 非阻塞

基于事件驱动的编程,我们只需专注于业务本身,实现每一个必要的handler,省去了许多繁杂的底层操作,Netty就是这样,它的存在大大降低了NIO程序的编程复杂度。

Netty的优点归功于其封装性好之外,还跟NIO(Nonblocking I/O,非阻塞IO)有很大的关系。

2. BIO与NIO

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。

那么BIO与NIO的差距在哪里呢?

  1. 客户端监听(Listen)时,Accept是阻塞的,只有新连接来了,Accept才会返回,主线程才能继续。
  2. 读写socket时,Read是阻塞的,只有请求消息来了,Read才能返回,子线程才能继续处理。
  3. 读写socket时,Write是阻塞的,只有客户端把消息收了,Write才能返回,子线程才能继续读取下一个请求。

传统的BIO模式下,从头到尾的所有线程都是阻塞的,这些线程就干等着,占用系统的资源,什么事也不干。也就是说,一个线程只能处理一个连接

非阻塞IO会进行持续的轮询(polling),以查看某个操作是否就绪。

“非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。

Java NIO的非阻塞模式,是通过多路复用IO技术实现的,多路复用的核心就是通过Selector来轮询注册在其上的Channel,当发现某个或者多个Channel处于就绪状态后,从阻塞状态返回就绪的Channel的选择键集合,进行IO操作。如果什么事都没得做,它也不会死循环,它会将线程休眠起来,直到下一个事件来了再继续干活。也就是说,一个线程就能处理多个连接

这个优点在连接增多时尤其明显,在高并发场景中,动辄数万级到百万级的连接,使用传统的BIO模型,即使有线程池的加持,消耗的资源还是太多,服务器承受不了压力很容易gg。

NIO让我们使用较少的线程处理多个连接的同时,也大大减少了上下文切换的开销,当某个线程处于空闲状态时,还能安排它做其他任务。

3. Why Netty?

NIO的不足

Java 1.4版本推出的nio包提供了NIO的模型,然而现在我们很少能看到使用原生nio类库开发出的应用,主要有以下几个问题:

1) 类库和API繁杂,使用麻烦。

2) NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序。

3) 需要花大量的时间健壮自己的程序。例如处理断线重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流等等。

4) NIO的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,并没有被解决,1.8才修复。

Netty的强势

1) API使用简单,开发门槛低;

2) 功能强大,预置了多种编解码功能,支持多种主流协议;

3) 定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展;

4) 性能高,通过与其它业界主流的NIO框架对比,Netty的综合性能最优;

5) 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;

6) 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入;

7) 经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。

正是因为这些优点,Netty成为了Java NIO编程的首选框架。

二、 Netty的应用

1. Netty能做什么?

Netty本身是一个网络通讯框架,很多网络应用,RPC,分布式框架都有使用Netty进行网络通讯。通俗的说,Netty就是一个好使的处理Socket的框架

有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。

2. 使用Netty的产品

Dubbo

阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

RocketMQ

淘宝的消息中间件, 在消息生产者和消息消费者之间,也采用 Netty 进行高性能、异步通信。

Hadoop

经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨节点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

各种游戏服务器

使用Java语言的手游服务端和大型网游,大多使用Netty 作为通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈,非常方便定制和开发私有协议栈。账号登陆服务器、地图服务器之间可以方便的通过 Netty 进行高性能的通信。

三、Netty demo

一次普通的Socket连接

建立一个正常的serverSocket,各种实现的代码量对比:

  • 阻塞I/O
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class PlainOioServer {

public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port);
try {
for (;;) {
final Socket clientSocket = socket.accept();
System.out.println("Accepted connection from " + clientSocket);
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
out.flush();
clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
  • 非阻塞I/O
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
selector.select();
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) {
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate());
System.out.println(
"Accepted connection from " + client);
}
if (key.isWritable()) {
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) {
break;
}
}
client.close();
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// 在关闭时忽略
}
}
}
}
}
}
  • Netty
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyOioServer {

public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();

b.group(group)
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(
//这里写逻辑
new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
}
});
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}

同样是非阻塞IO实现,原生nio使用了太多的api,导致程序复杂度远高于Netty。当然,初看Netty的同学会觉得代码有些臃肿,没关系,后面我们一行一行搞明白,你会发觉真正需要花时间关注的地方只有上面代码的ChannelInboundHandlerAdapter处。

搞一个EchoServer

  • EchoServer

    把c端发来的消息原封不动吐回去,这次我们把handler逻辑分离出去,首先搞定EchoServerHandler。

    上代码上代码~:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
    * 负责连接发生读操作时的逻辑
    * @param ctx
    * @param msg
    */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ByteBuf in = (ByteBuf) msg;
    //解析收到的字节流
    String content = in.toString(CharsetUtil.UTF_8);
    //消息打印到控制台
    System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
    //将收到的消息吐回去
    ctx.write(in);
    }

    /**
    * 负责读操作的结尾逻辑
    * @param ctx
    */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
    //刷新缓冲区完成写操作
    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER);
    }

    /**
    * 异常捕获
    * @param ctx
    * @param cause
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // Close the connection when an exception is raised.
    cause.printStackTrace();
    ctx.close();
    }
    }

    handler完毕,我们在主类中注册它:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
    // 处理IO操作的事件循环
    EventLoopGroup bossGroup = new NioEventLoopGroup(1); //boss负责接受连接,相当于Reactor 模型中处理客户端连接的线程池。
    EventLoopGroup workerGroup = new NioEventLoopGroup(); //worker处理事件的线程池
    final EchoServerHandler serverHandler = new EchoServerHandler();
    try {
    ServerBootstrap b = new ServerBootstrap(); //设置服务器的辅助类
    b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class) //使用基于NIO实现的channel来接受连接
    .option(ChannelOption.SO_BACKLOG, 100) //option设置特定的连接选项,这里SO_BACKLOG代表当服务器请求处理线程全满时,
    // 用于临时存放已完成三次握手的请求的队列的最大长度
    .handler(new LoggingHandler(LogLevel.INFO)) //父handler,这里设置了日志处理
    .childHandler( //子handler
    new ChannelInitializer<SocketChannel>() {
    //在这里注册自定义的handler
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();
    p.addLast(serverHandler);
    }
    });

    // 启动服务端
    ChannelFuture f = b.bind(PORT).sync();

    } finally {
    // 关闭时间循环
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }

    跑起来后,我们可以用telnet连一波了:

相关概念

java nio的Channel、Buffer、Selector在Netty中做了更高层的抽象。

  • Channel
    数据传输流,与channel相关的概念有以下四个:

Channel一览

Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
ChannelHandler,核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext,用于传输业务数据。
ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。

  • ByteBuf
    ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:

ByteBuf数据结构

他有三种使用模式:

  1. Heap Buffer 堆缓冲区
    堆缓冲区是ByteBuf最常用的模式,他将数据存储在堆空间。

  2. Direct Buffer 直接缓冲区

    直接缓冲区是ByteBuf的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4引入的nio的ByteBuffer类允许jvm通过本地方法调用分配内存,这样做有两个好处

    • 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
    • DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.
  3. Composite Buffer 复合缓冲区
    复合缓冲区相当于多个不同ByteBuf的视图,这是netty提供的,jdk不提供这样的功能。

  • Codec
    Netty中的编码/解码器,通过他你能完成字节与pojo、pojo与pojo的相互转换,从而达到自定义协议的目的。

四、Netty为何快?

串行化

  • 串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。
  • 通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优
  • 减少上下文切换,以及状态数据的同步

零拷贝

NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。

五、Netty学习资料

Netty官网

Netty权威指南

netty-example

并发编程网

# Java, Netty

Komentar

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×