Netty入门与实战-仿写微信IM即时通讯系统-笔记

本文为《Netty 入门与实战:仿写微信 IM 即时通讯系统》 的读书笔记

是什么

  • 本质:JBoss做的一个Jar包
  • 目的:快速开发高性能、高可靠性的网络服务器和客户端程序
  • 优点:提供异步的、事件驱动的网络应用程序框架和工具
  • 通俗的说:一个可以让你优雅地处理Socket的Jar包

Netty的特性

设计

统一的API,适用于不同的协议(阻塞和非阻塞)

基于灵活、可扩展的事件驱动模型

高度可定制的线程模型

可靠的无连接数据Socket支持(UDP)

性能

更好的吞吐量,低延迟

更省资源

尽量减少不必要的内存拷贝

安全

完整的SSL/TLS和STARTTLS的支持

能在Applet与Android的限制环境运行良好

健壮性

不再因过快、过慢或超负载连接导致OutOfMemoryError

不再有在高速网络环境下NIO读写频率不一致的问题

易用

完善的JavaDoc,用户指南和样例

简洁简单

仅信赖于JDK1.5

能干什么

有了Netty,你可以实现自己的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。

传统的HTTP服务器的原理

1
2
3
4
5
6
7
8
9
1. 创建一个ServerSocket,监听并绑定一个端口
2. 一系列客户端来请求这个端口
3. 服务器使用Accept,获得一个来自客户端的Socket连接对象
4. 启动一个新线程处理连接
4.1 读Socket,得到字节流
4.2 解码协议,得到Http请求对象
4.3 处理Http请求,得到一个结果,封装成一个HttpResponse对象
4.4 编码协议,将结果序列化字节流 写Socket,将字节流发给客户端
5. 继续循环步骤3

HTTP服务器之所以称为HTTP服务器,是因为编码解码协议是HTTP协议,如果协议是Redis协议,那它就成了Redis服务器,如果协议是WebSocket,那它就成了WebSocket服务器,等等。使用Netty你就可以定制编解码协议,实现自己的特定协议的服务器。

典型的应用

从组网情况看,垂直的架构拆分之后,系统采用分布式部署,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。

如:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

怎么玩

添加依赖

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
</dependencies>

Netty服务端demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

/**
* 要启动一个Netty服务端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑、绑定端口
*
* @author zhang guo dong
*/
public class NettyServer {

private static final int BEGIN_PORT = 10101;

public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

final ServerBootstrap serverBootstrap = new ServerBootstrap();
final AttributeKey<Object> clientKey = AttributeKey.newInstance("clientKey");
serverBootstrap
// 1.绑定线程模型,一个负责接受请求,一个负责干活
.group(boosGroup, workerGroup)
// 2.指定IO模型
.channel(NioServerSocketChannel.class)
// 可选:用于指定在服务端启动过程中的一些逻辑
.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
})
// 可选:可以给服务端的 channel,也就是NioServerSocketChannel指定一些自定义属性, 然后可以通过channel.attr()取出这个属性
.attr(AttributeKey.newInstance("serverName"), "nettyServer")
// 可选:可以给每一条连接指定自定义属性
.childAttr(clientKey, "clientValue")
// 可选:给服务端channel设置一些属性。表示系统用于临时存放已完成三次握手的请求的队列的最大长度
// 可选:如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 1024)
// 可选:每条连接设置一些TCP底层相关的属性。表示是否开启TCP底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 可选:表示是否开启Nagle算法,如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启
.childOption(ChannelOption.TCP_NODELAY, true)
// 3.定义后续每条连接的数据读写,业务处理逻辑
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
System.out.println(ch.attr(clientKey).get());
}
});

// 4.绑定端口
bind(serverBootstrap, BEGIN_PORT);
}

/**
* 自动绑定递增端口
*
* @param serverBootstrap 启动器
* @param port 待绑定的端口
*/
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
// bind(port) 是一个异步的方法,调用之后是立即返回的,他的返回值是一个ChannelFuture
serverBootstrap.bind(port)
// 给这个ChannelFuture添加一个监听器GenericFutureListener,在operationComplete方法里,可以监听端口是否绑定成功
.addListener(future -> {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
bind(serverBootstrap, port + 1);
}
});
}
}

