本文为《Netty 入门与实战:仿写微信 IM 即时通讯系统》 的读书笔记
是什么
- 本质:JBoss做的一个Jar包
- 目的:快速开发高性能、高可靠性的网络服务器和客户端程序
- 优点:提供异步的、事件驱动的网络应用程序框架和工具
- 通俗的说:一个可以让你优雅地处理Socket的Jar包
Netty的特性
设计
统一的API,适用于不同的协议(阻塞和非阻塞)
基于灵活、可扩展的事件驱动模型
高度可定制的线程模型
可靠的无连接数据Socket支持(UDP)
性能
更好的吞吐量,低延迟
更省资源
尽量减少不必要的内存拷贝
安全
完整的SSL/TLS和STARTTLS的支持
能在Applet与Android的限制环境运行良好
健壮性
不再因过快、过慢或超负载连接导致OutOfMemoryError
不再有在高速网络环境下NIO读写频率不一致的问题
易用
完善的JavaDoc,用户指南和样例
简洁简单
仅信赖于JDK1.5
能干什么
有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。
传统的HTTP服务器的原理
1 2 3 4 5 6 7 8 9
| 1. 创建一个ServerSocket,监听并绑定一个端口 2. 一系列客户端来请求这个端口 3. 服务器使用Accept,获得一个来自客户端的Socket连接对象 4. 启动一个新线程处理连接 4.1 读Socket,得到字节流 4.2 解码协议,得到Http请求对象 4.3 处理Http请求,得到一个结果,封装成一个HttpResponse对象 4.4 编码协议,将结果序列化字节流 写Socket,将字节流发给客户端 5. 继续循环步骤3
|
HTTP服务器之所以称为HTTP服务器,是因为编码解码协议是HTTP协议,如果协议是Redis协议,那它就成了Redis服务器,如果协议是WebSocket,那它就成了WebSocket服务器,等等。使用Netty你就可以定制编解码协议,实现自己的特定协议的服务器。
典型的应用
从组网情况看,垂直的架构拆分之后,系统采用分布式部署,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。
如:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
怎么玩
添加依赖
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> </dependencies>
|
Netty服务端demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
| import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; /** * 要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑、绑定端口 * * @author zhang guo dong */ public class NettyServer { private static final int BEGIN_PORT = 10101; public static void main(String[] args) { NioEventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); final ServerBootstrap serverBootstrap = new ServerBootstrap(); final AttributeKey<Object> clientKey = AttributeKey.newInstance("clientKey"); serverBootstrap // 1.绑定线程模型,一个负责接受请求,一个负责干活 .group(boosGroup, workerGroup) // 2.指定IO模型 .channel(NioServerSocketChannel.class) // 可选:用于指定在服务端启动过程中的一些逻辑 .handler(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } }) // 可选:可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性, 然后可以通过channel.attr()取出这个属性 .attr(AttributeKey.newInstance("serverName"), "nettyServer") // 可选:可以给每一条连接指定自定义属性 .childAttr(clientKey, "clientValue") // 可选:给服务端channel设置一些属性。表示系统用于临时存放已完成三次握手的请求的队列的最大长度 // 可选:如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数 .option(ChannelOption.SO_BACKLOG, 1024) // 可选:每条连接设置一些TCP底层相关的属性。表示是否开启TCP底层心跳机制 .childOption(ChannelOption.SO_KEEPALIVE, true) // 可选:表示是否开启Nagle算法,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启 .childOption(ChannelOption.TCP_NODELAY, true) // 3.定义后续每条连接的数据读写,业务处理逻辑 .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) { System.out.println(ch.attr(clientKey).get()); } }); // 4.绑定端口 bind(serverBootstrap, BEGIN_PORT); } /** * 自动绑定递增端口 * * @param serverBootstrap 启动器 * @param port 待绑定的端口 */ private static void bind(final ServerBootstrap serverBootstrap, final int port) { // bind(port) 是一个异步的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture serverBootstrap.bind(port) // 给这个ChannelFuture添加一个监听器GenericFutureListener,在operationComplete方法里,可以监听端口是否绑定成功 .addListener(future -> { if (future.isSuccess()) { System.out.println("端口[" + port + "]绑定成功!"); } else { System.err.println("端口[" + port + "]绑定失败!"); bind(serverBootstrap, port + 1); } }); } }
|
Netty客户端demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 要启动一个Netty客户端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑、绑定端口 * * @author zhang guo dong */ public class NettyClient { private static final int MAX_RETRY = 5; public static void main(String[] args) { NioEventLoopGroup workerGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap // 1.指定线程模型 .group(workerGroup) // 2.指定 IO 类型为 NIO .channel(NioSocketChannel.class) // 可选:可以给客户端 Channel,也就是NioSocketChannel绑定自定义属性,可以通过channel.attr()取出这个属性 .attr(AttributeKey.newInstance("clientName"), "nettyClient") // 可选:设置TCP底层属性,表示连接的超时时间 .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) // 3.IO 处理逻辑 .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { } }); // 4.建立连接 connect(bootstrap, "127.0.0.1", 10101, MAX_RETRY); } private static void connect(Bootstrap bootstrap, String host, int port, int retry) { bootstrap.connect(host, port).addListener(future -> { if (future.isSuccess()) { System.out.println("连接成功!"); } else if (retry == 0) { System.err.println("重试次数已用完,放弃连接!"); } else { // 第几次重连 int order = (MAX_RETRY - retry) + 1; // 本次重连的间隔 int delay = 1 << order; System.err.println(new Date() + ": 连接失败,第" + order + "次重连……"); bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit .SECONDS); } }); } }
|
仔细看
传输载体ByteBuf
- ByteBuf是一个字节容器,包含三个部分,第一部分是已经丢弃的字节,这部分数据是无效的;第二部分是可读字节,这部分是ByteBuf的主体数据;第三部分是可写字节,所有写到ButeBuf的数据都会写到这个区域。后边虚线部分是该ByteBuf最多还能扩容多少容量。
- 上述三个部门是被两个指针划分出来,从左到右,依次是读指针(readerIndex)、写指针(writerIndex)、总容量capacity。
- 从ByteBuf中每读取一个字节,rederIndex自增1,ByteBuf里面总共有writerIndex - readerIndex个可读字节,当readerIndex 与 writerIndex相等的时候,ByteBuf不可读。
- 写数据是从writeIndex指向的部分开始写,每写一个字节,writerIndex自增1,直到和capacity相等,ByteBuf不可写了。
- ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,每次扩容64字节的倍数,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。
相关API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| // 前者表示返回当前的读指针 readerIndex, 后者表示设置读指针 readerIndex() 与 readerIndex(int) // 前者表示返回当前的写指针 writerIndex, 后者表示设置写指针 writeIndex() 与 writeIndex(int) // 前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值,下面两段代码是等价的 markReaderIndex() 与 resetReaderIndex() // markWriterIndex() 与 resetWriterIndex() // 代码片段1 int readerIndex = buffer.readerIndex(); // .. 其他操作 buffer.readerIndex(readerIndex); // 代码片段二 buffer.markReaderIndex(); // .. 其他操作 buffer.resetReaderIndex(); // 关于 ByteBuf 的读写都可以看作从指针开始的地方开始读写数据 // 表示把字节数组 src 里面的数据全部写到 ByteBuf,src 字节数组大小的长度通常小于等于 writableBytes() writeBytes(byte[] src) // 把 ByteBuf 里面的数据全部读取到 dst,这里 dst 字节数组的大小通常等于 readableBytes() buffer.readBytes(byte[] dst)
|
writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()
与读写 API 类似的 API 还有 getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针,这点在解析数据的时候千万要注意
由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。有点类似于c语言里面,申请到的内存必须手工释放,否则会造成内存泄漏。
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
通信协议编码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| release() retain() // 方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex, // 同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes() slice() // 方法把整个 ByteBuf 都截取出来,包括所有的数据,指针信息 duplicate() // slice() 方法与 duplicate() 方法的相同点是:底层内存以及引用计数与原始的 ByteBuf 共享,也就是说经过 slice() 或者 duplicate() 返回的 ByteBuf 调用 write 系列方法都会影响到 原始的 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针 // slice() 方法与 duplicate() 不同点就是:slice() 只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整个 ByteBuf 都与原始的 ByteBuf 共享 // slice() 方法与 duplicate() 方法不会拷贝数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy() 会直接从原始的 ByteBuf 中拷贝所有的信息,包括读写指针以及底层对应的数据,因此,往 copy() 返回的 ByteBuf 中写数据不会影响到原始的 ByteBuf // slice() 和 duplicate() 不会改变 ByteBuf 的引用计数,所以原始的 ByteBuf 调用 release() 之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放,这个时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在释放内存的时候,需要调用两次 release() 方法,将引用计数降到零,才会释放内存 // 这三个方法均维护着自己的读写指针,与原始的 ByteBuf 的读写指针无关,相互之间不受影响
|
使用到 slice 和 duplicate 方法的时候,千万要弄清楚内存共享,引用计数共享,读写指针不共享几个概念
1 2 3 4 5 6 7 8 9
| retainedSlice() 与 retainedDuplicate() // 它们的作用是在截取内存片段的同时,增加内存的引用计数,分别与下面两段代码等价 // retainedSlice 等价于 slice().retain(); // retainedDuplicate() 等价于 duplicate().retain()
|
pipeline 与 channelHandler
通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除 ,Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler。
无论是从服务端来看,还是客户端来看,在 Netty 整个框架里面,一条连接对应着一个 Channel,这条 Channel 所有的处理逻辑都在一个叫做 ChannelPipeline 的对象里面,ChannelPipeline 是一个双向链表结构,他和 Channel 之间是一对一的关系。
ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler。
ChannelHandler 有哪些分类
- ChannelInboundHandler: 从字面意思也可以猜到,它是处理读数据的逻辑。在一端读到一段数据,首先要解析这段数据,然后对这些数据做一系列逻辑处理,最终把响应写到对端, 在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理,其中一个最重要的方法就是 channelRead()。
- ChannelOutBoundHandler 是处理写数据的逻辑,它是定义我们一端在组装完响应之后,把数据写到对端的逻辑。我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端,它里面最核心的一个方法就是 write()。
inBoundHandler和outBoundHandler执行顺序
这两个子接口分别有对应的默认实现,ChannelInboundHandlerAdapter,和 ChanneloutBoundHandlerAdapter,它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| // 在服务端的 pipeline 添加三个 ChannelInboundHandler NettyServer.java serverBootstrap .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); } }); // 每个 inBoundHandler 都继承自 ChannelInboundHandlerAdapter,然后实现了 channelRead() 方法 public class InBoundHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerA: " + msg); super.channelRead(ctx, msg); } } public class InBoundHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerB: " + msg); super.channelRead(ctx, msg); } } public class InBoundHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("InBoundHandlerC: " + msg); super.channelRead(ctx, msg); } }
|
在 channelRead() 方法里面,我们打印当前 handler 的信息,然后调用父类的 channelRead() 方法,而这里父类的 channelRead() 方法会自动调用到下一个 inBoundHandler 的 channelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler。
inBoundHandler的添加顺序和传递顺序一致,顺序为 A -> B -> C。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| // 我们继续在服务端的 pipeline 添加三个 ChanneloutBoundHandler serverBootstrap .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { // inBound,处理读数据的逻辑链 ch.pipeline().addLast(new InBoundHandlerA()); ch.pipeline().addLast(new InBoundHandlerB()); ch.pipeline().addLast(new InBoundHandlerC()); // outBound,处理写数据的逻辑链 ch.pipeline().addLast(new OutBoundHandlerA()); ch.pipeline().addLast(new OutBoundHandlerB()); ch.pipeline().addLast(new OutBoundHandlerC()); } }); // 每个 outBoundHandler 都继承自 ChanneloutBoundHandlerAdapter,然后实现了 write() 方法 public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutBoundHandlerA: " + msg); super.write(ctx, msg, promise); } } public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutBoundHandlerB: " + msg); super.write(ctx, msg, promise); } } public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter { public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("OutBoundHandlerC: " + msg); super.write(ctx, msg, promise); } }
|
outBoundHandler 的执行顺序与我们添加的顺序相反,顺序为 A -> B -> C。
pipeline 的结构
pipline 的执行顺序
虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的,inBoundHandler 的事件通常只会传播到下一个 inBoundHandler,outBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰。
拆包粘包
尽管我们在应用层面使用了 Netty,但是对于操作系统来说,只认 TCP 协议,尽管我们的应用层是按照ByteBuf为单位发送数据,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了Netty应用层,重新拼装成ByteBuf。问题就在传输和接收过程,这两个ByteBuf可能是不对等的。
因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。
拆包原理
- 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
- 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
Netty自带拆包器
固定长度的拆包器 FixedLengthFrameDecoder。如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。
行拆包器 LineBasedFrameDecoder。从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。
分隔符拆包器 DelimiterBasedFrameDecoder。DelimiterBasedFrameDecoder 是行拆包器的通用版本,只不过我们可以自定义分隔符。
基于长度域拆包器 LengthFieldBasedFrameDecoder。最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。由于上面三种拆包器比较简单,读者可以自行写出 demo,接下来,我们就结合我们小册的自定义协议,来学习一下如何使用基于长度域的拆包器来拆解我们的数据包。
channelHandler的生命周期
ChannelHandler 有很多回调方法,这些回调方法的执行是有顺序的,而这个执行顺序可以称为 ChannelHandler 的生命周期。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| LifeCyCleTestHandler.java public class LifeCyCleTestHandler extends ChannelInboundHandlerAdapter { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("逻辑处理器被添加:handlerAdded()"); super.handlerAdded(ctx); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()"); super.channelRegistered(ctx); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 准备就绪:channelActive()"); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channel 有数据可读:channelRead()"); super.channelRead(ctx, msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 某次数据读完:channelReadComplete()"); super.channelReadComplete(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 被关闭:channelInactive()"); super.channelInactive(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()"); super.channelUnregistered(ctx); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("逻辑处理器被移除:handlerRemoved()"); super.handlerRemoved(ctx); } } // 我们把这个 handler 添加到 构建的 pipeline 中 // 前面代码略 .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { // 添加到第一个 ch.pipeline().addLast(new LifeCyCleTestHandler()); ch.pipeline().addLast(new PacketDecoder()); ch.pipeline().addLast(new LoginRequestHandler()); ch.pipeline().addLast(new MessageRequestHandler()); ch.pipeline().addLast(new PacketEncoder()); } });
|
先运行 NettyServer.java,然后再运行 NettyClient.java
可以看到,ChannelHandler 回调方法的执行顺序为
handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete() -> channelInactive() -> channelUnregistered() -> handlerRemoved()
- handlerAdded() :指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,表示在当前的 channel 中,已经成功添加了一个 handler 处理器。
- channelRegistered():这个回调方法,表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系,accept 到新的连接,然后创建一个线程来处理这条连接的读写,只不过 Netty 里面是使用了线程池的方式,只需要从线程池里面去抓一个线程绑定在这个 channel 上即可,这里的 NIO 线程通常指的是 NioEventLoop,不理解没关系,后面我们还会讲到。
- channelActive():当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法。
- channelRead():客户端向服务端发来数据,每次都会回调此方法,表示有数据可读。
- channelReadComplete():服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕。
- channelInactive(): 表面这条连接已经被关闭了,这条连接在 TCP 层面已经不再是 ESTABLISH 状态了
- channelUnregistered(): 既然连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了,这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
- handlerRemoved():最后,我们给这条连接上添加的所有的业务逻辑处理器都给移除掉。
客户端互聊原理
- 如下图,A 要和 B 聊天,首先 A 和 B 需要与服务器建立连接,然后进行一次登录流程,服务端保存用户标识和 TCP 连接的映射关系。
- A 发消息给 B,首先需要将带有 B 标识的消息数据包发送到服务器,然后服务器从消息数据包中拿到 B 的标识,找到对应的 B 的连接,将消息发送给 B。
群聊发起和通知
如下图,要实现群聊,其实和单聊类似
- A,B,C 依然会经历登录流程,服务端保存用户标识对应的 TCP 连接
- A 发起群聊的时候,将 A,B,C 的标识发送至服务端,服务端拿到之后建立一个群聊 ID,然后把这个 ID 与 A,B,C 的标识绑定
- 群聊里面任意一方在群里聊天的时候,将群聊 ID 发送至服务端,服务端拿到群聊 ID 之后,取出对应的用户标识,遍历用户标识对应的 TCP 连接,就可以将消息发送至每一个群聊成员
心跳与空闲检测
连接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开。
连接假死会带来以下两大问题
- 对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃。
- 对于客户端来说,连接假死会造成发送数据超时,影响用户体验。
通常,连接假死由以下几个原因造成的
- 应用程序出现线程堵塞,无法进行数据的读写。
- 客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障。
- 公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。
解决方案
- 要处理假死问题首先我们要实现客户端与服务端定期发送心跳,在这里,其实服务端只需要对客户端的定时心跳包进行回复。
- 客户端与服务端如果都需要检测假死,那么直接在 pipeline 的最前方插入一个自定义 IdleStateHandler,在 channelIdle() 方法里面自定义连接假死之后的逻辑。
- 通常空闲检测时间要比发送心跳的时间的两倍要长一些,这也是为了排除偶发的公网抖动,防止误判