Netty客户端demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
* 要启动一个Netty客户端,必须要指定三类属性,分别是线程模型、IO 模型、连接读写处理逻辑、绑定端口
*
* @author zhang guo dong
*/
public class NettyClient {
private static final int MAX_RETRY = 5;


public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定线程模型
.group(workerGroup)
// 2.指定 IO 类型为 NIO
.channel(NioSocketChannel.class)
// 可选:可以给客户端 Channel,也就是NioSocketChannel绑定自定义属性,可以通过channel.attr()取出这个属性
.attr(AttributeKey.newInstance("clientName"), "nettyClient")
// 可选:设置TCP底层属性,表示连接的超时时间
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
// 3.IO 处理逻辑
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
}
});

// 4.建立连接
connect(bootstrap, "127.0.0.1", 10101, MAX_RETRY);
}

private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("连接成功!");
} else if (retry == 0) {
System.err.println("重试次数已用完,放弃连接!");
} else {
// 第几次重连
int order = (MAX_RETRY - retry) + 1;
// 本次重连的间隔
int delay = 1 << order;
System.err.println(new Date() + ": 连接失败,第" + order + "次重连……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
}

仔细看

传输载体ByteBuf

d4nbHs.png

  1. ByteBuf是一个字节容器,包含三个部分,第一部分是已经丢弃的字节,这部分数据是无效的;第二部分是可读字节,这部分是ByteBuf的主体数据;第三部分是可写字节,所有写到ButeBuf的数据都会写到这个区域。后边虚线部分是该ByteBuf最多还能扩容多少容量。
  2. 上述三个部门是被两个指针划分出来,从左到右,依次是读指针(readerIndex)、写指针(writerIndex)、总容量capacity。
  3. 从ByteBuf中每读取一个字节,rederIndex自增1,ByteBuf里面总共有writerIndex - readerIndex个可读字节,当readerIndex 与 writerIndex相等的时候,ByteBuf不可读。
  4. 写数据是从writeIndex指向的部分开始写,每写一个字节,writerIndex自增1,直到和capacity相等,ByteBuf不可写了。
  5. ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,每次扩容64字节的倍数,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。

相关API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 前者表示返回当前的读指针 readerIndex, 后者表示设置读指针
readerIndex() 与 readerIndex(int)

// 前者表示返回当前的写指针 writerIndex, 后者表示设置写指针
writeIndex() 与 writeIndex(int)

// 前者表示把当前的读指针保存起来,后者表示把当前的读指针恢复到之前保存的值,下面两段代码是等价的
markReaderIndex() 与 resetReaderIndex()

//
markWriterIndex() 与 resetWriterIndex()
// 代码片段1
int readerIndex = buffer.readerIndex();
// .. 其他操作
buffer.readerIndex(readerIndex);


// 代码片段二
buffer.markReaderIndex();
// .. 其他操作
buffer.resetReaderIndex();
// 关于 ByteBuf 的读写都可以看作从指针开始的地方开始读写数据

// 表示把字节数组 src 里面的数据全部写到 ByteBuf,src 字节数组大小的长度通常小于等于 writableBytes()
writeBytes(byte[] src)

// 把 ByteBuf 里面的数据全部读取到 dst,这里 dst 字节数组的大小通常等于 readableBytes()
buffer.readBytes(byte[] dst)

writeByte() 表示往 ByteBuf 中写一个字节,而 buffer.readByte() 表示从 ByteBuf 中读取一个字节,类似的 API 还有 writeBoolean()、writeChar()、writeShort()、writeInt()、writeLong()、writeFloat()、writeDouble() 与 readBoolean()、readChar()、readShort()、readInt()、readLong()、readFloat()、readDouble()

与读写 API 类似的 API 还有 getBytes、getByte() 与 setBytes()、setByte() 系列,唯一的区别就是 get/set 不会改变读写指针,而 read/write 会改变读写指针,这点在解析数据的时候千万要注意

由于 Netty 使用了堆外内存,而堆外内存是不被 jvm 直接管理的,也就是说申请到的内存无法被垃圾回收器直接回收,所以需要我们手动回收。有点类似于c语言里面,申请到的内存必须手工释放,否则会造成内存泄漏。

Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存。默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一, release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。

通信协议编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
release()

retain()
// 方法从原始 ByteBuf 中截取一段,这段数据是从 readerIndex 到 writeIndex,
// 同时,返回的新的 ByteBuf 的最大容量 maxCapacity 为原始 ByteBuf 的 readableBytes()
slice()

// 方法把整个 ByteBuf 都截取出来,包括所有的数据,指针信息
duplicate()

// slice() 方法与 duplicate() 方法的相同点是:底层内存以及引用计数与原始的 ByteBuf 共享,也就是说经过 slice() 或者 duplicate() 返回的 ByteBuf 调用 write 系列方法都会影响到 原始的 ByteBuf,但是它们都维持着与原始 ByteBuf 相同的内存引用计数和不同的读写指针

// slice() 方法与 duplicate() 不同点就是:slice() 只截取从 readerIndex 到 writerIndex 之间的数据,它返回的 ByteBuf 的最大容量被限制到 原始 ByteBuf 的 readableBytes(), 而 duplicate() 是把整个 ByteBuf 都与原始的 ByteBuf 共享

// slice() 方法与 duplicate() 方法不会拷贝数据,它们只是通过改变读写指针来改变读写的行为,而最后一个方法 copy() 会直接从原始的 ByteBuf 中拷贝所有的信息,包括读写指针以及底层对应的数据,因此,往 copy() 返回的 ByteBuf 中写数据不会影响到原始的 ByteBuf

// slice() 和 duplicate() 不会改变 ByteBuf 的引用计数,所以原始的 ByteBuf 调用 release() 之后发现引用计数为零,就开始释放内存,调用这两个方法返回的 ByteBuf 也会被释放,这个时候如果再对它们进行读写,就会报错。因此,我们可以通过调用一次 retain() 方法 来增加引用,表示它们对应的底层的内存多了一次引用,引用计数为2,在释放内存的时候,需要调用两次 release() 方法,将引用计数降到零,才会释放内存

// 这三个方法均维护着自己的读写指针,与原始的 ByteBuf 的读写指针无关,相互之间不受影响

使用到 slice 和 duplicate 方法的时候,千万要弄清楚内存共享,引用计数共享,读写指针不共享几个概念

1
2
3
4
5
6
7
8
9
retainedSlice() 与 retainedDuplicate()

// 它们的作用是在截取内存片段的同时,增加内存的引用计数,分别与下面两段代码等价

// retainedSlice 等价于
slice().retain();

// retainedDuplicate() 等价于
duplicate().retain()

pipeline 与 channelHandler

通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除 ,Netty 能够支持各类协议的扩展,比如 HTTP,Websocket,Redis,靠的就是 pipeline 和 channelHandler。

d4n7uQ.png

无论是从服务端来看,还是客户端来看,在 Netty 整个框架里面,一条连接对应着一个 Channel,这条 Channel 所有的处理逻辑都在一个叫做 ChannelPipeline 的对象里面,ChannelPipeline 是一个双向链表结构,他和 Channel 之间是一对一的关系。

ChannelPipeline 里面每个节点都是一个 ChannelHandlerContext 对象,这个对象能够拿到和 Channel 相关的所有的上下文信息,然后这个对象包着一个重要的对象,那就是逻辑处理器 ChannelHandler。

ChannelHandler 有哪些分类

d4nLEn.png

  1. ChannelInboundHandler: 从字面意思也可以猜到,它是处理读数据的逻辑。在一端读到一段数据,首先要解析这段数据,然后对这些数据做一系列逻辑处理,最终把响应写到对端, 在开始组装响应之前的所有的逻辑,都可以放置在 ChannelInboundHandler 里处理,其中一个最重要的方法就是 channelRead()。
  2. ChannelOutBoundHandler 是处理写数据的逻辑,它是定义我们一端在组装完响应之后,把数据写到对端的逻辑。我们封装好一个 response 对象,接下来我们有可能对这个 response 做一些其他的特殊逻辑,然后,再编码成 ByteBuf,最终写到对端,它里面最核心的一个方法就是 write()。

inBoundHandler和outBoundHandler执行顺序

这两个子接口分别有对应的默认实现,ChannelInboundHandlerAdapter,和 ChanneloutBoundHandlerAdapter,它们分别实现了两大接口的所有功能,默认情况下会把读写事件传播到下一个 handler。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 在服务端的 pipeline 添加三个 ChannelInboundHandler
NettyServer.java

serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});

// 每个 inBoundHandler 都继承自 ChannelInboundHandlerAdapter,然后实现了 channelRead() 方法
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
super.channelRead(ctx, msg);
}
}

public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
super.channelRead(ctx, msg);
}
}

在 channelRead() 方法里面,我们打印当前 handler 的信息,然后调用父类的 channelRead() 方法,而这里父类的 channelRead() 方法会自动调用到下一个 inBoundHandler 的 channelRead() 方法,并且会把当前 inBoundHandler 里处理完毕的对象传递到下一个 inBoundHandler。

inBoundHandler的添加顺序和传递顺序一致,顺序为 A -> B -> C。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 我们继续在服务端的 pipeline 添加三个 ChanneloutBoundHandler
serverBootstrap
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// inBound,处理读数据的逻辑链
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());

// outBound,处理写数据的逻辑链
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});

// 每个 outBoundHandler 都继承自 ChanneloutBoundHandlerAdapter,然后实现了 write() 方法
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
super.write(ctx, msg, promise);
}
}

public class OutBoundHandlerC extends ChannelOutboundHandlerAdapter {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerC: " + msg);
super.write(ctx, msg, promise);
}
}

outBoundHandler 的执行顺序与我们添加的顺序相反,顺序为 A -> B -> C。

pipeline 的结构

d4KtF1.png

pipline 的执行顺序

d4nHBj.png

虽然两种类型的 handler 在一个双向链表里,但是这两类 handler 的分工是不一样的,inBoundHandler 的事件通常只会传播到下一个 inBoundHandler,outBoundHandler 的事件通常只会传播到下一个 outBoundHandler,两者相互不受干扰。

拆包粘包

尽管我们在应用层面使用了 Netty,但是对于操作系统来说,只认 TCP 协议,尽管我们的应用层是按照ByteBuf为单位发送数据,但是到了底层操作系统仍然是按照字节流发送数据,因此,数据到了服务端,也是按照字节流的方式读入,然后到了Netty应用层,重新拼装成ByteBuf。问题就在传输和接收过程,这两个ByteBuf可能是不对等的。

因此,我们需要在客户端根据自定义协议来组装我们应用层的数据包,然后在服务端根据我们的应用层的协议来组装数据包,这个过程通常在服务端称为拆包,而在客户端称为粘包。

拆包原理

  • 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
  • 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

Netty自带拆包器

  1. 固定长度的拆包器 FixedLengthFrameDecoder。如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。

  2. 行拆包器 LineBasedFrameDecoder。从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。

  3. 分隔符拆包器 DelimiterBasedFrameDecoder。DelimiterBasedFrameDecoder 是行拆包器的通用版本,只不过我们可以自定义分隔符。

  4. 基于长度域拆包器 LengthFieldBasedFrameDecoder。最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。由于上面三种拆包器比较简单,读者可以自行写出 demo,接下来,我们就结合我们小册的自定义协议,来学习一下如何使用基于长度域的拆包器来拆解我们的数据包。

d4nojg.png

channelHandler的生命周期

ChannelHandler 有很多回调方法,这些回调方法的执行是有顺序的,而这个执行顺序可以称为 ChannelHandler 的生命周期。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
LifeCyCleTestHandler.java

public class LifeCyCleTestHandler extends ChannelInboundHandlerAdapter {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被添加:handlerAdded()");
super.handlerAdded(ctx);
}

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()");
super.channelRegistered(ctx);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 准备就绪:channelActive()");
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channel 有数据可读:channelRead()");
super.channelRead(ctx, msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 某次数据读完:channelReadComplete()");
super.channelReadComplete(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 被关闭:channelInactive()");
super.channelInactive(ctx);
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel 取消线程(NioEventLoop) 的绑定: channelUnregistered()");
super.channelUnregistered(ctx);
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("逻辑处理器被移除:handlerRemoved()");
super.handlerRemoved(ctx);
}
}
// 我们把这个 handler 添加到 构建的 pipeline 中
// 前面代码略
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// 添加到第一个
ch.pipeline().addLast(new LifeCyCleTestHandler());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});

先运行 NettyServer.java,然后再运行 NettyClient.java

d4nzgU.png

可以看到,ChannelHandler 回调方法的执行顺序为

handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete() -> channelInactive() -> channelUnregistered() -> handlerRemoved()

  1. handlerAdded() :指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,表示在当前的 channel 中,已经成功添加了一个 handler 处理器。
  2. channelRegistered():这个回调方法,表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程建立了绑定关系,accept 到新的连接,然后创建一个线程来处理这条连接的读写,只不过 Netty 里面是使用了线程池的方式,只需要从线程池里面去抓一个线程绑定在这个 channel 上即可,这里的 NIO 线程通常指的是 NioEventLoop,不理解没关系,后面我们还会讲到。
  3. channelActive():当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法。
  4. channelRead():客户端向服务端发来数据,每次都会回调此方法,表示有数据可读。
  5. channelReadComplete():服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕。
  6. channelInactive(): 表面这条连接已经被关闭了,这条连接在 TCP 层面已经不再是 ESTABLISH 状态了
  7. channelUnregistered(): 既然连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了,这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
  8. handlerRemoved():最后,我们给这条连接上添加的所有的业务逻辑处理器都给移除掉。

d4nONq.png

客户端互聊原理

  • 如下图,A 要和 B 聊天,首先 A 和 B 需要与服务器建立连接,然后进行一次登录流程,服务端保存用户标识和 TCP 连接的映射关系。
  • A 发消息给 B,首先需要将带有 B 标识的消息数据包发送到服务器,然后服务器从消息数据包中拿到 B 的标识,找到对应的 B 的连接,将消息发送给 B。

d4nX40.png

群聊发起和通知

如下图,要实现群聊,其实和单聊类似

  • A,B,C 依然会经历登录流程,服务端保存用户标识对应的 TCP 连接
  • A 发起群聊的时候,将 A,B,C 的标识发送至服务端,服务端拿到之后建立一个群聊 ID,然后把这个 ID 与 A,B,C 的标识绑定
  • 群聊里面任意一方在群里聊天的时候,将群聊 ID 发送至服务端,服务端拿到群聊 ID 之后,取出对应的用户标识,遍历用户标识对应的 TCP 连接,就可以将消息发送至每一个群聊成员

d4nvCV.png

心跳与空闲检测

连接假死的现象是:在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开。

连接假死会带来以下两大问题

  • 对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃。
  • 对于客户端来说,连接假死会造成发送数据超时,影响用户体验。

通常,连接假死由以下几个原因造成的

  • 应用程序出现线程堵塞,无法进行数据的读写。
  • 客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障。
  • 公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。

解决方案

  • 要处理假死问题首先我们要实现客户端与服务端定期发送心跳,在这里,其实服务端只需要对客户端的定时心跳包进行回复。
  • 客户端与服务端如果都需要检测假死,那么直接在 pipeline 的最前方插入一个自定义 IdleStateHandler,在 channelIdle() 方法里面自定义连接假死之后的逻辑。
  • 通常空闲检测时间要比发送心跳的时间的两倍要长一些,这也是为了排除偶发的公网抖动,防止误判