netty
IO模型
什么是IO?
I/O(Input/Outpu) 即输入/输出
BIO
什么是BIO?
Blocking I/O,属于同步阻塞 IO 模型
同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
同步阻塞 IO 模型中,应用程序发起 read 调用后,会一直阻塞,直到内核把数据拷贝到用户空间
缺点:
- 如果客户端很多就必须创建更多的工作线程服务于客户端,线程的创建会消耗服务器资源,影响效率
- Client和Server建立连接后,不一定会一直进行读写操作,有可能什么都不干,但是还是占用一个工作线程,浪费资源
- BIO一个服务端的工作线程对应一个客户端请求无法满足高并发场景
适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高, 并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解
NIO
背景:
在客户端连接数量不高的情况下,使用BIO是没问题的。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量
NIO:
Java 中的 NIO 于 Java 1.4 中引入,对应
java.nio
包,提供了Channel
,Selector
,Buffer
等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它是支持面向缓冲的,基于通道的 I/O 操作方法。 对于高负载、高并发的(网络)应用,应使用 NIO
- Non-blocking I/O (同步非阻塞IO)
- 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理
同步非阻塞
同步非阻塞 IO 模型中,应用程序会一直发起 read 调用,等待数据从内核空间拷贝到用户空间的这段时间里,线程依然是阻塞的,直到在内核把数据拷贝到用户空间。
相比于同步阻塞 IO 模型,同步非阻塞 IO 模型确实有了很大改进。通过轮询操作,避免了一直阻塞。
但是,这种 IO 模型同样存在问题:应用程序不断进行 I/O 系统调用轮询数据是否已经准备好的过程是十分消耗 CPU 资源的。
这个时候,I/O 多路复用模型 就上场了
IO多路复用
IO 多路复用模型,通过减少无效的系统调用,减少了对 CPU 资源的消耗
Java 中的 NIO ,有一个非常重要的选择器 ( Selector ) 的概念,也可以被称为 多路复用器。通过它,只需要一个线程便可以管理多个客户端连接。当客户端数据到了之后,才会为其服务
IO 多路复用模型中,线程首先发起 select 调用,询问内核数据是否准备就绪,等内核把数据准备好了,用户线程再发起 read 调用。read 调用的过程(数据从内核空间 -> 用户空间)还是阻塞的
目前支持 IO 多路复用的系统调用,有 select,epoll 等等。select 系统调用,目前几乎在所有的操作系统上都有支持
select 调用 :内核提供的系统调用,它支持一次查询多个系统调用的可用状态。几乎所有的操作系统都支持
epoll 调用 :linux 2.6 内核,属于 select 调用的增强版本,优化了 IO 的执行效率
- 户空间的程序不能直接去磁盘空间(网卡)中读取数据,必须由经由内核空间通过DMA来获取
- 一般用户空间的内存分页与磁盘空间不会对齐,因此需要由内核空间在中间做一层处理
适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4开始支持
AIO
Asynchronous I/O 异步非阻塞IO
AIO 也就是 NIO 2。Java 7 中引入了 NIO 的改进版 NIO 2,它是异步 IO 模型
异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作
目前来说 AIO 的应用还不是很广泛。Netty 之前也尝试使用过 AIO,不过又放弃了。这是因为,Netty 使用了 AIO 之后,在 Linux 系统上的性能并没有多少提升
使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持
NIO
NIO的基本介绍
- Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出 的新特性,被统称为 NIO(即 New IO),是同步非阻塞的
- NIO 相关类都被放在
java.nio
包及子包下,并且对原java.io
包中的很多类进行改写- NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)
- NIO是面向缓冲区 ,或者面向块编程的。数据读取到一个它稍后处理的缓冲区(Buffer),需要时可在缓冲区中前后移动,这就 增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络
- Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情
- 通俗理解:NIO是可以做到用一个线程来处理多个操作的。假设有10000个请求过来, 根据实际情况,可以分配50或者100个线程来处理。不像之前的阻塞IO那样,非得分配10000个
- HTTP2.0使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求 的数量比HTTP1.1大了好几个数量级
NIO三大核心
Buffer
java.nio 定义了以下几个 Buffer 的实现
一个 Buffer本质上是内存中的一块,我们可以将数据写入这块内存,之后从这块内存获取数据
其实核心是最后的 ByteBuffer,前面的一大串类只是包装了一下它而已,我们使用最多的通常也是 ByteBuffer
我们应该将 Buffer 理解为一个数组,IntBuffer、CharBuffer、DoubleBuffer 等分别对应 int[]、char[]、double[] 等
MappedByteBuffer 用于实现内存映射文件
操作 Buffer 和操作数组、类集差不多,只不过大部分时候我们都把它放到了 NIO 的场景里面来使用而已。下面介绍 Buffer 中的几个重要属性和几个重要方法
position、limit、capacity
就像数组有数组容量,每次访问元素要指定下标,Buffer 中也有几个重要属性:position、limit、capacity
capacity:它代表这个缓冲区的容量,一旦设定就不可以更改。比如 capacity 为 1024 的 IntBuffer,代表其一次可以存放 1024 个 int 类型的值。一旦 Buffer 的容量达到 capacity,需要清空 Buffer,才能重新写入值
position:它代表这个缓冲区的数据定位,从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了
- 写模式:每往 Buffer 中写入一个值,position 就自动加 1,代表下一次的写入位置
- 读模式:每读一个值,position 就自动加 1,指向下一次要读取的位置
limit:它代表这个缓冲区的数据限制
- 写模式:limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity
- 读模式:此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满
初始化Buffer
每个 Buffer 实现类都提供了一个静态方法
allocate(int capacity)
帮助我们快速实例化一个 Buffer
1
2
3
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
IntBuffer intBuf = IntBuffer.allocate(1024);
LongBuffer longBuf = LongBuffer.allocate(1024);另外,我们经常使用 wrap 方法来初始化一个 Buffer
1
2
3
ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[1024]);
IntBuffer intBuffer = IntBuffer.wrap(new int[1024]);
LongBuffer longBuffer = LongBuffer.wrap(new long[1024]);
填充Buffer (写操作)
1
2
3
4
5
6
7
8
9
10
11
// 填充一个 byte 值
public abstract ByteBuffer put(byte b);
// 在指定位置填充一个 byte 值
public abstract ByteBuffer put(int index, byte b);
// 将一个数组中的值填充进去
public final ByteBuffer put(byte[] src);
// 截取数组中的一部分填充进去
public ByteBuffer put(byte[] src, int offset, int length)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class BufferTest {
@Test
public void bufferExample() {
// 分配一个长度为5的IntBuffer
//IntBuffer buffer = IntBuffer.allocate(5);
IntBuffer buffer = IntBuffer.wrap(new int[5]);
for (int i = 0; i < buffer.capacity(); i++) {
buffer.put(i);
}
// 转换,如果当前是写模式则转换为读模式
buffer.flip();
while (buffer.hasRemaining()) {
// get 有指针会向前移动
System.out.println(buffer.get());
}
}
@Test
public void put() {
IntBuffer intBuffer = IntBuffer.allocate(5);
// HeapIntBuffer 底层pos自动 + 1, flip 后 lim=pos, pos=1
intBuffer.put(1);
intBuffer.flip();
// 输出 1
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
@Test
public void put2() {
IntBuffer intBuffer = IntBuffer.allocate(5);
// 这里put底层,pos不会自动加1,导致flip后 lim = pos = 0
intBuffer.put(1, 2);
intBuffer.flip();
// 重新设置限制,默认lim为0
intBuffer.limit(2);
// lim和pos都为0
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
// [0, 2, 0, 0, 0]
System.out.println(Arrays.toString(intBuffer.array()));
// 不设置intBuffer.limit(2) 则数组下标越界(lim为0)
System.out.println(intBuffer.get(1));
}
@Test
public void put3() {
IntBuffer intBuffer = IntBuffer.allocate(5);
int[] bytes = {1,2,3,4,5};
intBuffer.put(bytes);
intBuffer.flip();
// 输出 1,2,3,4,5
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
@Test
public void put4() {
IntBuffer intBuffer = IntBuffer.allocate(10);
int[] array = {1,2,3};
// 第二个参数offse指的是数组的偏移量。第三个参数length 指的是取偏移量后几位
intBuffer.put(array, 1, 2);
intBuffer.flip();
// 输出 2,3
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}上述这些方法需要自己控制 Buffer 大小,不能超过 capacity,超过会抛 java.nio.BufferOverflowException异常
读操作:对于 Buffer 来说,另一个常见的操作中就是,我们要将来自 Channel 的数据填充到 Buffer 中,在系统层面上,这个操作我们称为读操作,因为数据是从外部(文件或网络等)读到内存中
1
int num = channel.read(buf);
上述方法会返回从 Channel 中读入到 Buffer 的数据大小
提取 Buffer 中的值 (读操作)
前面介绍了写操作,每写入一个值,position 的值都需要加 1,所以 position 最后会指向最后一次写入的位置的后面一个,如果 Buffer 写满了,那么 position 等于 capacity(position 从 0 开始)
如果要读 Buffer 中的值,需要切换模式,从写入模式切换到读出模式。注意,通常在说 NIO 的读操作的时候,我们说的是从 Channel 中读数据到 Buffer 中,对应的是对 Buffer 的写入操作,初学者需要理清楚这个
调用 Buffer 的 flip() 方法,可以从写入模式切换到读取模式。其实这个方法也就是设置了一下
position
和limit
值罢了
1
2
3
4
5
6
public final Buffer flip() {
limit = position; // 将 limit 设置为实际写入的数据数量
position = 0; // 重置 position 为 0
mark = -1; // mark 之后再说
return this;
}对应写入操作的一系列 put 方法,读操作提供了一系列的 get 方法
1
2
3
4
5
6
// 根据 position 来获取数据
public abstract byte get();
// 获取指定位置的数据
public abstract byte get(int index);
// 将 Buffer 中的数据写入到数组中
public ByteBuffer get(byte[] dst)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
@Test
public void get() {
IntBuffer intBuffer = IntBuffer.allocate(5);
int[] bytes = {1,2,3,4,5};
intBuffer.put(bytes);
intBuffer.flip();
// 1,2,3,4,5
while (intBuffer.hasRemaining()) {
// 根据 position 来获取数据, 每读取一次 position + 1
System.out.println(intBuffer.get());
}
}
@Test
public void get2() {
IntBuffer intBuffer = IntBuffer.allocate(5);
int[] bytes = {1,2,3,4,5};
intBuffer.put(bytes);
intBuffer.flip();
// 3
System.out.println(intBuffer.get(2));
}
@Test
public void get3() {
IntBuffer intBuffer = IntBuffer.allocate(5);
int[] bytes = {1,2,3,4,5};
intBuffer.put(bytes);
intBuffer.flip();
int[] arr = new int[3];
// 将Buffer中的数据写入到数组中
intBuffer.get(arr);
// 1,2,3
System.out.println(Arrays.toString(arr));
}一个
ByteBuffer
经常使用的方法
1
new String(buffer.array()).trim();
当然了,除了将数据从 Buffer 取出来使用,更常见的操作是将我们写入的数据传输到 Channel 中,如通过 FileChannel 将数据写入到文件中,通过 SocketChannel 将数据写入网络发送到远程机器等。对应的,这种操作,我们称之为写操作
1
int num = channel.write(buf);
mark() & reset()
mark: 用于临时保存 position 的值,每次调用 mark() 方法都会将 mark 设值为当前的 position,便于后续需要的时候使用
reset: 把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记(mark)的地方
1
2
3
4
public final Buffer mark() {
mark = position;
return this;
}那到底什么时候用呢?考虑以下场景,我们在 position 为 5 的时候,先 mark() 一下,然后继续往下读,读到第 10 的时候,我想重新回到 position 为 5 的地方重新来一遍,那只要调一下 reset() 方法,position 就回到 5 了
1
2
3
4
5
6
7
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
rewind() & clear() & compact()
rewind:会重置 position 为 0,通常用于重新从头读写 Buffer,类似磁带倒带
场景:在写模式下重写数据到缓冲区
1 |
|
clear:有点重置 Buffer 的意思,相当于重新实例化了一样
场景:
写 -> 读 -> clear -> 继续写
。通常,我们会先填充 Buffer,然后从 Buffer 读取数据,之后我们再重新往里填充新的数据,我们一般在重新填充之前先调用 clear()
1 |
|
compact: 和 clear() 一样的是,它们都是在准备往 Buffer 填充新的数据之前调用,相当于压缩
场景:Buffer数据还没完全读完就要继续写,那么就将未读取的数据重新压缩到Buffer头部方便下次去读
前面说的 clear() 方法会重置几个属性,但是我们要看到,clear() 方法并不会将 Buffer 中的数据清空,只不过后续的写入会覆盖掉原来的数据,也就相当于清空了数据了。
而 compact() 方法有点不一样,调用这个方法以后,会先处理还没有读取的数据,也就是 position 到 limit 之间的数据(还没有读过的数据),先将这些数据移到左边,然后在这个基础上再开始写入。很明显,此时 limit 还是等于 capacity,position 指向原来数据的右边
类型化的Buffer
ByteBuffer 支持类型化的put 和 get, put 放入的是什么数据类型, get就应该使用相应的数据类型来取出, 否则可能有 BufferUnderflowException 异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Test
public void type() {
ByteBuffer buffer = ByteBuffer.allocate(64);
//类型化方式放入数据
buffer.putInt(100);
buffer.putLong(9);
buffer.putChar('A');
buffer.putShort((short) 4);
buffer.flip();
// 如果取出的数据类型不匹配,则可能会抛出异常, 比如最后一行调用 buffer.getShort() 缓冲会溢出
// 如果取出的顺序不一致,则读取的数据会错乱
System.out.println(buffer.getInt());
System.out.println(buffer.getLong());
System.out.println(buffer.getChar());
System.out.println(buffer.getShort());
}
只读Buffer
可以将一个普通Buffer 转成只读Buffer,如果对 readOnlyBuffer进行 put 操作就会抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void readTest() {
//创建一个buffer
ByteBuffer buffer = ByteBuffer.allocate(3);
for(int i = 0; i < 3; i++) {
buffer.put((byte)i);
}
//准备读取
buffer.flip();
//得到一个只读的Buffer HeapByteBufferR
ByteBuffer readOnlyBuffer = buffer.asReadOnlyBuffer();
System.out.println(readOnlyBuffer.getClass());
while (readOnlyBuffer.hasRemaining()) {
System.out.println(readOnlyBuffer.get());
}
//ReadOnlyBufferException
readOnlyBuffer.put((byte)100);
}
MappedByteBuffer
NIO 还提供了 MappedByteBuffer, 可以让文件直接在内存(堆外的内存) 中进行修改, 而如何同步到文件由NIO 来完成
这里底层其实就是使用到了零拷贝的 MMAP 技术
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Test
public void mappedByteBufferTest() throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("/Volumes/app/test.txt", "rw");
//获取对应的通道
FileChannel channel = randomAccessFile.getChannel();
/**
* 这里使用内存映射文件,可以提高内存的读写效率.实际上就是使用零拷贝的 MMAP
* 参数1: FileChannel.MapMode.READ_WRITE 使用的读写模式
* 参数2: 0 : 可以直接修改的起始位置
* 参数3: 5: 是映射到内存的大小(不是索引位置) ,即将 test.txt 的多少个字节映射到内存
* 可以直接修改的范围就是 0-5
* 实际类型 DirectByteBuffer
*/
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
// MMAP 映射到内存的数据是可以直接修改的, 可以直接修改的范围是 0-5 数据在内核空间映射到用户空间
mappedByteBuffer.put(0, (byte) 'H');
mappedByteBuffer.put(3, (byte) '9');
mappedByteBuffer.put(4, (byte) 'Y');
// mappedByteBuffer.put(5, (byte) 'Z'); //IndexOutOfBoundsException
randomAccessFile.close();
System.out.println("修改成功~~");
}这里的5是映射到内存的大小,不是索引位置,如果在操作第5个位置就会抛出异常
Buffer的分散和聚集
前面我们讲的读写操作, 都是通过一个Buffer 完成的, NIO 还支持 通过多个Buffer (即 Buffer 数组) 完成读写操作, 即 Scattering 和 Gathering
Buffer 的分散和聚集
Scattering:将数据写入到buffer时,可以采用buffer数组,依次写入 [分散]
将Channel中的数据读取到多个Buffer
分离读取的时候,Channel写入buffer的数据是按顺序的,Scatter操作并不适合动态长度的数据传输,也就意味着传输数据的每一部分都是固定长度时,Scatter才能发挥它的作用
Gathering: 从buffer读取数据时,可以采用buffer数组,依次读 [聚集]
Gather操作将多个buffer的数据写入到同一个Channel
channel的write()方法可以接受buffer数据作为参数,write()方法会按照顺序将多个buffer中的数据依次写入channel。需要注意的是,write()操作只会写入buffer中已写入的数据,即position到limit之间的数据;例如一个buffer的容量为128字节,但buffer中只写入了28字节的数据,只有这28个字节会写入channel中,因此Gather操作非常适合动态长度数据写入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 客户端使用名称 telnet localhost 8080
@Test
public void test() throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
//创建buffer数组
ByteBuffer[] byteBuffers = new ByteBuffer[2];
byteBuffers[0] = ByteBuffer.allocate(5);
byteBuffers[1] = ByteBuffer.allocate(3);
//等待客户端连接(telnet)
SocketChannel socketChannel = serverSocketChannel.accept();
//假定从客户端接受8个字节
int messageLength = 8;
//循环的读取
while (true) {
int byteRead = 0;
while (byteRead < messageLength) {
// 使用scattering方式读取 将数据读取到buffer数组中
long r = socketChannel.read(byteBuffers);
System.out.println("r = " + r);
//累计读取的字节数
byteRead += 1;
System.out.println("byteRead = " + byteRead);
//使用流打印, 看看当前的这个buffer的position 和 limit
Arrays.asList(byteBuffers)
.stream()
.map(byteBuffer -> "position=" + byteBuffer.position() + ", limit=" + byteBuffer.limit())
.forEach(System.out::println);
}
//将所有的buffer进行flip
Arrays.asList(byteBuffers).forEach(byteBuffer -> byteBuffer.flip());
//将数据读出显示到客户端
int byteWrite = 0;
while (byteWrite < messageLength) {
// 使用gathering方式读取 将buffer数组中的数据写出到客户端
long w = socketChannel.write(byteBuffers);
byteWrite += 1;
System.out.println("byteWrite = " + byteWrite);
}
//将所有的buffer 进行clear
Arrays.asList(byteBuffers).forEach(buffer -> {
buffer.clear();
});
System.out.println("byteRead:=" + byteRead + " byteWrite=" + byteWrite + ", messagelength=" + messageLength);
}
}
总结
重要属性
属性 | 说明 |
---|---|
Capacity | 容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变 |
Limit | 表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的 |
Position | 位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备 |
Mark | 标记,调用mark()来设置mark=position,再调用reset()可以让position恢复到标记的位置 |
实例化
方法 | 说明 |
---|---|
allocate(int capacity) | 从堆空间中分配一个容量大小为capacity的byte数组作为缓冲区的byte数据存储器 |
allocateDirect(int capacity) | 是不使用JVM堆栈而是通过操作系统来创建内存块用作缓冲区,它与当前操作系统能够更好的耦合,因此能进一步提高I/O操作速度。但是分配直接缓冲区的系统开销很大,因此只有在缓冲区较大并长期存在,或者需要经常重用时,才使用这种缓冲区 |
wrap(byte[] array) | 这个缓冲区的数据会存放在byte数组中,bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方。其实ByteBuffer底层本来就有一个bytes数组负责来保存buffer缓冲区中的数据,通过allocate方法系统会帮你构造一个byte数组 |
wrap(byte[] array, int offset, int length) | 在上一个方法的基础上可以指定偏移量和长度,这个offset也就是包装后byteBuffer的position,而length呢就是limit-position的大小,从而我们可以得到limit的位置为length+position(offset) |
重要方法
方法 | 说明 |
---|---|
limit(), limit(10)等 | 其中读取和设置这4个属性的方法的命名和jQuery中的val(),val(10)类似,一个负责get,一个负责set |
reset() | 把position设置成mark的值,相当于之前做过一个标记,现在要退回到之前标记的地方 |
clear() | position = 0;limit = capacity;mark = -1; 有点初始化的味道,但是并不影响底层byte数组的内容 |
flip() | limit = position;position = 0;mark = -1; 翻转,也就是让flip之后的position到limit这块区域变成之前的0到position这块,翻转就是将一个处于存数据状态的缓冲区变为一个处于准备取数据的状态 |
rewind() | 把position设为0,mark设为-1,不改变limit的值 |
remaining() | return limit - position;返回limit和position之间相对位置差 |
hasRemaining() | return position < limit返回是否还有未读内容 |
compact() | 把从position到limit中的内容移到0到limit-position的区域内,position和limit的取值也分别变成limit-position、capacity。如果先将positon设置到limit,再compact,那么相当于clear() |
get() | 相对读,从position位置读取一个byte,并将position+1,为下次读写作准备 |
get(int index) | 绝对读,读取byteBuffer底层的bytes中下标为index的byte,不改变position |
get(byte[]) | 将 Buffer 中的数据写入到数组中 |
get(byte[] dst, int offset, int length) | 从position位置开始相对读,读length个byte,并写入dst下标从offset到offset+length的区域 |
put(byte b) | 相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备 |
put(int index, byte b) | 绝对写,向byteBuffer底层的bytes中下标为index的位置插入byte b,不改变position |
put(ByteBuffer src) | 用相对写,把src中可读的部分(也就是position到limit)写入此byteBuffer |
put(byte[] src, int offset, int length) | 从src数组中的offset到offset+length区域读取数据并使用相对写写入此byteBuffer |
相对读:position+1
绝对读:不改变position
相对写:postion+1
绝对写:不改变position
非直接缓冲区(直接在堆内存中开辟空间,也就是数组):
竖左边的是操作系统(OS),右边是Java虚拟机(JVM),应用程序无论是读操作还是写操作都必须在OS和JVM之间进行复制
1
2
通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中
ByteBuffer byteBuffer = ByteBuffer.allocate(1020);直接缓冲区
解释:在NIO中,直接开辟物理内存映射文件,应用程序直接操作物理内存映射文件,这样就少了中间的copy过程,可以极大得提高读写效率。虽然直接缓冲可以进行高效的I/O操作,但它使用的内存是操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓冲区要更大的开销
1
2
分配直接缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1020);
类型 优点 缺点 直接缓冲区 在虚拟机内存外,开辟的内存,IO操作直接进行,没有再次复制 创建和销毁开销大 非直接缓冲区 在虚拟机内存中创建,易回收 但占用虚拟机内存开销,处理中有复制过程。 多个进程可以允许并发地内存映射同一文件,以便允许数据共享。任何一个进程的写入会修改虚拟内存的数据,并且其他映射同一文件部分的进程都可看到
内存映射的共享是实现:每个共享进程的虚拟内存映射指向物理内存的同一页面,而该页面有磁盘块的复制
总而言之,内存映射就是将每个磁盘块映射到内存中,一个进程或多个进程把对磁盘块的访问转化为对内存页的访问
内存映射文件的目的是实现内存共享,MMAP是零拷贝的一种实现
很多时候,共享内存实际上是通过内存映射来实现的。在这种情况下,进程可以通过共享内存来通信,而共享内存是通过映射同样文件到通信进程的虚拟地址空间来实现的。内存映射文件充当通信进程之间的共享内存区域
虚拟内存
虚拟内存是计算机系统内存管理的一种技术。它使得应用程序认为它拥有连续可用的内存(一个连续完整的地址空间),而实际上物理内存通常被分隔成多个内存碎片,还有部分暂时存储在外部磁盘存储器上,在需要时进行数据交换。与没有使用虚拟内存技术的系统相比,使用这种技术使得大型程序的编写变得更容易,对真正的物理内存(例如RAM)的使用也更有效率。此外,虚拟内存技术可以使多个进程共享同一个运行库,并通过分割不同进程的内存空间来提高系统的安全性
相关知识理论
Channel
所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地,主要地,我们将关心 java.nio 包中实现的以下几个 Channel
- FileChannel:文件通道,用于文件的读和写
- DatagramChannel:用于 UDP 连接的接收和发送
- SocketChannel:把它理解为 TCP 连接通道,简单理解就是 TCP 客户端
- ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求
Channel 经常翻译为通道,类似 IO 中的流,用于读取和写入。它与前面介绍的 Buffer 打交道,读操作的时候将 Channel 中的数据填充到 Buffer 中,而写操作时将 Buffer 中的数据写入到 Channel 中
NIO的通道类似于流,但有些区别如下:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲
FileChannel
文件操作对于大家来说应该是最熟悉的,不过我们在说 NIO 的时候,其实 FileChannel 并不是关注的重点。而且后面我们说非阻塞的时候会看到,FileChannel 是不支持非阻塞的
FileChannel主要用来对本地文件进行 IO 操作,常见的方法有
1
2
3
4
public int read(ByteBuffer dst) ,从通道读取数据并放到缓冲区中
public int write(ByteBuffer src) ,把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src, long position, long count),从目标通道 中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target),把数据从当 前通道复制给目标通道
流只能读或者写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 流只能读或者只能写
* @throws IOException
*/
@Test
public void test() throws IOException {
// 写入数据 输出流
FileOutputStream fileOutputStream = new FileOutputStream("/Volumes/app/data.txt");
FileChannel fileChannel = fileOutputStream.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(18);
byteBuffer.put("写入测试数据".getBytes());
// 切换读模式
byteBuffer.flip();
fileChannel.write(byteBuffer);
// 读取数据
byteBuffer.clear();
// 读取数据 输入流
FileInputStream fileInputStream = new FileInputStream("/Volumes/app/data.txt");
fileChannel = fileInputStream.getChannel();
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
通道可以从缓冲读数据,也可以写数据到缓冲
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void test2() throws IOException {
RandomAccessFile randomAccessFile = new RandomAccessFile("/Volumes/app/data.txt", "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
// 写入数据, utf-8 一个汉字三个字节
ByteBuffer byteBuffer = ByteBuffer.allocate(18);
byteBuffer.put("写入测试数据".getBytes());
// 切换读模式
byteBuffer.flip();
// 读取Buffer的数据到channel
while (byteBuffer.hasRemaining()) {
fileChannel.write(byteBuffer);
}
byteBuffer.clear();
fileChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array()));
}
实现文件复制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void copy() throws IOException {
FileInputStream inputStream = new FileInputStream("/Volumes/app/data.txt");
FileChannel inChannel = inputStream.getChannel();
FileOutputStream outputStream = new FileOutputStream("/Volumes/app/data2.txt");
FileChannel outChannel = outputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(10);
for (;;) {
// 这里一定要清理buffer.否则limit 和 position相等会读取不到数据 num一直等于0(buffer刚被读完状态)
buffer.clear();
int num = inChannel.read(buffer);
if (num == -1) {
break;
}
buffer.flip();
outChannel.write(buffer);
}
outChannel.close();
inChannel.close();
outputStream.close();
inputStream.close();
}
sendFile 零拷贝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 文件拷贝使用sendFile实现零拷贝
@Test
public void transferFrom() throws IOException {
//创建相关流
FileInputStream fileInputStream = new FileInputStream("/Volumes/app/data.txt");
FileOutputStream fileOutputStream = new FileOutputStream("/Volumes/app/data2.txt");
//获取各个流对应的filechannel
FileChannel sourceCh = fileInputStream.getChannel();
FileChannel destCh = fileOutputStream.getChannel();
//使用transferForm完成拷贝
destCh.transferFrom(sourceCh, 0,sourceCh.size());
//关闭资源
sourceCh.close();
sourceCh.close();
fileInputStream.close();
fileOutputStream.close();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 网络传输文件实现零拷贝
@Test
public void transferTo() throws IOException {
File file = new File("/Volumes/app/data.txt");
Long size = file.length();
try {
// 1.将test.txt文件内容读取到arr中
RandomAccessFile raFile = new RandomAccessFile(file, "rwd");
FileChannel channel = raFile.getChannel();
// 2.提供对外服务
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();
if(socketChannel != null){
// 3.使用transferTo方法将文件数据传输到客户端
channel.transferTo(0, size, socketChannel);
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
ServerSocketChannel
SocketChannel可以看作是 TCP 客户端,那么 ServerSocketChannel 就是对应的服务端
1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test() throws IOException {
// 实例化
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 监听 8080 端口
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
// 一旦有一个 TCP 连接进来,就对应创建一个 SocketChannel 进行处理
SocketChannel socketChannel = serverSocketChannel.accept();
}
}这里我们可以看到 SocketChannel 的另一个实例化方式
SocketChannel 它不仅仅是 TCP 客户端,它代表的是一个网络通道,可读可写
ServerSocketChannel 不和 Buffer 打交道了,因为它并不实际处理数据,它一旦接收到请求后,实例化 SocketChannel,之后在这个连接通道上的数据传递它就不管了,因为它需要继续监听端口,等待下一个连接
SocketChannel
可以将 SocketChannel 理解成一个 TCP 客户端或者一个可读可写的网络通道
NIO中的 ServerSocketChannel功能类似ServerSocket,SocketChannel功能类 似Socket
1
2
3
4
5
6
7
8
public void test() throws IOException {
// 打开一个通道
// SocketChannel socketChannel = SocketChannel.open();
// 发起连接
// socketChannel.connect(new InetSocketAddress("http://localhost", 8080));
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
}SocketChannel 的读写和 FileChannel 没什么区别,就是操作缓冲区
1
2
3
4
5
6
7
// 读取数据
socketChannel.read(buffer);
// 写入数据到网络连接中
while(buffer.hasRemaining()) {
socketChannel.write(buffer);
}
DatagramChannel
UDP 和 TCP 不一样,DatagramChannel 一个类处理了服务端和客户端
UDP 是面向无连接的,不需要和对方握手,不需要通知对方,就可以直接将数据包投出去,至于能不能送达,它是不知道的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Test
public void test() throws IOException {
// 服务端
DatagramChannel server = DatagramChannel.open();
server.socket().bind(new InetSocketAddress(8080));
ByteBuffer buf = ByteBuffer.allocate(35);
// 客户端发送数据
String newData = "datagramChannel test "
+ System.currentTimeMillis();
buf.put(newData.getBytes());
buf.flip();
int bytesSent = server.send(buf, new InetSocketAddress("localhost", 8080));
// 服务端监听
server.receive(buf);
System.out.println(new String(buf.array()));
}
总结
- 通道类似于流,但是通道可以进行读和写
- 读和写
- 通道的
read
操作是将通道中的数据写到缓冲的过程 - 通道的
writer
操作是将缓冲的数据写到通道的过程
- 通道的
- 一个通道可以对应多个缓冲,能够对多个缓冲进行读写
Selector
介绍
- Java 的 NIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到Selector(选择器或叫多路复用器)
- Selector 能够检测多个注册的通道上是否有事件发生(注意:多个Channel以事件的方式可以注册到同一个Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求
- 只有在 通道(连接) 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程
- 避免了多线程之间的上下文切换导致的开销
说明
- Netty 的 IO 线程 NioEventLoop 聚合了 Selector(选择器, 也叫多路复用器),可以同时并发处理成百上千个客户端连接
- 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务
- 线程通常将非阻塞 IO 的空闲时间用于在其他通道上 执行 IO 操作,所以单独的线程可以管理多个输入和 输出通道
- 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起(BIO数据没就绪就会阻塞线程)
- 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到 了极大的提升
Selector常用方法
Selector是一个抽象类
源码
1
2
3
4
5
6
7
8
9
10
11
public abstract class Selector implements Closeable {
public static Selector open();//得到一个选择器对象
public int select(long timeout);//监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来设置超时时间
public abstract Set<SelectionKey> keys();//获取当前channel注册在Selector上所有的key
public abstract Set<SelectionKey> selectedKeys();//从内部集合中得到所有的 SelectionKey
public abstract int selectNow() throws IOException;//不阻塞,立马返还
public abstract int select(long timeout)throws IOException;//阻塞1000毫秒,在1000毫秒后返回
public abstract int select() throws IOException;//阻塞,监控所有注册的通,至少有一个已注册的Channel发生了事件才返回
public abstract Selector wakeup();//唤醒
public abstract void close() throws IOException;//关闭
}
SelectionKey
SelectionKey,表示 Selector 和Channel的注册关系
当我们调用**
channel.register(selector, SelectionKey.OP_READ, buffer);
方法时,会将通道channel注册到选择器selector上并监听指定事件,同时会返回一个SelectionKey选择键对象,这个键对象标识了通道和选择器之间的注册关系。选择键会记住您关心的通道。它们也会追踪对应的通道是否已经就绪**。当您调用一个选择器对象的select( )方法时,相关的键建会被更新,用来检查所有被注册到该选择器的通道。您可以获取一个就绪键的集合,从而找到当时已经就绪的通道。通过遍历这些键,您可以选择出每个从上次您调用select( )开始直到现在,已经就绪的通道
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public abstract class SelectionKey {
//以下四个常量是定义的通道的四种操作
// 代表读操作
public static final int OP_READ = 1;
// 代表写操作
public static final int OP_WRITE = 4;
// 代表连接已经建立
public static final int OP_CONNECT = 8;
//有新的网络连接可以 accept
public static final int OP_ACCEPT = 16;
//可以针对指定的通道附加一个对象
private volatile Object attachment = null;
// 获得通道对象
public abstract SelectableChannel channel();
// 获得通道所注册的选择器
public abstract Selector selector();
// 验证通道和选择器之间的注册关系是否还生效
public abstract boolean isValid();
//将通道放入选择器的已注销集合中
public abstract void cancel();
//获得该通道所感兴趣的操作
public abstract int interestOps();
//设置通道感兴趣的操作
public abstract SelectionKey interestOps(int paramInt);
//获得通道已准备好的操作
public abstract int readyOps();
//是否可以读
public final boolean isReadable() {
return ((readyOps() & 0x1) != 0);
}
//是否可以写
public final boolean isWritable() {
return ((readyOps() & 0x4) != 0);
}
// 是否可以 accept
public final boolean isConnectable() {
return ((readyOps() & 0x8) != 0);
}
//是否可以 accept
public final boolean isAcceptable() {
return ((readyOps() & 0x10) != 0);
}
//为SelectionKey绑定附加对象
public final Object attach(Object paramObject) {
Object localObject = this.attachment;
this.attachment = paramObject;
return localObject;
}
//绑定之后,可通过对应的SelectionKey取出该对象
public final Object attachment() {
return this.attachment;
}
}
- 当客户端连接时,会通过ServletSocketChannel得到对应的StoketChannel
- Selector进行事件监听(select方法),返回有事件发生的通道个数
- 将SocketChannel注册到Selector上,一个Selector可以注册多个Channel(比如:SelectableChannel:register(Selector sel, int ops) 其中ops为事件类型)
- 注册后返回一个SelectionKey,会和该Selector关联(Selector维护一个集合)
- 进一步得到SelectionKey(有事件发生)
- 通过SelectionKey反向获得SocketChannel(比如:SelectionKey:channel())
- 通过得到的Channel配合Buffer完成读写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
public class SelectorTest {
@Test
public void server() throws IOException {
// 1.打开一个服务端通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 2.绑定端口号
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
// 3.通道默认是阻塞的,设置为非阻塞
serverSocketChannel.configureBlocking(false);
// 4.创建一个选择器
// 真实类型
// Windows平台: WindowsSelectorImpl
// Linux平台: EpollSelectorImpl
// Mac平台: KQueueSelectorImpl
Selector selector = Selector.open();
// 5.将通道注册到选择器上, 并且指定监听事件 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 6.轮询式的获取选择器上已经准备就绪的事件
while (true) {
// 7.轮询获取已经就绪的事件
if (selector.select(1000) == 0) {
// 8.如果3秒钟内没有获取到已经就绪的事件, 则打印一个警告信息
System.out.println("没有获取到已经就绪的事件");
continue;
}
// 8.获取已经就绪的事件(可能有多个就绪的事件,所以是集合)
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 9.判断就绪时间类型, 如果是OP_ACCEPT, 则表示有新的客户端连接到来
if (selectionKey.isAcceptable()) {
// 10.获取到客户端通道
SocketChannel socketChannel = serverSocketChannel.accept();
// 11.设置为非阻塞
socketChannel.configureBlocking(false);
// 12.将客户端通道注册到选择器上, 并且指定监听事件 OP_READ, 再添加一个绑定对象
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
// 13.如果是OP_READ, 则表示有客户端数据发送过来
if (selectionKey.isReadable()) {
// 14.通过selectionKey获取到客户端通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
// 15. 获取绑定对象
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
// 16.读取数据
int read = socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
// 17.向客户端发送数据
socketChannel.write(ByteBuffer.wrap("hello client".getBytes(StandardCharsets.UTF_8)));
// 通道一旦关闭就不能被再次打开
socketChannel.close();
}
// 18.清空已处理的事件
iterator.remove();
}
}
}
@Test
public void client() throws IOException, InterruptedException {
// 1.打开一个客户端通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
// 2.设置为非阻塞
socketChannel.configureBlocking(false);
// 3.发送数据
socketChannel.write(ByteBuffer.wrap("hello server".getBytes(StandardCharsets.UTF_8)));
// 4.客户端读取服务端发送过来的数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
System.out.println(new String(byteBuffer.array(), 0, read, StandardCharsets.UTF_8));
}
}
总结
- Channel注册到Selector中必须设置需要关注的事件
- Channel注册到Selector会返回一个SelectionKey,可以通过这个Key反向获取到和客户端连接的Channel
- 一个Selector可以注册多个Channel
- 当触发了注册的事件后,数据准备就绪就会将SelectionKey放到
Selector的selectedKeys
中,可以遍历这个Set去处理就绪事件 - NIO的非阻塞特性需要手动设置
SelectableChannel.configureBlocking(false)
NIO 网络编程
要求
内核角度看NIO
Reactor线程模型
Reactor模式 是一种「事件驱动」模式
「Reactor线程模型」就是通过单个线程使用Java NIO包中的Selector的select()方法,进行监听。当获取到事件(如accept、read等)后,就会分配(dispatch)事件进行相应的事件处理(handle)
如果要给 Reactor线程模型 下一个更明确的定义,应该是
Reactor线程模式 = Reactor(I/O多路复用)+ 线程池
其中Reactor负责监听和分配事件,线程池负责处理事件
单Reactor单线程
说明:
- Select 是前面 I/O 复用模型介绍的标准网络编程API,可以实现应用程序通过一个阻塞对象监听多路连接请求(IO多路复用)
Reactor
对象通过Select
监控客户端请求事件,收到事件后通过Dispatch
进行分发- 如果是建立连接请求事件,则由
Acceptor
通过 accept 处理连接请求,然后创建一个Handler
对象处理连接完成后的后续业务处理- 如果不是建立连接事件,则
Reactor
会分发调用连接对应的Handler
来响应- Handler 会完成
Read→业务处理→Send
的完整业务流程.服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写 等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑
优点:
- 模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:
- 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某 个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
- 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不 可用,不能接收和处理外部消息,造成节点故障
使用场景:
- 客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复 杂度 O(1) 的情况
单Reactor多线程
说明:
- Reactor 对象通过 select 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发
- 如果建立连接请求, 则由 Acceptor 通过 accept 处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件
- 如果不是连接请求,则由 Reactor 分发调用连接对 应的Handler 来处理
- Handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的 Worker 线程池的某个线程处理业务
- Worker 线程池会分配独立线程完成真正的业务, 并将结果返回给 Handler
- Handler 收到响应后,通过 send 将结果返回给 Client
优点:
- 可以充分的利用多核cpu 的处理能力
缺点:
- 多线程数据共享和访问比较复杂,单 Reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
主从Reactor多线程
说明:
- Reactor 主线程 MainReactor 对象通过 select 监听连接事件, 收到事件后,通过 Acceptor 处理连接事件(主 Reactor 只处理连接事件)
- 当 Acceptor 处理连接事件后,MainReactor 将连接分配给 SubReactor
- SubReactor 将连接加入到连接队列进行监听,并创建 Handler 进行各种事件处理
- 当有新事件发生时, SubReactor 就会调用对应的 Handler处理,Handler 通过 read 读取数据,分发给后面的 (Worker 线程池)处理
- (Worker 线程池)分配独立的 (Worker 线程)进行业务处理,并返 回结果
- Handler 收到响应的结果后,再通过 send 将结果返回给 Client
- 一个 MainReactor 可以关联多个 SubReactor
优点:
- 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理
- 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据
缺点:
- 编程复杂度较高
结合实例:
- 这种模型在许多项目中广泛使用,包括
Nginx
主从 Reactor 多进程模型,Memcached
主从多线程,Netty
主从多线程模型的支持
小结
- 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服
- 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
- 主从 Reactor 多线程,多个前台接待员,多个服务员
Reactor 模式具有如下的优点:
- 响应快,不必为单个同步事件所阻塞,虽然 Reactor 本身依然是同步的
- 可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程 的切换开销
- 扩展性好,可以方便的通过增加 Reactor 实例个数来充分利用 CPU 资源
- 复用性好,Reactor 模型本身与具体事件处理逻辑无关,具有很高的复用性
Netty模型
简单版
- BossGroup线程维护Selector, 只关注Accecpt(只处理连接事件)
- 当接收到Accept事件,获取到对应的 SocketChannel, 封装成 NIOScoketChannel并注册到 Worker 线程(事件循环), 并进行维护
- 当Worker线程监听到 Selector 中通道发生自己感兴趣的事件后(监听事件),就进行处理(就由 Handler 处理), 注意 Handler 已经加入到通道
进阶版
- Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor
- 每个Reactor都可以看作是一个NIOEventLoop
最终版
- Netty 抽象出两组线程池 ,BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
- BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup (NIO事件循环组)
- NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoop
- NioEventLoop 表示一个不断循环的执行处理任务的线程(selector监听绑定事件是否发生,因为是非阻塞的所以需要不断循环),每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯,比如NioServerSocketChannel绑定在服务器boosgroup的NioEventLoop的selector上,NioSocketChannel绑定在客户端的NioEventLoop的selector上,然后各自的selector就不断循环监听相关事件
- NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
- 每个 BossGroup下面的NioEventLoop 循环执行的步骤有 3 步
- 轮询 accept 事件
- 处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 workerGroup NIOEventLoop 上的 Selector
- 继续处理任务队列的任务,即 runAllTasks(同一时间可能会有多个连接,就绪的任务就放在TaskQueue中)
- 每个 WorkerGroup NIOEventLoop 循环执行的步骤
- 轮询 read,write 事件
- 处理 I/O 事件,即 read,write 事件,在对应 NioScocketChannel 处理
- 处理任务队列的任务,即 runAllTasks
- 每个 Worker NIOEventLoop 处理业务时,会使用
pipeline(管道)
,pipeline 中包含了 channel(通道),即通过 pipeline 可以获取到对应通道,管道中维护了很多的Handler- NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责
- NioEventLoopGroup 下包含多个 NioEventLoop
- 每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
- 每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
- 每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
- 每个 NioChannel 都绑定有一个自己的 ChannelPipeline
- NioChannel可以获取对应的ChannelPipeline,ChannelPipeline也可以获取对应的NioChannel
ChannelPipeline 管道:比如水管,水管是由多个组件组成的,比如,弯头,开关,变径等。这些组件就好比是Handler, 当水(NioChannel)流过管道(ChannelPipeline)就会经过经过管道的组件,水就会被管道组件(Handler)所处理
Netty TCP服务例子
使用Netty开发,当客户端连接上服务端后发送消息,服务端接收消息后相应客户端
1 |
|
1 |
|
1 |
|
1 |
|
自定义任务
- 当需要执行比较耗时的任务时,为了避免长时间阻塞
EventLoop
,可以将任务提交给taskQueue
,由任务队列异步执行用户自定义普通任务,修改
NettyServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 添加耗时长的自定义任务, 异步执行10秒, 如果添加多个则串行执行
ctx.channel().eventLoop().execute(() -> {
try {
// 假设任务需要执行10
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("自定义任务响应", CharsetUtil.UTF_8));
});
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
System.out.println(Thread.currentThread().getName());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client !!!", CharsetUtil.UTF_8));
}
启动服务端和客户端后
服务端输出:
客户端发送消息是:hello server !!
客户端地址:/127.0.0.1:57026
nioEventLoopGroup-3-1
nioEventLoopGroup-3-1
客户端输出:
服务端回复的消息是:hello client !!!
服务器地址是:localhost/127.0.0.1:6666
服务端回复的消息是:自定义任务响应
服务器地址是:localhost/127.0.0.1:6666由此可见,用户自定义的任务不会阻塞正常的流程,并且
taskQueue
执行的线程是EventLoop
这个线程
定时任务
定时任务,按照指定时间间隔触发,异步执行。任务是提交到
scheduledTaskQueue
用户定义定时任务,修改
NettyServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 添加耗时长的自定义任务, 异步执行
ctx.channel().eventLoop().execute(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info(Thread.currentThread().getName());
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("自定义任务响应", CharsetUtil.UTF_8));
});
// 添加定时任务, 10秒执行一次
ctx.channel().eventLoop().schedule(() -> {
log.info(Thread.currentThread().getName());
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("定时任务响应", CharsetUtil.UTF_8));
}, 10, TimeUnit.SECONDS);
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
log.info(Thread.currentThread().getName());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client !!!", CharsetUtil.UTF_8));
}
启动服务端和客户端后
服务端输出:
客户端发送消息是:hello server !!
客户端地址:/127.0.0.1:58964
19:17:19.889 [nioEventLoopGroup-3-1] INFO com.wgf.tcp.NettyServerHandler - nioEventLoopGroup-3-1
19:17:29.912 [nioEventLoopGroup-3-1] INFO com.wgf.tcp.NettyServerHandler - nioEventLoopGroup-3-1
19:17:29.912 [nioEventLoopGroup-3-1] INFO com.wgf.tcp.NettyServerHandler - nioEventLoopGroup-3-1
客户端输出:
19:17:19.905 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:hello client !!!
19:17:19.905 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
19:17:29.912 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:自定义任务响应
19:17:29.912 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
19:17:29.912 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:定时任务响应
19:17:29.912 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
scheduledTaskQueue
的执行线程是EventLoop
这个线程
非Reachor线程调用Channel
例如在推送系统的业务线程里面,根据用户的标识,找到对应用户的Channel引用,然后调用Channel的writer方法向用户推送消息
用户定义定时任务,修改
NettyServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// channel 用户列表
public static final Set<Channel> channelSet = new HashSet<>();
// 构造器执行定时任务
public NettyServerHandler() {
// 模拟推送业务
pushMsg();
}
//读取数据实际(这里我们可以读取客户端发送的消息)
/*
1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客户端发送的数据 默认Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的关系");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
//将 msg 转成一个 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + channel.remoteAddress());
}
// 数据读取完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
channelSet.add(ctx.channel());
//writeAndFlush 是 write + flush
//将数据写入到缓存,并刷新
//一般讲,我们对这个发送的数据进行编码
log.info(Thread.currentThread().getName());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client !!!", CharsetUtil.UTF_8));
}
// 定时向客户端推送在线人数
private void pushMsg() {
// 非Reactor线程
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int count = channelSet.size();
if (count > 0) {
Iterator<Channel> iterator = channelSet.iterator();
while (iterator.hasNext()) {
Channel next = iterator.next();
if (!next.isActive()) {
iterator.remove();
} else {
next.pipeline().writeAndFlush(Unpooled.copiedBuffer("当前人数:" + count, CharsetUtil.UTF_8));
}
}
}
}, 5, 5, TimeUnit.SECONDS);
}
//处理异常, 一般是需要关闭通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端输出:
19:49:46.322 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:hello client !!!
19:49:46.322 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
19:49:51.287 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:当前人数:1
19:49:51.287 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
19:49:56.275 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务端回复的消息是:当前人数:1
19:49:56.275 [nioEventLoopGroup-2-1] INFO com.wgf.tcp.NettyClientHandler - 服务器地址是:localhost/127.0.0.1:6666
Netty源码剖析
客户端Bootstrap启动过程
Netty 中 Bootstrap 类是客户端程序的启动引导类
使用官方Example EchoClient, 下面的源码剖析都根据这个Example进行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
} else {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
Bootstrap启动流程
EventLoopGroup的初始化
客户端源码在一开始就实例化了 一个 NioEventLoopGroup 对象因此就从它的构造器中追踪 EventLoop 的初始化过程。首先来看 NioEventLoopGroup 的类继承层次结构
NioEventLoopGroup 有几个重载的构造器,不过内容都没有太大的区别,最终都调用父类 MultithreadEventLoopGroup 的构造器
1 |
|
参数
nThreads
就是 NioEventLoopGroup 线程数量,其实就是 EventLoop 的数量,如果传入的参数为0则额外特殊处理, 在静态代码块中获取当前机器的CPU核心数量 * 2
1 |
|
Netty首先从系统属性中获取“io.netty.eventLoopThreads”的值,如果我们没有设置,就返回默认值,即
CPU核数 × 2
。回到 MultithreadEventLoopGroup 构造器中会继续调用父类MultithreadEventExecutorGroup 的构造器
1 |
|
EventLoop的创建过程
newChild(executor, args)
这是一个抽象方法,在 MultithreadEventLoopGroup 的构造函数中被调用,通过循环来创建EventLoop ,它的任务是实例化 EventLoop 对象。我们跟踪一下它的代码,可以发现,这个方法在 NioEventLoopGroup 类中有实现
1 |
|
1 |
|
EventExecutorChooser的创建过程
事件执行器选择器,用于在Channel注册时提供选择 EventLoop 功能
chooserFactory.newChooser(children);
在 MultithreadEventLoopGroup 的构造函数中被调用
DefaultEventExecutorChooserFactory
1 |
|
如果nThreads是2的平方,则使用 PowerOfTwoEventExecutorChooser , 否则使用GenericEventExecutorChooser。这两个Chooser都重写next()方法。next() 方法的主要功能就是将数组索引循环位移,如下图所示
当索引移动到最后一个位置时,再调用next()方法就会将索引位 置重新指向0,如下图所示
选择器之所有两个内部类,是因为计算机底层的 & 运算比 % 运算效率高,Netty会根据nThread是否为2次幂动态选择,属于一个优化手段,最终实现的都是一个 轮询算法
总结
- NioEventLoopGroup 底层其实是 MultithreadEventExecutorGroup 内部维护好了一个 EventExecutor 的children数组 ,其大小是nThreads,这样就构成了一 个线程池
- 实例化 NioEventLoopGroup 时,如果指定线程池大小,则nThreads就是指定的值,反之是CPU核数×2
- 在 MultithreadEventExecutorGroup 中调用 newChild() 象方法来初始化children数组
- newChild() 方法是在 NioEventLoopGroup 中实现的,它返回一个 NioEventLoop 实例
- 初始化NioEventLoop对象并给属性赋值,具体赋值的属性如下
- provider : 就是在 NioEventLoopGroup 构造器中 , 调用 **SelectorProvider.provider()**方法获取的 SelectorProvider (java spi) 对象
- selector : 就是在 NioEventLoop 构造器中 , 调用 provider.openSelector() (java nio) 方法获取的
Selector
对象
sequenceDiagram EventLoopGroup ->> MultithreadEventLoopGroup : 构造函数调用,nThreads取值:参数指定,构造函数指定,或cpu核心数 * 2 MultithreadEventLoopGroup ->> MultithreadEventExecutorGroup : 构造函数调用,确认EventLoop数量 MultithreadEventExecutorGroup ->> NioEventLoopGroup : 循环调用 newChild() 创建EventLoo NioEventLoopGroup ->> NioEventLoop : 构造函数 -> JCTolls 创建任务队列,openSelector(),默认创建优化的Selector MultithreadEventExecutorGroup ->> DefaultEventExecutorChooserFactory : newChooser(), 创建EventLoop选择器 MultithreadEventExecutorGroup ->> MultithreadEventExecutorGroup : 添加EventLoop终止监听 MultithreadEventExecutorGroup ->> MultithreadEventExecutorGroup : 将EventLoop数组转为只读Set
Channel
在Netty中,Channel相当于一个Socket的抽象,它为用户提供了关于Socket状态(是连接还是断开)及对Socket的读、写等操作。每当Netty建立了一个连接,都创建一个与其对应的Channel实例
除了TCP,Netty还支持很多其他的连接协议,并且每种协议还有NIO(非阻塞I/O)和OIO(Old-I/O,即传统的阻塞I/O)版本的区别,不同协议不同阻塞类型的连接都有不同的Channel类型与之对应
通道类型 说明 NioSocketChannel 异步的客户端 TCP Socket 连接 NioServerSocketChannel 异步的服务器端 TCP Socket 连接 NioDatagramChannel 异步的 UDP 连接 NioSctpChannel 异步的客户端 Sctp 连接 NioSctpServerChannel 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO OioSocketChannel 同步的客户端 TCP Socket 连接 OioServerSocketChannel 同步的服务器端 TCP Socket 连接 OioDatagramChannel 同步的 UDP 连接 OioSctpChannel 同步的 Sctp 服务器端连接 OioSctpServerChannel 同步的客户端 TCP Socket 连接
Channel的创建过程
客户端连接代码的初始化Bootstrap中,该方法调用了一个channel()方法,传入的参数是NioSocketChannel.class,在这个方法中其实就是初始化了一ReflectiveChannelFactory的对象,代码实现如下 :
1
2
3
4
5
6
7
8
9
10
11
/**
* 设置服务端Channel类型,返回ReflectiveChannelFactory 它是一个反射工厂, 通过反射创建Channel对象
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}而 ReflectiveChannelFactory 实现了 ChannelFactory 接口,它提供了唯一的方法,即newChannel()方法。顾名思义,ChannelFactory 就是创建 Channel 的工厂类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 反射Channel工厂
* A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively.
*/
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
// Channel 的构造函数
private final Constructor<? extends T> constructor;
/**
* 构造函数,根据传递进来的Channel类型获取其无参构造函数
*/
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
/**
* 通过无参构造函数调用Java反射Api生成一个新的Channel,在有客户端建立新的连接时使用
* @return
*/
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
...
}结论:
- Bootstrap 中的 ChannelFactory 实现类是 ReflectiveChannelFactory
- 通过 channel() 方法创建的 Channel 具体类型是 NioSocketChannel
- Channel 的实例化过程其实就是调用 ChannelFactory 的newChannel() 方 法 , 而实例化的 Channel 具体类型又和初始化Bootstrap时传入的channel()方法的参数相关。因此对于客户端的 Bootstrap 而言,创建的 Channel 实例就是 NioSocketChannel
sequenceDiagram AbstractBootstrap ->> AbstractBootstrap : channel() -> channelFactory() 创建ChannelFactory AbstractBootstrap ->> ReflectiveChannelFactory : 创建一个通过反射创建Channel的ChannelFactory
NioSocketChannel的创建
1
2
3
4
5
6
7
...
// 指定Channel类型
.channel(NioSocketChannel.class)
...
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
...
- 分析 Bootstrap 类的 connect() 方法调用链路图可知,当客户端发起连接操作的时候在 AbstractBootstrap 的 initAndRegister() 方 法 中 , 调用
ChannelFactory 的newChannel()
方法来创建一个 NioSocketChannel 的实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
...
// 初始化和注册Channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通过工厂生成Channel, 工厂在配置引导类调用 .channel(channelClass) 方法时被初始化
channel = channelFactory.newChannel();
// 本类的抽象方法,客户端由 Bootstrap 类实现,服务端由 ServerBootStrap 类实现
init(channel);
} catch (Throwable t) {
if (channel != null) {
// 异常强制关闭 channel, Netty里,channel就是对Socket的抽象封装
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 异步地向EventLoopGroup 注册当前的channel
ChannelFuture regFuture = config().group().register(channel);
// 处理重复注册和异常
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
...
客户端Channel的初始化
NioSocketChannel体系
- 在 ReflectiveChannelFactory. newChannel() 方法中,利用反射机制调用类对象的
newInstance()
方法来创建一个新的Channel实例,相当于调用NioSocketChannel的默认构造器 (参考 ReflectiveChannelFactory 工厂的实现)- 利用反射机制调用类对象的
newInstance()
方法来创建一个新的Channel实例,相当于调用 NioSocketChannel 的默认构造器。NioSocketChannel的默认构造器代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioSocketChannel.class);
// JDK的 Selector提供者,JDK NIO 模块的 SPI机制, 主要兼容不同的平台 mac, win, linux
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
// 通过java SPI 机制获取不同平台上打开Socket的实现
private static final Method OPEN_SOCKET_CHANNEL_WITH_FAMILY =
SelectorProviderUtil.findOpenMethod("openSocketChannel");
// java nio Channel的相关配置,NioSocketChannel 持有 java nio Channel的相关配置这点可以体现Netty 对 java nio Channel 的高度封装
private final SocketChannelConfig config;
// 2.创建一个新的Channel, 构造函数会调用这个方法
private static SocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
try {
// 通过反射调用 不同平台实现的 openSocketChannel 方法获取一个 SocketChannel
SocketChannel channel = SelectorProviderUtil.newChannel(OPEN_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
// 根据 SPI 打开一个新的 java Nio Channel
return channel == null ? provider.openSocketChannel() : channel;
} catch (IOException e) {
throw new ChannelException("Failed to open a socket.", e);
}
}
/**
* 无参构造函数
* ReflectiveChannelFactory 工厂使用这个反射生成Channel
* Create a new instance
*/
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
/**
* 支持自定义 Selector提供者
* Create a new instance using the given {@link SelectorProvider}.
*/
public NioSocketChannel(SelectorProvider provider) {
this(provider, null);
}
/**
* 构造器
* Create a new instance using the given {@link SelectorProvider} and protocol family (supported only since JDK 15).
*/
public NioSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
this(newChannel(provider, family));
}
- 在这个构造器中首先会调用 newSocket() 方法来打开一个新的Java NIO的 SocketChannel 对象 (步骤 2)
1
2
3
4
5
6
7
8
9
10
11
/**
* 构造函数,第一个channel 暂时为空,第二个ch 为 java nio SocketChannel
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
*/
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 调用父类 AbstractNioChannel 的构造器,并传入一个读操作标识
super(parent, ch, SelectionKey.OP_READ);
}
最后会调用其父类的构造器,即AbstractNioByteChannel的构造器 (步骤 4),传入参数,parent的值默认为null,ch为之前调用
newSocket()方法创建的Java NIO 的 SocketChannel 对象,因此新创建的 NioSocketChannel 对象中的parent暂时是null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
* @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
*/
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// AbstractChannel 父类的构造函数
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 设置 java nio channel 为非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
- 接着会调用父类AbstractNioChannel的构造器,并传入实际参数readInterestOp=SelectionKey.OP_READ
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// AbstractChannel 父类的构造函数
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
// 设置 java nio channel 为非阻塞的
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
- 最后会调用父类 AbstractChannel 的构造器
1
2
3
4
5
6
7
8
9
protected AbstractChannel(Channel parent) {
this.parent = parent;
// 创建一个 DefaultChannelId,为Channel 绑定一个 ShortId 不唯一的值, 只有LongId才不会重复,通过mac 地址等基础数据计算
id = newId();
// Unsafe 是 Channel 的内部类,只允许IO线程调用,真正用于数据传输操作
unsafe = newUnsafe();
// 为当前的 Channel 创建一个 DefaultChannelPipeline 管道,所以 Channel 和 Pipeline 是一对一关系
pipeline = newChannelPipeline();
}至此,NioSocketChannel就完成了初始化, 注意:创建了一个Nio 的 Channel, 此时上面的parent 还是 null
总结:
调用 NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) ->
SelectorProvider.openSocketChannel()
打开一个新的Java Nio SocketChannel
向上不断调用其父类的构造函数直到初始化 AbstractChannel(Channel parent) 对象并给属性赋值,具体赋值的属性如下
- id:每个Channel都会被分配一个id
- parent:属性值默认为null
- unsafe:通过调用newUnsafe()方法实例化一个Unsafe对象,它的类型是 AbstractNioByteChannel.NioByteUnsafe内部类,负责真正的I/O读写
- pipeline:是通过调用new DefaultChannelPipeline(this)新创建的实例
- AbstractNioByteChannel 中被赋值的属性如下
- ch:被赋值为Java原生SocketChannel,即NioSocketChannel的newSocket()方法返回的Java NIO SocketChannel
- readInterestOp:被赋值为SelectionKey.OP_READ
- ch:被配置为非阻塞,即调用 ch.configureBlocking(false) 方法
- NioSocketChannel 中被赋值的属性 : config = newNioSocketChannelConfig(this,socket.socket())
sequenceDiagram Bootstrap ->> Bootstrap : connect() -> doResolveAndConnect() 客户端连接 Bootstrap ->> AbstractBootstrap : initAndRegister() 初始化和注册Channel AbstractBootstrap ->> ReflectiveChannelFactory : .newChannel() 通过工厂创建Channel ReflectiveChannelFactory ->> NioSocketChannel : 反射创建 NioSocketChannel NioSocketChannel ->> SelectorProvider : SPI机制调用 openSocketChannel SelectorProvider ->> SocketChannel : 获得nio原生Channel NioSocketChannel ->> AbstractNioChannel : Channel设置非阻塞 AbstractNioChannel ->> AbstractChannel : newId() newUnsafe() newChannelPipeline()
Unsafe内部类
注意:这里的
Unsafe
并不是我们常说的Java自带的sun.misc.Unsafe
,而是 io.netty.channel.Channel#Unsafe。按源码注释上说法是 “Unsafe函数不允许被用户代码使用,这些函数是真正用于数据传输操作,必须被IO线程调用” ,也就是说真正依赖于底层协议/方案的实现是通过Unsafe包装出去的 在实例化 NioSocketChannel 的 过 程 中 , Unsafe就特别关键 。 Unsafe 其实是对Java底层Socket操作的封装,因此,它实际上是沟通 Netty上层和Java底层的重要桥梁。下面我们看一下Unsafe接口所提供的方法
Unsafe
的初始化时机在 AbstractChannel 的构造函数中调用了newUnsafe()
1 |
|
从上述源码中可以看出,这些方法其实都是与Java底层的相关Socket的操作相对应的
AbstractChannel$AbstractUnsafe 基本抽象实现
AbstractChannel 的构造方法中 , 在这里调用了 newUnsafe()方法获取一个新的Unsafe对象,而newUnsafe()方法在 NioSocketChannel中被重写了
1 |
|
1 |
|
NioSocketChannel 的 newUnsafe() 方 法 会 返 回 一 个 NioSocketChannelUnsafe实例。从这里我们就可以确定,在实例化的 NioSocketChannel中的 Unsafe 属性其实是一 个 NioSocketChannelUnsafe的实例
ChannelPipeline的初始化
1 |
|
在创建完 AbstractChannel 构造函数中可以看到,在创建完 Unsafe 后紧接着又调用 newChannelPipeline() 创建 ChannelPipeline
在 ChannelPipeline 的注释说明中写道 “Each channel has its own pipeline and it is created automatically when a new channel is created” 。我们知道,在实例化一个Channel时,必然都要实例化一个 ChannelPipeline 。而我们确实在 AbstractChannel 的构造器中看到了ChannelPipeline 属性被初始化为DefaultChannelPipeline 的实例
1 |
|
AbstractChannel.newChannelPipeline() 方法实现
1 |
|
DefaultChannelPipeline
1 |
|
DefaultChannelPipeline 的构造器需要传入一个 Channel,而这个 Channel 其实就是我们实例化的NioSocketChannel 对象,DefaultChannelPipeline 会将这个NioSocketChannel 对象保存在 Channel 属性中。DefaultChannelPipeline 中还有两个特殊的属性,即Head和Tail,这两个属性是双向链表的头和尾。其实在DefaultChannelPipeline 中维护了一个以AbstractChannelHandlerContext为节点元素的双向链表,这个链表是Netty实现Pipeline机制的关键
HandlerContext
HeadContext 体系
TailContext 体系
1 |
|
1 |
|
获取出站和入站的HandleConetxt方法
1 |
|
sequenceDiagram AbstractChannel ->> AbstractChannel : 构造函数 -> newChannelPipeline() AbstractChannel ->> DefaultChannelPipeline : 创建默认的 ChannelPipeline, 传递Channel DefaultChannelPipeline ->> HeadContext : 创建Pipeline 链表头 DefaultChannelPipeline ->> TailContext : 创建Pipeline 链表尾
将Channel注册到Selector
Bootstrap 最后会调用
connect(String inetHost, int inetPort)
方法连接服务端,最终会调用 Bootstrap 的doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress)
方法,Channel (java nio Channel) 会在 Bootstrap 的initAndRegister()
中进行初始化,但是这个方法还会将初始化好的 Channe (java nio Channel) 注册到 NioEventLoop 的 Selector 中
1 |
|
1 |
|
当Channel初始化后,紧接着会调用
config().group().register(channel)
方法来向 Selector 注册 Channel通过跟踪调用链 , 我们最终发现在 AbstractBootstrap 的
initAndRegister()
方法中调用的是Unsafe
的register()
方法,接下来看一下AbstractChannel$AbstractUnsafe.register()
方法的具体实现代码
sequenceDiagram Bootstrap ->> AbstractBootstrap : initAndRegister() AbstractBootstrap ->> MultithreadEventLoopGroup : register() MultithreadEventLoopGroup ->> SingleThreadEventLoop : register() SingleThreadEventLoop ->> AbstractChannel$AbstractUnsafe : register()
通过跟踪调用链,我们最终发现在 AbstractBootstrap 的
initAndRegister()
方法中调用的是Unsafe的register()方法,接下来看一下 AbstractChannel$AbstractUnsafe.register()方法的具体实现代码
AbstractChannel$AbstractUnsafe
1 |
|
首先,将 EventLoop 赋值给 AbstractChannel 的
eventLoop
属性,我们知道 EventLoop 对象其实是通过 MultithreadEventLoopGroup的next() 方法
获取的,根据前面的分析,可以确定next()方法返回的EventLoop对象是 NioEventLoop 实例。register()方法接着调用了register0()方法,代码如下
AbstractChannel$AbstractUnsafe
1 |
|
register0()
方法又调用了 AbstractNioChannel的doRegister()方法,代码如下
1 |
|
看到
javaChannel()
这个方法,我们在前面已经知道了,它返回的是一个Java NIO的SocketChannel对象,这里我们将SocketChannel注册到与eventLoop关联的Selector
上
总结
- 在客户端引导类 Bootstrap 的连接操作方法
connect
中,最终户i调用到 AbstractBootstrap 的initAndRegister()方法- 在 AbstractBootstrap 的
initAndRegister()
方法中,通过group().register(channel) 调用 MultithreadEventLoopGroup 的register()
方法- 在 MultithreadEventLoopGroup 的
register()
方法中,调用next()
(通过前面的 EventExecutorChooser 从EventloopGroup选择一个EventGroup) 方法获取一个可用的 SingleThreadEventLoop,然后调用它的register()
方法- 在 SingleThreadEventLoop 的 register() 方法中 , 调用
channel.unsafe().register(this , promise)
方法获取 AbstractChannel 的 unsafe() 底层操作对象,然后调用 AbstractUnsafe 的register()方法- 在 AbstractUnsafe 的register()方法中,调用register0() 方法注册Channel对象
- 在 AbstractUnsafe 的 register0() 方法中 , 调用 AbstractNioChannel 的doRegister()方法
- AbstractNioChannel 的 doRegister() 方法通过 javaChannel().register(eventLoop().selector , 0 , this) 将Channel 对应的 Java NIO 的 SocketChannel 注册到一个 EventLoop 的Selector中,并且将当前Channel作为Attachment与SocketChannel关联
总的来说,Channel的注册过程所做的工作就是将Channel与对应的EventLoop进行关联。因此,在Netty中,每个Channel都会关联一个特定的EventLoop,并且这个Channel中的所有I/O操作都是在这个EventLoop中执行的;当关联好Channel和EventLoop后,会继续调用底层Java NIO的SocketChannel对象的register()方法,将底层Java NIO的SocketChannel注册到指定的Selector中。通过这两步,就完成了Netty对Channel的注册过程
sequenceDiagram Bootstrap ->> Channel : 1.connetc() 通过Factory创建一个Channel Bootstrap ->> EventLoopGroup : 2.group() 通过配置得到 EventLoopGroup EventLoopGroup ->> EventLoop : 3.next() 得到EventLoop EventLoop ->> SingleThreadEventLoop : 4 register(Channel) SingleThreadEventLoop ->> AbstractNioUnsafe : 5. doRegister() 通过Channel获得Unsafe AbstractNioUnsafe --> AbstractNioUnsafe : 6. 获取EventLoop 对应的Selector AbstractNioUnsafe --> AbstractNioUnsafe : 7 javaChannel().register(selector)
Handler的添加过程
Netty有一个强大和灵活之处就是基于Pipeline的自定义Handler机制。基于此,我们可以像添加插件一样自由组合各种各样的Handler来完成业务逻辑。 例如我们需要处理 HTTP 数据 , 那么就可以在Pipeline前添加一个针对HTTP编解码的Handler,然后添加我们自己的业务逻辑的Handler,这样网络上的数据流就像通过一个管道一样,从不同的Handler中流过并进行编解码,最终到达我们自定义的Handler中。
在Bootstrap引导类中就有关于Handler的配置,
handler(ChannelHandler handler)
方法,可供用户自定义将 Handler 添加到 ChannelPipeline 中, 调用时机在 Channel 被注册到 EventLoop 后添加 ChannelHandler
调用时机
1
2
3
4
5
6
7
8
9
10
private void register0(ChannelPromise promise) {
try {
....
// 注册Channel到EventLoop
doRegister();
....
// 如果在注册前有处理器添加,还没进行HandlerAdded回调,注册成功后要回调,这里会添加Bootstrap配置的ChannelHandler
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
初始化
1 |
|
这个代码片段就实现了Handler的添加功能。我们看到,Bootstrap的handler()方法接收一个 ChannelHandler,而我们传入的参数是一个派生于抽象类 ChannelInitializer 的匿名类,它也实现了 ChannelHandler 接口。我们来看 ChannelInitializer 类中到底有什么玄机,代码如下
1 |
|
ChannelInitializer 是一个抽象类 , 它有一个抽象的
initChannel()
方法,我们看到的匿名类正是实现了这个方法,并在这个方法中添加了自定义的Handler。那么initChannel()
方法是在哪里被调用的呢?其实是在 ChannelInitializer 的channelRegistered()
方法中 接下来关注一下
channelRegistered()
方法。我们从上面的代码中可以看到,在channelRegistered()
方法中,会调用initChannel()
方法 , 将自定 义的Handler
添加到 ChannelPipeline 中 , 然后调用ctx.pipeline().remove(this)
方法将自己从 ChannelPipeline 中删除 一开始, ChannelPipeline 中只有三个Handler,分别是Head、Tail和我们添加的 ChannelInitializer
接着调用initChannel()方法,添加自定义的Handler,如下图所示
最后将 ChannelInitializer 删掉,它的作用只是在 Channel 进行注册的时候对自定义的 ChannelHandler 添加到 ChannelPipeline
sequenceDiagram AbstractUnsafe ->> AbstractUnsafe : register0() 注册Channel AbstractUnsafe ->> ChannelInitializer : pipeline.invokeHandlerAddedIfNeeded() 添加Handler ..-> handlerAdded() ChannelInitializer ->> 子类实现 : 调用 initChannel方法实现 子类实现 ->> ChannelPipeline : 添加 Handler ChannelInitializer ->> ChannelInitializer : removeState(ctx) 方法,将自身在ChannelPipeline中删除
客户端发起连接请求
客户端通过调用 Bootstrap 的
connect()
方法进行连接 。 在connect()
方法中进行一些参数检查,并调用doConnect()
方法,其代码实现如下
1 |
|
在
doConnect()
方法中 , eventLoop 线程会调用Channel
的connect()
法 , 而这个 Channel 的具体类型实际就是NioSocketChannel,前面已经分析过。继续跟踪channel.connect()
方法,我们发现它调用的是 DefaultChannelPipeline 的connect()
方法,Pipeline的connect()方法的代码如下
1 |
|
我们已经分析过,Tail 是一个 TailContext 的实例,而 TailContext 又是 AbstractChannelHandlerContext 的子类,并且没有实现
connect()
方法,因此这里调用的其实是AbstractChannelHandlerContext的connect()
方法,我们看一下这个方法的实现代码
1 |
|
上面代码片段中有一句非常关键的代码,即 final AbstractChannelHandlerContextnext=findContextOutbound(),这里调用
findContextOutbound()
方法,从 DefaultChannelPipeline 内的双向链表的Tail开始,不断向前找到第一个处理Outbound的 AbstractChannelHandlerContext,然后调用它的invokeConnect()方法,代码如下
1 |
|
1 |
|
在 DefaultChannelPipeline 的构造器中,实例化了两个对象:Head 和 Tail,并形成了双向链表的头和尾。Head 是 HeadContext 的实例,它实现了ChannelOutboundHandler 接口。因此在
findContextOutbound()
方法中,找到的 AbstractChannelHandlerContext 对象其实就是Head , 进而在invokeConnect()
方法中,我们向上转换为ChannelOutboundHandler
就没问题了。而又因为 HeadContext 重写了connect()方法
,所以实际上调用的是HeadContext 的 connect() 方法
。 接着跟踪HeadContext 的connect()方法
1 |
|
这个
connect()
方法很简单,只是调用了Unsafe的connect()方法。回顾一下 HeadContext 的构造器,我们发现这个Unsafe其实就是pipeline.channel().unsafe() 返回的Channel
的Unsafe
属性。到这里为止,我们应该已经知道,其实是 AbstractNioByteChannel.NioByteUnsafe 内部类转了一大圈。最后,我们找到创建 Socket 连接的关键代码继续跟踪,其实调用的就是AbstractNioUnsafe的connect()方法
1 |
|
在这个connect()方法中,又调用了doConnect()方法。注意:这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel的抽象方法。doConnect()方法是在NioSocketChannel中实现的,因此进入NioSocketChannel的doConnect()方法,代码如下
1 |
|
上面代码的功能是,首先获取Java NIO的SocketChannel,然后获取NioSocketChannel的newSocket()方法返回的SocketChannel对象;再调SocketChannel的 connect() 方法完成Java NIO底层的Socket连接
服务端
BossGroup和WorkerGroup
- Netty中的
BossGroup
和WorkerGroup
的实际类型是NioEventLoopGroup
,通过类图能够发现是通过JUC线程池接口扩展而来的- 线程池线程的个数(NioEventLoop个数)如果在构造函数不指定的话,默认是CPU核心数的2倍
- NioEventLoopGroup 就是 NioEventLoop 组,负责管理 NioEventLoop,当有 Channel 需要注册的时候,NioEventLoopGroup 会轮询找到下一个 NioEventLoop 注册上去。在NioEventLoopGroup 上作出的配置最终都会作用到 NioEventLoop 上
源码解析
1 |
|
先看启动类:main 方法中,首先创建了关于 SSL 的配置类
重点分析下 创建了两个 EventLoopGroup 对象:
1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
- 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作
- EventLoopGroup 是事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)
在
NioEventLoopGroup
的父类MultithreadEventExecutorGroup
类中包含属性private final EventExecutor[] children
,NioEventLoop
类是接口EventExecutor
的实现类之一
- new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数 * 2, 即可以充分的利用多核的优势
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
private static final int DEFAULT_EVENT_LOOP_THREADS;
/**
* NettyRuntime.availableProcessors() 会拿到当前计算机的Cpu的核心数
* 最终结果是 Cpu核心数 * 2
*/
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* 跟踪 new NioEventLoopGroup() 源码
* 在类 MultithreadEventLoopGroup 源码的构造方法中,如果传入的 nThreads 为0 (new EventLoopGroup() 构造函数传入)
* 则使用默认值 DEFAULT_EVENT_LOOP_THREADS
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
...
}
- NioEventLoopGroup初始化过程
1 |
|
ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始 ,主要作用是配置整个 Netty 程序,串联各个组件, ServerBootstrap是服务端启动引导类
常见的方法有:
方法 说明 public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) 该方法用于服务器端,用来设置两个 EventLoopGroup public B channel(Class<? extends C> channelClass) 该方法用来设置一个服务器端的通道实现 public ChannelFuture bind(int inetPort) 该方法用于服务器端,用来设置占用的端口号 public B option(ChannelOption option, T value) 用来给 ServerChannel 添加配置 public B handler(ChannelHandler handler) 用于给bossGroup设置业务处理类 public B group(EventLoopGroup group) 该方法用于客户端,用来设置一个 EventLoopGroup public ChannelFuture connect(String inetHost, int inetPort) 该方法用于客户端,用来连接服务器端 public ServerBootstrap childOption(ChannelOption childOption, T value) 用来给接收到的通道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler) 该方法用来设置业务处理类 (workerGroup 自定义的 handler)
- 服务端的启动类,扩展于NIO的
Channel
接口- 用于Netty的启动配置,如通道类型,Handler等配置
BossGroup
的配置在ServerBootstrap
实例中,WorkerGroup
的配置信息在AbstractBootstrap
中
源码解析
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(5);
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
// 指定channel类型,通过反射创建
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
- 变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
// The order in which child ChannelOptions are applied is important they may depend on each other for validation
// purposes.
// Channel 的配置属性项
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
// AttributeMap 的key, AttributeMap 是 Channel 接口的顶级接口,每个Channel 必定是个 AttributeMap
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
// ServerBootstrap 配置类,主要提供获取 BossGroup 和 WorkerGroup配置能力
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
// WorkerGroup
private volatile EventLoopGroup childGroup;
// WorkerGroup 对应的handler, 用于处理 initChannel 事件。当Channel在注册WorkerGroup时触发
private volatile ChannelHandler childHandler;
...
/**
* 设置BossGroup 和 WorkerGroup
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//调用父类方法 BossGroup 的相关属性都放在父类 AbstractBootstrap里
super.group(parentGroup);
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
// 将WorkerGroup赋值到当前对象 WorkerGroup相关配置都在 ServerBootstrap
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
...
}.channel(NioServerSocketChannel.class) 用于指定所使用的Channel类型,并会创建一个Channel工厂用于反射生成Channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 设置服务端Channel类型,返回ReflectiveChannelFactory 它是一个反射工厂, 通过反射创建Channel对象
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
/**
* 设置Channel工厂
* @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
*/
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}ReflectiveChannelFactory 是如何通过反射创建Channel的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 反射Channel工厂
* A {@link ChannelFactory} that instantiates a new {@link Channel} by invoking its default constructor reflectively.
*/
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
// Channel 的构造函数
private final Constructor<? extends T> constructor;
/**
* 构造函数,根据传递进来的Channel类型获取其无参构造函数
*/
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
/**
* 通过无参构造函数调用Java反射Api生成一个新的Channel,在有客户端建立新的连接时使用
* @return
*/
@Override
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
...
}
- .option(ChannelOption.SO_BACKLOG, 100) 设置BossGroup配置属性项
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 设置BossGroup 的配置属性项
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
* created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
*/
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
synchronized (options) {
// options 是个map
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
- .handler(…) .childHandler(…)
分别为AbstractBootstrap#handler 和 ServerBootstrap#childHandler 赋值
- ServerBootstrap的bind是最终启动服务的方法
- 它首先会通过反射创建一个指定类型的
ServerSocketChannel
- 然后根据ServerBootStrap设置的配置去初始化
ServerSocketChannel
- 通过
ServerSocketChannel
获得DefaultChannelPipeline
,在图中能看到Channel
和Pipeline
是相互包含关系,在创建Channel的同时就会创建Pipeline- 获取ServerBootStrap设置相关的child配置(WorkerGroup的配置)
- 创建一个异步任务,为
Pipeline
添加一个handlerServerBootstrapAcceptor
ServerBootstrapAcceptor
处理客户端连接的Accect
事件,获得SocketChannel
后就会获取child配置初始化到Channel,然后注册到WorkerGroup
SocketChannel
注册到WorkerGroup
的EventLoop
中,默认采用的是轮询算法进行注册,注册完成之后就监听读事件,等待客户端请求
NioEventLoop
- NioEventLoop 就是一个事件循环类,几乎所有事件处理都会经过这个类
- 和NioEventLoopGroup一样都是扩展于JUC包下的线程池接口,不同的是事件循环是一个单线程的线程池
- 根据类图可知每个子线程都有自己的Selector (NioEventLoop#selector) 和TaskQueue (SingleThreadEventExecutor#taskQueue)
源码剖析
Channel
- 每个NioChannel只会绑定一个EventLoop
- Netty 网络通信的组件,能够用于执行网络 I/O 操作
- 通过Channel 可获得当前网络连接的通道的状态
- 通过Channel 可获得网络连接的配置参数 (例如接收缓冲区大小 )
- Channel 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成
- 调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方 支持关联 I/O 操作与对应的处理程序
- 不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应
常用的 Channel 类型
通道类型 说明 NioSocketChannel 异步的客户端 TCP Socket 连接 NioServerSocketChannel 异步的服务器端 TCP Socket 连接 NioDatagramChannel 异步的 UDP 连接 NioSctpChannel 异步的客户端 Sctp 连接 NioSctpServerChannel 异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO OioSocketChannel 同步的客户端 TCP Socket 连接 OioServerSocketChannel 同步的服务器端 TCP Socket 连接 OioDatagramChannel 同步的 UDP 连接 OioSctpChannel 同步的 Sctp 服务器端连接 OioSctpServerChannel 同步的客户端 TCP Socket 连接
ChannelOption
异步模型
- 异步的概念和同步是相对的。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态,通知和回调来通知调用者
- Netty中的 I/O 操作是异步的,包括 Bind, Write, Connect等操作会简单地返回一个ChannelFuture
- 调用者并不能立即获得结果,而是通过 Future-Listener 机制,用户可以方便主动获取或通过通知机制获得 I/O 操作结果
- Netty的异步模型是建立在 Future和Callback之上的(JUC的异步任务)。Callback就是回调。重点说 Future,它的核心思想是:假设一个方法 fun, 计算过程可能非常耗时,等待fun返回显然不太合适。那么可以在调用fun的时候,立马返回一个 Future, 后续可以通过Future去监控方法fun的处理过程(Future-Listener机制)
Future-Listener 机制
- Future 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算等等
- 在使用Netty进行编程时,拦截操作和转换出入站数据只需要提供 callback 或利用 future 即可,这使得链式操作简单,高效,并有利编写可重用的,通用的代码
- Netty框架的目标就是让你的业务代码逻辑从网络编程中分离出来,解脱出来
ChannelFuture
- ChannelFuture是一个接口,我们可以添加监听器,当监听的事件发生时,就会通知到监听器
常用方法:
- Channel channel(),返回当前正在进行 IO 操作的通道
- ChannelFuture sync(),等待异步操作执行完毕
- ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1) 注册监听器
- 顶层的
Future
是JUC
的接口,第二个是Netty包下的接口
- Future-Listener 机制当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作
- 常见有如下操作
- 通过 isDone 方法来判断当前操作是否完成
- 通过 isSuccess 方法来判断已完成的当前操作是否成功;
- 通过 getCause 方法来获取已完成的当前操作失败的原因;
- 通过 isCancelled 方法来判断已完成的当前操作是否被取消;
- 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则通知指定的监听器
给一个 ChannelFuture 注册监听器,来监控我们关心的事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//启动服务器(并绑定端口),bind是一个异步操作
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
//给channelFuture 注册监听器,监控我们关心的事件
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()) {
System.out.println("监听端口 6666 成功");
} else {
System.out.println("监听端口 6666 失败");
}
}
});
Selector
- 参考 NIO Selector
- Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件
- 当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询 (Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接 完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel
ChannelHandler
- ChannelHandler是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到ChannelPipeline(业务处理链)中的下一个处理程序, Handler是Netty业务处理的重要体系
- ChannelHandler本身并没有提供很多方法,因为这个接口有许多方法需要实现,方便使用期间,可以继承它的子类
- ChannelInboundHandler 用于处理入站 I/O 事件
- ChannelOutboundHandler用于处理出站 I/O 事件
- 适配器
- ChannelInboundHandlerAdapter 用于处理入站 I/O 事件
- ChannelOutboundHandlerAdapter 用于处理出站 I/O 事件
- ChannelDuplexHandler 用于处理出站和入站事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HttpServerInitialize extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
final ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个Netty提供的 HttpServerCodec (http协议编解码器) codec => decoder
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
// 增加一个针对http协议的handler
pipeline.addLast("MyHttpServerHandler", new HttpServerHandler());
}
}Handler常用的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface ChannelInboundHandler extends ChannelHandler {
/**
* Channel注册到EventLoop的时候,调用
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* Channel从EventLoop注销的时候,调用
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* Channel活跃的时候,调用
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* Channel不活跃的时候,调用,此时生命周期马上结束
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Channel读取到消息的时候调用
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* 抛出异常时调用
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/**
* 发生IO出站事件的时候,方法会得到通知调用
*/
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
* 绑定操作被执行的时候调用
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a connect operation is made.
* 连接操作执行的时候调用
*/
void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
* 断开连接的时候调用
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a close operation is made.
* 关闭的时候调用
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
* 注销的时候调用
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
* 拦截ChannelHandlerContext#read()方法
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
* 写操作时候调用,写的消息会经过ChannelPipeline,调用Channel#flush()的时候,消息会被flush到Channel
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages that are pending.
* flush会将前面前部pending的消息flush到Channel
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}适配器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter() {
}
protected void ensureNotSharable() {
if (this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
// 判断当前hanlder是否是可共享(在多个pipeline中)
public boolean isSharable() {
Class<?> clazz = this.getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
// 在ChannelHandler被添加到实际上下文中并准备好处理事件后调
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
// 在ChannelHandler从实际上下文中移除后调用,表明它不再处理事件
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
// 在抛出Throwable类后调用
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
// 通道注册事件
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
// 通道注销事件
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
// 通道就绪事件
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
// 通道读取数据事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
// 通道读取数据完毕事件
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
// 通道发生异常事件
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;
/**
* 当NioServerSocketChannel创建、初始化、注册到EventLoopGroup完成后,接下来就进行绑定,与本地
* 端口进行绑定以便接收数据,绑定的工作通过代码分析发现最后调用的是 AbstractBootstrap#doBind0方法
* @throws Exception
*/
void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;
// 当请求将 Channel 连接到远程节点时被调用
void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求关闭 Channel 时被调用
void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求将 Channel 从远程节点断开时被调用
void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;
// 当请求从 Channel 读取更多的数据时被调用
void read(ChannelHandlerContext var1) throws Exception;
// 当请求通过 Channel 将数据写到远程节点时被调用
void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;
// 当请求通过 Channel 将入队数据冲刷到远程节点时被调
void flush(ChannelHandlerContext var1) throws Exception;
}
出站和入站机制
ChannelPipeline
1
2
3
4
5
6
7
8
9
10
...
childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器,可以设置多个,是个双向链表
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("client socketChannel hashCode = " + socketChannel.hashCode());
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
- ChannelPipeline 是一个 Handler 的集合(双向链表),它负责处理和拦截 inbound(入站) 或者 outbound(出栈) 的事件和操作,相当于一个贯穿 Netty 的链。(也可以这样理解: ChannelPipeline 是 保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站 和出站 事件 / 操作)
- ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互
- 在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应它们的组成关系如下
入站和出站
- 入站:从
ChannelHandlerContext
链表的head
调用到tail
- 出站:从
ChannelHandlerContext
链表的tail
调用到head
说明:
- 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
- 入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler, 出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰
ChannelHandlerContext
- 数据结构:双向链表,
AbstractChannelHandlerContext
包含next
prev
- 包含
- Channel
- ChannelHandler:ChannelHandlerContext和ChannelHandler是一一对应关系
- ChannelPipeline
- 入站:从
ChannelHandlerContext
链表的head
调用到tail
- 出站:从
ChannelHandlerContext
链表的tail
调用到head
Buf
- Netty 重新实现了体系,实现了读写操作不再需要
flip
操作- 实现了同时读写功能
- capacity: buf容量
- readerIndex:readerIndex -> writerIndex 之间是可读性范围
- writerIndex: writerIndex -> capacity 之间是可写范围
- Buf实现了自动扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class BufTest {
public static void main(String[] args) {
// 非池化的Buf
ByteBuf buf = Unpooled.buffer(10);
for (int i = 0; i < 100; i++) {
// 能够自动扩容
buf.writeByte(i);
}
for (int i = 0; i < 5; i++) {
log.info("{}", buf.readByte());
}
log.info("类型:{}", buf.getClass());
log.info("capacity = {}", buf.capacity());
log.info("readIndex = {}", buf.readerIndex());
log.info("writeIndex = {}", buf.writerIndex());
log.info("可读取的 = {}", buf.readableBytes());
// 读取一部分 buf.getCharSequence(6, 2, Charset.forName("utf-8"));
}
}
02:51:58.787 [main] INFO com.wgf.netty.BufTest - 0
02:51:58.790 [main] INFO com.wgf.netty.BufTest - 1
02:51:58.791 [main] INFO com.wgf.netty.BufTest - 2
02:51:58.791 [main] INFO com.wgf.netty.BufTest - 3
02:51:58.791 [main] INFO com.wgf.netty.BufTest - 4
02:51:58.791 [main] INFO com.wgf.netty.BufTest - 类型:class io.netty.buffer.UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf
02:51:58.791 [main] INFO com.wgf.netty.BufTest - capacity = 128
02:51:58.791 [main] INFO com.wgf.netty.BufTest - readIndex = 5
02:51:58.791 [main] INFO com.wgf.netty.BufTest - writeIndex = 100
02:51:58.791 [main] INFO com.wgf.netty.BufTest - 可读取的 = 95
ByteBuf分类
Netty使用ByteBuf对象作为数据容器,进行I/O读写操作,Netty的内存管理也是围绕着ByteBuf对象高效地分配和释放
当讨论ByteBuf对象管理,主要从以下方面进行分类
Pooled 和 Unpooled
- 池化内存分配时基于预分配的一整块大内存,取其中的部分封装成ByteBuf提供使用,用完后回收到内存池中
- 非池化内存每次分配时直接调用系统 API 向操作系统申请ByteBuf需要的同样大小内存,用完后通过系统调用进行释放Pooled
- Netty4默认使用Pooled的方式,可通过参数-Dio.netty.allocator.type=unpooled或pooled进行设置
Heap 和 Direct
- Heap,指ByteBuf关联的内存JVM堆内分配,分配的内存受GC 管理
- Direct,指ByteBuf关联的内存在JVM堆外分配,分配的内存不受GC管理,需要通过系统调用实现申请和释放,底层基于Java NIO的DirectByteBuffer对象
HTTP服务
使用Netty实现Http协议编解码,支持浏览器访问
熟悉Netty Http开发
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class HttpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new HttpServerInitialize()); // 自定义 handler
final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
- HttpServerInitialize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class HttpServerInitialize extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 向管道加入处理器
// 得到管道
final ChannelPipeline pipeline = socketChannel.pipeline();
// 加入一个Netty提供的 HttpServerCodec (http协议编解码器) codec => decoder
pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
// 增加一个针对http协议的handler
pipeline.addLast("MyHttpServerHandler", new HttpServerHandler());
}
}
- HttpServerHandler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 1. SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter 的子类
* 2. HttpObject 客户端和服务端相互通讯的数据对象
*/
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
// 判断 msg 是否为 http 请求
if (httpObject instanceof HttpRequest) {
log.info("httpObject 类型:{}", httpObject.getClass());
log.info("客户端地址:{}", channelHandlerContext.channel().remoteAddress());
// 回复客户端消息, 封装 httpResponse
ByteBuf byteBuf = Unpooled.copiedBuffer("我是服务器", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
// 设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json")
.set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
// 将构建好的response 返回
channelHandlerContext.channel().writeAndFlush(response);
}
}
}
1
2
3
4
5
6
浏览器访问 http://localhost:8080/
服务端输出:
09:10:11.402 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - httpObject 类型:class io.netty.handler.codec.http.DefaultHttpRequest
09:10:11.402 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:49999
09:10:11.419 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - httpObject 类型:class io.netty.handler.codec.http.DefaultHttpRequest
09:10:11.419 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:49999可以看到服务端连续输出两次有BUG, 打印请求信息,优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
// 判断 msg 是否为 http 请求
if (httpObject instanceof HttpRequest) {
log.info("httpObject 类型:{}", httpObject.getClass());
log.info("客户端地址:{}", channelHandlerContext.channel().remoteAddress());
// 打印请求链接
// /
// /favicon.ico 请求网络图标
HttpRequest httpRequest = (HttpRequest) httpObject;
URI uri = new URI(httpRequest.uri());
log.info(uri.getPath());
// 过滤http请求
if ("/favicon.ico".equals(uri.getPath())) {
log.info("过滤图标请求");
return;
}
// 打印每次请求信息
// http 属于短链接,浏览器每次刷新都会创建一个新的 channel -> pipeline -> handler,
// 所以 channel -> pipeline -> handler 三者是连接独享关系
log.info("channel class: {}, hashCode: {}", channelHandlerContext.channel().getClass(),
channelHandlerContext.channel().hashCode());
log.info("pipeline class: {}, pipeline: {}", channelHandlerContext.pipeline().getClass(),
channelHandlerContext.pipeline().hashCode());
log.info("handler class: {}, handler: {}", channelHandlerContext.handler().getClass(),
channelHandlerContext.handler().hashCode());
// 回复客户端消息, 封装 httpResponse
ByteBuf byteBuf = Unpooled.copiedBuffer("我是服务器", CharsetUtil.UTF_8);
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
// 设置响应头
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json")
.set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
// 将构建好的response 返回
channelHandlerContext.channel().writeAndFlush(response);
}
}
}
刷新两次浏览器
服务端输出:
10:59:53.346 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:65232
10:59:53.346 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - /
10:59:53.347 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - channel class: class io.netty.channel.socket.nio.NioSocketChannel, hashCode: -817620801
10:59:53.347 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - pipeline class: class io.netty.channel.DefaultChannelPipeline, pipeline: 930529060
10:59:53.347 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - handler class: class com.wgf.http.HttpServerHandler, handler: 1652365183
10:59:53.366 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - httpObject 类型:class io.netty.handler.codec.http.DefaultHttpRequest
10:59:53.366 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:65232
10:59:53.366 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - /favicon.ico
10:59:53.366 [nioEventLoopGroup-3-1] INFO com.wgf.http.HttpServerHandler - 过滤图标请求
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - httpObject 类型:class io.netty.handler.codec.http.DefaultHttpRequest
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:65276
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - /
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - channel class: class io.netty.channel.socket.nio.NioSocketChannel, hashCode: 1632808762
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - pipeline class: class io.netty.channel.DefaultChannelPipeline, pipeline: 958372783
11:00:38.945 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - handler class: class com.wgf.http.HttpServerHandler, handler: 2082276072
11:00:38.962 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - httpObject 类型:class io.netty.handler.codec.http.DefaultHttpRequest
11:00:38.962 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - 客户端地址:/0:0:0:0:0:0:0:1:65276
11:00:38.962 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - /favicon.ico
11:00:38.962 [nioEventLoopGroup-3-2] INFO com.wgf.http.HttpServerHandler - 过滤图标请求结论:
两次请求是因为有个图标请求
/favicon.ico
http属于段连接,每次刷新都会建立一个新的连接
channel
,pipeline
,handler
三者都是连接独享,非共享的对象
多人聊天室
- 熟悉 ChannelHandler API使用
- 熟悉 ChannelGroup 使用
- 了解 Netty 心跳机制 IdleStateHandler(空闲状态处理器)
服务端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class ChatServer {
private int port;
public ChatServer(int port) {
this.port = port;
}
public void run() {
// 创建bossGroup,workGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup(8);
try {
// 创建启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 添加日志打印
.childHandler(new ServerHandler());
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
// 优雅停机
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatServer(9999).run();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class ServerHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入编解码器
ch.pipeline().addLast("decode", new StringDecoder())
.addLast("encode", new StringEncoder())
.addLast(new ServerChatHandler()); // 加入业务处理Handler
/**
* 说明:
* 1. IdleStateHandler 是 Netty 提供的 空闲状态处理器
* 2. 四个参数:
* readerIdleTime : 表示多久没有 读 事件后,就会发送一个心跳检测包,检测是否还是连接状态
* writerIdleTime : 表示多久没有 写 事件后,就会发送一个心跳检测包,检测是否还是连接状态
* allIdleTime : 表示多久时间既没读也没写 后,就会发送一个心跳检测包,检测是否还是连接状态
* TimeUnit : 时间单位
* 3. 当 Channel 一段时间内没有执行 读 / 写 / 读写 事件后,就会触发一个 IdleStateEvent 空闲状态事件
* 4. 当 IdleStateEvent 触发后,就会传递给 Pipeline 中的下一个 Handler 去处理,通过回调下一个 Handler 的 userEventTriggered 方法,在该方法中处理 IdleStateEvent
*/
ch.pipeline().addLast(new IdleStateHandler(20, 20, 40, TimeUnit.SECONDS))
.addLast(new ServerHeartbeatHandler()); // 添加一个Handler 处理空闲状态事件
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 消息处理
* SimpleChannelInboundHandler 是 ChannelInboundHandlerAdapter的子类
* 配合编码解码器,可以范型消息类型
*/
@Slf4j
public class ServerChatHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个 Channel 线程组,管理所有的 Channel, channel断开连接会自动删除, 参数 执行器
* GlobalEventExecutor => 全局事件执行器
* INSTANCE => 表示是单例的
*/
private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 在ChannelHandler被添加到实际上下文中并准备好处理事件后调用
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 提示聊天室所有人有新用户上线
Channel channel = ctx.channel();
Date date = new Date();
// 向整个ChannelGroup发送消息
channelGroup.writeAndFlush(String.format("%s [channel %s] 加入聊天", simpleDateFormat.format(new Date()), channel.remoteAddress()));
channelGroup.add(channel);
}
// 在ChannelHandler从实际上下文中移除后调用,表明它不再处理事件
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(String.format("%s [channel %s] 离开聊天", simpleDateFormat.format(new Date()), channel.remoteAddress()));
// channelGroup.remove(channel); 不需要,handlerRemoved()直接删除了channel
}
// Channel不活跃的时候,调用,此时生命周期马上结束
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("{} [channel {}] 下线了", simpleDateFormat.format(new Date()), ctx.channel().remoteAddress());
}
// Channel活跃的时候,调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("{} [channel {}] 上线了", simpleDateFormat.format(new Date()), ctx.channel().remoteAddress());
}
// 读取事件
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel channel = ctx.channel();
String date = simpleDateFormat.format(new Date());
// 读取客户端发送的数据,发送给其他人包括自己
channelGroup.forEach(ch -> {
// 判断是否为自己
if (ch.equals(channel)) {
ch.writeAndFlush(String.format("%s [自己]: %s\n", date, msg));
} else {
ch.writeAndFlush(String.format("%s [%s]: %s\n", date, channel.remoteAddress(), msg));
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter {
// 用户事件触发,处理IdleStateHandler触发的用户事件
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 判断是否空闲状态事件
if (!(evt instanceof IdleStateEvent)) {
return;
}
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
String evtType = null;
switch (idleStateEvent.state()) {
case READER_IDLE:
evtType = "读空闲";
break;
case WRITER_IDLE:
evtType = "写空闲";
break;
case ALL_IDLE:
evtType = "读写空闲";
break;
}
Channel channel = ctx.channel();
log.info("[channel: {} {}]", channel.remoteAddress(), evtType);
}
}客户端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class ChatClient {
private String host;
private int port;
public ChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() {
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ClientHandler());
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
// 得到当前建立的Channel
Channel channel = channelFuture.channel();
// 扫描用户输入
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
channel.writeAndFlush(msg);
}
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
nioEventLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new ChatClient("127.0.0.1", 9999).run();
}
}
1
2
3
4
5
6
7
8
public class ClientHandler extends ChannelInitializer {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast("decode", new StringDecoder())
.addLast("encode", new StringEncoder())
.addLast(new ClientChatHandler());
}
}
1
2
3
4
5
6
7
8
@Slf4j
public class ClientChatHandler extends SimpleChannelInboundHandler<String> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
// 直接输入服务端返回的消息
log.info(msg);
}
}
ChannelGroup
- Netty提供了
ChannelGroup
来用于保存Channel组
,ChannelGroup
是一个线程安全的Channel
集合,它提供一些对Channel
的批量操作。当一个TCP连接关闭后,对应的Channel会自动从ChannelGroup移除,所以不需要手动去移除关闭的Channel- 当有新的客户端连接到服务器,将对应的
Channel
加入到一个ChannelGroup
中,当发布者发布消息时,服务器可以将消息通过ChannelGroup
写入到所有客户端
心跳机制
什么是 IdleStateHandler ?
当连接的空闲时间(读或者写)太长时,将会触发一个 IdleStateEvent 事件。然后,你可以通过你的 ChannelInboundHandler 中重写 userEventTrigged 方法来处理该事件
如何使用?
IdleStateHandler 既是出站处理器也是入站处理器,继承了 ChannelDuplexHandler 。通常在 initChannel 方法中将 IdleStateHandler 添加到 pipeline 中。然后在自己的 handler 中重写 userEventTriggered 方法,当发生空闲事件(读或者写),就会触发这个方法,并传入具体事件
这时,你可以通过 Context 对象尝试向目标 Socekt 写入数据,并设置一个 监听器,如果发送失败就关闭 Socket (Netty 准备了一个
ChannelFutureListener.CLOSE_ON_FAILURE
监听器用来实现关闭 Socket 逻辑)说明:
IdleStateHandler 是 Netty 提供的 空闲状态处理器
四个参数:
- readerIdleTime : 表示多久没有 读 事件后,就会发送一个心跳检测包,检测是否还是连接状态
- writerIdleTime : 表示多久没有 写 事件后,就会发送一个心跳检测包,检测是否还是连接状态
- allIdleTime : 表示多久时间既没读也没写 后,就会发送一个心跳检测包,检测是否还是连接状态
- TimeUnit : 时间单位
当 Channel 一段时间内没有执行 读 / 写 / 读写 事件后,就会触发一个 IdleStateEvent 空闲状态事件
当 IdleStateEvent 触发后,就会传递给 Pipeline 中的下一个 Handler 去处理,通过回调下一个 Handler 的 userEventTriggered 方法,在该方法中处理 IdleStateEvent
WebSocket
实现基于webSocket的长连接 的全双工的交互
改变Http协议多次请求的约束,实现长连接了, 服务器可以发送消息 给浏览器
客户端浏览器和服务器端会相互感 知,比如服务器关闭了,浏览器会感知,同样浏览器关闭了,服务器会感知
服务端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class WebServer {
public static void main(String[] args) throws InterruptedException {
//创建bossGroup,workGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
//创建启动器
ServerBootstrap serverBootstrap = new ServerBootstrap();
//循环事件组
serverBootstrap.group(bossGroup, workGroup)//线程组
.channel(NioServerSocketChannel.class)//通道类型
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ServerHandler());
System.out.println("server is ok");
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ServerHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//基于http协议使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
/**
* 添加块处理器
* 在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,
* 它支持异步写大型数据流,而又不会导致大量的内存消耗
* 每次只生成固定大小的数据块,防止客户端因为网速接收慢而导致服务端无限将数据写入内存
*/
pipeline.addLast(new ChunkedWriteHandler());
/*
说明:
1. 因为 HTTP 数据传输时是分段的,HttpObjectAggregator 可以将多个端聚合
2. 这就是为什么浏览器发送大量数据时,就会发出多次 HTTP 请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
说明:
1. 对于 WebSocket 是以 帧(frame) 的形式传递的
2. 后面的参数表示 :请求的 URL
3. WebSocketServerProtocolHandler 将 HTTP 协议升级为 WebSocket 协议,即保持长连接
4. 切换协议通过一个状态码101
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
// 自定义的 Handler
pipeline.addLast(new TextWebSocketFrameHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String message = msg.text();
log.info("服务器收到消息:{}", message);
// 回复消息
ctx.channel().writeAndFlush(new TextWebSocketFrame(String.format("%s 服务器收到消息:%s", LocalDateTime.now(), message)));
}
/**
* 客户端连接后,触发方法
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// longText 保证全局唯一
log.info("handlerAdded 被调用:{}", ctx.channel().id().asLongText());
// shortText 不保证全局唯一
log.info("handlerAdded 被调用:{}", ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// longText 保证全局唯一
log.info("handlerRemoved 被调用:{}", ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("发生异常", cause);
ctx.close();
}
}客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<form onsubmit="return false">
<textarea id="message" name="message" style="height: 300px; width: 300px"></textarea>
<input type="button" value="发送消息" onclick="send(this.form.message.value)">
<textarea id="responseText" style="height: 300px; width: 300px"></textarea>
<input type="button" value="清空内容" onclick="document.getElementById('responseText').value=''">
</form>
<script>
var socket;
// 判断当前浏览器是否支持 WebSocket
if (window.WebSocket){
socket = new WebSocket("ws://localhost:8080/hello");
// 相当于 channelRead0 方法,ev 收到服务器端回送的消息
socket.onmessage = function (ev){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + ev.data;
}
// 相当于连接开启,感知到连接开启
socket.onopen = function (){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接开启……";
}
// 感知连接关闭
socket.onclose = function (){
var rt = document.getElementById("responseText");
rt.value = rt.value + "\n" + "连接关闭……";
}
}else {
alert("不支持 WebSocket");
}
// 发送消息到服务器
function send(message){
// 判断 WebSocket 是否创建好了
if (!window.socket){
return ;
}
// 判断 WebSocket 是否开启
if (socket.readyState == WebSocket.OPEN){
// 通过 Socket 发送消息
socket.send(message);
}else {
alert("连接未开启");
}
}
</script>
</body>
</html>
编码和解码
编码和解码的基本介绍
- 编写网络应用程序时,因为数据在网络中传输的都是二进制字节码数据,在发送数据时就需要编码,接收数据时就需要解码
- codec(编码器)的组成部分有两个:decoder(解码器)和encoder(编码器)。encoder负责把业务数据缓存字节码数据,decoder负责把字节码数据转换成业务数据
Netty本身的编码解码的机制和问题分析
- Netty提供的编码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
ChannelHandlerAdapter (io.netty.channel)
ChannelOutboundHandlerAdapter (io.netty.channel)
MessageToMessageEncoder (io.netty.handler.codec)
ByteArrayEncoder (io.netty.handler.codec.bytes)
DatagramDnsQueryEncoder (io.netty.handler.codec.dns)
WebSocket08FrameEncoder (io.netty.handler.codec.http.websocketx)
WebSocket07FrameEncoder (io.netty.handler.codec.http.websocketx)
WebSocket13FrameEncoder (io.netty.handler.codec.http.websocketx)
DatagramDnsResponseEncoder (io.netty.handler.codec.dns)
SctpOutboundByteStreamHandler (io.netty.handler.codec.sctp)
SpdyHttpEncoder (io.netty.handler.codec.spdy)
WebSocketExtensionEncoder (io.netty.handler.codec.http.websocketx.extensions)
DeflateEncoder (io.netty.handler.codec.http.websocketx.extensions.compression)
RedisEncoder (io.netty.handler.codec.redis)
LineEncoder (io.netty.handler.codec.string)
ProtobufEncoderNano (io.netty.handler.codec.protobuf)
StringEncoder (io.netty.handler.codec.string)
SmtpRequestEncoder (io.netty.handler.codec.smtp)
WebSocket00FrameEncoder (io.netty.handler.codec.http.websocketx)
ProtobufEncoder (io.netty.handler.codec.protobuf)
MqttEncoder (io.netty.handler.codec.mqtt)
HttpObjectEncoder (io.netty.handler.codec.http)
HttpRequestEncoder (io.netty.handler.codec.http)
Encoder in HttpClientCodec (io.netty.handler.codec.http)
HttpResponseEncoder (io.netty.handler.codec.http)
HttpServerResponseEncoder in HttpServerCodec (io.netty.handler.codec.http)
RtspObjectEncoder (io.netty.handler.codec.rtsp)
RtspEncoder (io.netty.handler.codec.rtsp)
RtspRequestEncoder (io.netty.handler.codec.rtsp)
RtspResponseEncoder (io.netty.handler.codec.rtsp)
DatagramPacketEncoder (io.netty.handler.codec)
AbstractMemcacheObjectEncoder (io.netty.handler.codec.memcache)
AbstractBinaryMemcacheEncoder (io.netty.handler.codec.memcache.binary)
StompSubframeEncoder (io.netty.handler.codec.stomp)
LengthFieldPrepender (io.netty.handler.codec)
Base64Encoder (io.netty.handler.codec.base64)
...
- Netty提供的解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30ChannelHandlerAdapter (io.netty.channel)
ChannelInboundHandlerAdapter (io.netty.channel)
MessageToMessageDecoder (io.netty.handler.codec)
SctpInboundByteStreamHandler (io.netty.handler.codec.sctp)
SctpMessageCompletionHandler (io.netty.handler.codec.sctp)
Base64Decoder (io.netty.handler.codec.base64)
DatagramDnsQueryDecoder (io.netty.handler.codec.dns)
WebSocketExtensionDecoder (io.netty.handler.codec.http.websocketx.extensions)
DeflateDecoder (io.netty.handler.codec.http.websocketx.extensions.compression)
RedisArrayAggregator (io.netty.handler.codec.redis)
SpdyHttpDecoder (io.netty.handler.codec.spdy)
ProtobufDecoder (io.netty.handler.codec.protobuf)
HttpContentDecoder (io.netty.handler.codec.http)
HttpContentDecompressor (io.netty.handler.codec.http)
DatagramPacketDecoder (io.netty.handler.codec)
SctpMessageToMessageDecoder (io.netty.handler.codec.sctp)
StringDecoder (io.netty.handler.codec.string)
WebSocketProtocolHandler (io.netty.handler.codec.http.websocketx)
WebSocketClientProtocolHandler (io.netty.handler.codec.http.websocketx)
WebSocketServerProtocolHandler (io.netty.handler.codec.http.websocketx)
DatagramDnsResponseDecoder (io.netty.handler.codec.dns)
ByteArrayDecoder (io.netty.handler.codec.bytes)
MessageAggregator (io.netty.handler.codec)
WebSocketFrameAggregator (io.netty.handler.codec.http.websocketx)
AbstractMemcacheObjectAggregator (io.netty.handler.codec.memcache)
StompSubframeAggregator (io.netty.handler.codec.stomp)
RedisBulkStringAggregator (io.netty.handler.codec.redis)
HttpObjectAggregator (io.netty.handler.codec.http)
ProtobufDecoderNano (io.netty.handler.codec.protobuf)
...
- Netty本身自带的ObjectDecoder和ObjectEncoder可以用实现POJO对象或业务对象的编码和解码,底层使用的仍然是Java序列化技术,而Java序列化技术本身效率就不高,存在如下问题:
- 无法跨语言
- 序列化后体积太大,是二进制编码的5倍多
- 序列化后传输效率太低
- 序列化性能太低
- 解决方案:
Google ProtoBuf
ProtoBuf
1.ProtoBuf 简介
protobuf
(protocol buffer) 是谷歌内部的混合语言数据标准。通过将结构化的数据进行序列化(串行化),用于通讯协议、数据存储等领域的语言无关
、平台无关
、可扩展
的序列化结构数据格式2.为什么使用protobuf
我们知道数据在网络传输中是以二进制进行的,一般我们使用字节byte来表示, 一个byte是8bits,如果要在网络上中传输对象,一般需要将对象序列化,序列化的目的就是将对象转换成byte数组在网络中传输,当接收方接收到byte数组之后,再对byte数组进行反序列化,最终转换成java中的对象
java对象序列化 常见的方式
- 使用JDK自带的对象序列化,但是JDK自带的序列化本身存在一些问题,并且这种序列化手段只适合在java程序之间进行传输,如果是非java程序,比如PHP或者GO,那么序列化就不通用了
- 你还可以自定义序列化协议,这种方式的灵活程度比较高,但是不够通用,并且实现起来也比较复杂,很可能出现意想不到的问题
- 将数据转换成为XML或者JSON进行传输。XML和JSON的好处在于他们都有可以区分对象的起始符号,通过判断这些符号的位置就可以读取到完整的对象。但是不管是XML还是JSON的缺点都是转换成的数据比较大。在反序列化的时候对资源的消耗也比较多
所以我们需要一种新的序列化的方法,这就是protobuf,它是一种灵活、高效、自动化的解决方案
通过编写一个.proto的数据结构定义文件,然后调用protobuf的编译器,就会生成对应的类,该类以高效的二进制格式实现protobuf数据的自动编码和解析。 生成的类为定义文件中的数据字段提供了getter和setter方法,并提供了读写的处理细节。 重要的是,protobuf可以向前兼容,也就是说老的二进制代码也可以使用最新的协议进行读取
message 介绍
message
:protobuf
中定义一个消息类型是通过关键字message
字段指定的,这个关键字类似于C++/Java中的class关键字。使用protobuf编译器将proto
编译成Java代码之后,每个message
都会生成一个名字与之对应的Java类,该类公开继承自com.google.protobuf.Message
message 消息定义
Msg.proto 文件
1
2
3
4
5
6
7
8
syntax = "proto3"; // 协议版本
option java_outer_classname = "NettyMsg"; // 生成的外部类名,同时也是文件名
// proto 使用 message 管理数据, 会在 NettyMsg 下生成一个内部类
message Msg {
int32 id = 1; // 类似java对象定义属性,1 表示的是属性的序号,不是属性的值
string msg = 2;
}
example
添加 Maven 坐标
1
2
3
4
5
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.1</version>
</dependency>将
Msg.proto
文件拷贝到 Protobuf3 bin 目录下执行命令
protoc.exe --java_out=. Msg.proto
生成 NettyMsg.java服务端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NettyServer {
private final static int PORT = 6666;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
final ChannelPipeline pipeline = socketChannel.pipeline();
// 添加ProtoBuf解码器, 指定对哪种对象进行节码
pipeline.addLast(new ProtobufDecoder(NettyMsg.Msg.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("server is ready");
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<NettyMsg.Msg> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, NettyMsg.Msg msg) throws Exception {
log.info("消息id:{}", msg.getId());
log.info("消息:{}", msg.getMsg());
}
}客户端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyClient {
public static void main(String[] args) throws Exception {
// 客户端需要一个时间循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 设置ProtoBuf编码器
pipeline.addLast("encode", new ProtobufEncoder());
// 给pipeline 设置处理器
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅停机
eventLoopGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
// 当通道准备就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送一个Msg对象到服务端
final NettyMsg.Msg msg = NettyMsg.Msg.newBuilder().setId(1).setMsg("hi! 服务端~").build();
ctx.writeAndFlush(msg);
}
}
ProtoBuf 传输多种类型
核心:
- 枚举DataType的定义
- oneof的使用
- 通过一个
message
管理多个message
MultiMsg.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
syntax = "proto3"; // 指定协议版本
option optimize_for = SPEED; // 加快解析
option java_package = "com.wgf.protobuf"; // 指定生成在哪个包下
option java_outer_classname = "MultiMsg"; // 外部类名
// ProtoBuf 可以使用 message 管理其他的message
message MsgType {
// 定义一个枚举
enum DataType {
commonMsgType = 0; // 在 ProtoBuf 要求Enum的编号从0开始
customerMsgType = 1;
}
// 定义一个属性来标识传的是哪一个枚举类型
DataType data_type = 1;
// 表示枚举类型最多只能出现其中的一个,节省空间
oneof dataBody {
CommonMsg commonMsg = 2;
CustomerMsg customerMsg = 3;
}
}
message CommonMsg {
string msg = 1;
}
message CustomerMsg {
string msg = 1;
string customerAddress = 2;
}服务端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NettyServer {
private final static int PORT = 6666;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() { //创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
final ChannelPipeline pipeline = socketChannel.pipeline();
// 添加ProtoBuf解码器, 指定对哪种对象进行节码, 多种类型使用一个message管理多个message
pipeline.addLast(new ProtobufDecoder(MultiMsg.MsgType.getDefaultInstance()));
pipeline.addLast(new NettyServerHandler());
}
});
System.out.println("server is ready");
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<MultiMsg.MsgType> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, MultiMsg.MsgType msgType) throws Exception {
MultiMsg.MsgType.DataType dataType = msgType.getDataType();
// 判断接收的对象类型
if (MultiMsg.MsgType.DataType.commonMsgType == dataType) {
MultiMsg.CommonMsg commonMsg = msgType.getCommonMsg();
log.info(commonMsg.getMsg());
} else if (MultiMsg.MsgType.DataType.customerMsgType == dataType) {
MultiMsg.CustomerMsg customerMsg = msgType.getCustomerMsg();
log.info(customerMsg.getCustomerAddress());
log.info(customerMsg.getMsg());
}
}
}客户端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class NettyClient {
public static void main(String[] args) throws Exception {
// 客户端需要一个时间循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 设置ProtoBuf编码器
pipeline.addLast("encode", new ProtobufEncoder());
// 给pipeline 设置处理器
pipeline.addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅停机
eventLoopGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
// 当通道准备就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final int i = new Random().nextInt(3);
// 发送一个Msg对象到服务端
if (i == 0) {
final MultiMsg.MsgType common = MultiMsg.MsgType.newBuilder()
.setDataType(MultiMsg.MsgType.DataType.commonMsgType)
.setCommonMsg(MultiMsg.CommonMsg.newBuilder().setMsg("commonMsg ~~").build())
.build();
ctx.writeAndFlush(common);
} else {
final MultiMsg.MsgType custome = MultiMsg.MsgType.newBuilder()
.setDataType(MultiMsg.MsgType.DataType.customerMsgType)
.setCustomerMsg(MultiMsg.CustomerMsg.newBuilder().setMsg("customer ~~")
.setCustomerAddress(ctx.channel().remoteAddress().toString()).build())
.build();
ctx.writeAndFlush(custome);
}
}
}
maven 插件使用
自定义编解码器
从类图上可知,编码和节码其实就是建立在入站和出站的Handler中的
解码:入站时,数据是以二进制形式传递的,解码器就是在入站的Handler中将二进制数据转换为特定的数据格式
编码:出站时,将特定的数据格式转换成二进制字节流后传输到网络间
服务端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class Server {
private final static int PORT = 6666;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ServerHandler());
System.out.println("server is ready");
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ServerHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 加入自定义解码器
pipeline.addLast(new MyByteToLongDecoder());
// 加入自定义编码器
pipeline.addLast(new MyLongToByteEncoder());
// 业务逻辑处理
pipeline.addLast(new ServerBusinessHandler());
}
}
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class ServerBusinessHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
log.info("读取客户端数据:{}", msg);
// 回复客户端
ctx.writeAndFlush(987654L);
}
}编解码器
1
2
3
4
5
6
7
8
9
@Slf4j
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
log.info("MyLongToByteEncoder encoder 被调用");
log.info("msg = {}", msg);
out.writeLong(msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
public class MyByteToLongDecoder extends ByteToMessageDecoder {
//上下文channelHandlerContext
//入站的ByteBuf
//List集合,将解码后的数据传给下一个handler
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
log.info("MyByteToLongDecoder被调用");
//因为long8字节,8个字节,才能读取一个long
if (byteBuf.readableBytes() >= 8) {
list.add(byteBuf.readLong());
}
}
}客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class Client {
public static void main(String[] args) throws Exception {
// 客户端需要一个时间循环组
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ClientHandler());
ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
// 优雅停机
eventLoopGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j
public class ClientHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 添加自定义解码器
pipeline.addLast(new MyByteToLongDecoder());
// 添加自定义编码器
pipeline.addLast(new MyLongToByteEncoder());
// 业务逻辑处理
pipeline.addLast(new ClientBusinessHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
public class ClientBusinessHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
log.info("服务端回复: {}", msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123456L);
}
}
问题1
如果客户端自定义的
handler
发送的是下面这个代码,数据是16个字节,那么服务端的入解码器的decode
方法会被调用几次?
1
2
3
4
5
6
7
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(123456L);
// 一次性发送16字节,服务端规定每次大于等于8字节才读取
ctx.writeAndFlush(Unpooled.copiedBuffer("1234567890123456", CharsetUtil.UTF_8));
}由图可见,一个16字节的消息被自定义解码器
MyByteToLongDecoder
进行两次读取,原因在于服务器只知道每次要大于8个字节才去读取数据,但是一条完整的数据有多长,不得而知,这就是拆包现象
- 对于
decode
方法会根据接收到的数据,被调用多次,直到确定没有新的元素被添加到list
或者是ByteBuf
没有更多的可读字节为止- 如果
list
不为空,就会将list
内容传递给下一个ChannelInboundhandler
处理,该处理器的方法也会被调用多次- 这里"
1234567890123456
"是16个字节,所以服务端解码的时候decode
会被调两次,每次解码出来的数据放到list
里面,list
的里数据传给自定义的handler
进行处理
问题2
使用问题1的代码片段后,发现,客户端的出站编码
handler-》MyLongToByteEncoder
,没有被调用,怎么回事呢?
- 对于客户端的自定义
handler
的前一个handler
是MyLongToByteEncoder
,MyLongToByteEncoder
父类MessageToByteEncoder
有一个write
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
//判断数据是不是应该处理的类型,是的话调用encode方法,进行编码,不是,就不编码,直接write,然后交给前一个handler
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
//我们写子类的时候,重写了该方法
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
- 因此,我们编写的
Encoder
是要注意传入的数据类型和处理的数据类型一致
结论
- 不论解码器handler 还是 编码器handler 即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行
- 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够 ,否则接收到的结果会期望结果可能不一致
其他常见的编解码器
ReplayingDecoder
1
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder
扩展了ByteToMessageDecoder
类,使用这个类,我们不必调用readableBytes
()方法。参数S
指定了用户状态管理的类型,其中Void
代表不需要状态管理- 应用实例:使用
ReplayingDecoder
编写解码器,对前面的案例进行简化
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class MyByteToLongDecoder2 extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) throws Exception {
log.info("MyByteToLongDecoder2被调用");
// 在ReplayingDecoder不需要判断数据是否足够读取,内部会进行处理判断
list.add(byteBuf.readLong());
}
}测试:使用
MyByteToLongDecoder2
替代之前的MyByteToLongDecoder
,结果和之前的一样它有一些局限性:
- 并不是所有的
ByteBuf
操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException
ReplayingDecoder
在某些情况下可能稍慢于ByteToMessageDecoder
,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢其他解码器
- LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据
- DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符
- HttpObjectDecoder:一个HTTP数据的解码器
- LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理粘包和拆包消息
粘包和拆包
什么是粘包和拆包?
TCP是个
流
的协议,所谓流
就是没有界限的一串数据,大家可以想想河里的水,他们是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分。所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也可能会把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包/拆包问题 由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包,拆包问题
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数不是确定的,故可能存在四种情况:
- 服务端分两次读取到两个独立的包,分别是D1和D2,没有粘包和拆包
- 服务端一次接受到两个数据包,D1和D1粘合在一起,被称为TCP粘包
- 服务端分两次接收到了两个数据包,第一次读取到了完成的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP的拆包
- 服务端分两次接受到了两个数据包,第一次读取到了D1包的部分内容D1-1,第二次读取到了D1包的剩余内容D1-2和D2包的整包
粘包例子
1 |
|
1 |
|
1 |
|
1 |
|
第一次启动客户端
服务端输出
客户端输出
第二次启动客户端
服务端输出
客户端输出
解决方案
原因:服务端或客户端并不知道每个请求(write操作)消息的长度是多少,服务器有可能多读或少读数据。所以每次读取数据的时候就有可能遇到
粘包
和拆包
的情况解决方案:
- 使用自定义协议 + 编解码器 来解决
- 关键就是要解决 服务器端每次读取数据长度的问题,这个问题解决就不会出现服务器多读或者少读数据的问题,从而避免TCP的
粘包
和拆包
具体实现:
- 自定义一个协议实体
- msg字段:需要发送的消息
- length: 消息长度,用于告诉接收端本次消息应该读取的长度是多少,避免多读和少读
- 自定义一个编解码器来将实体转成byte,自定义一个解码器将byte转成实体
自定义协议
1
2
3
4
5
6
7
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MsgProtocol {
private int length;
private byte[] msg;
}自定义编码器
1
2
3
4
5
6
7
8
9
10
@Slf4j
public class MsgEncoder extends MessageToByteEncoder<MsgProtocol> {
@Override
protected void encode(ChannelHandlerContext ctx, MsgProtocol msg, ByteBuf out) throws Exception {
// 将对象转成字节流
log.info("自定义编码器被调用");
out.writeInt(msg.getLength());
out.writeBytes(msg.getMsg());
}
}自定义解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
public class MsgDecoder extends ReplayingDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
log.info("自定义解码器被调用");
// 将二进制字节流转换成对象
int length = msg.readInt();
byte[] bytes = new byte[length];
msg.readBytes(bytes);
// 封装成 MsgEncoder 对象加入 List, 传递给下一个Handler处理
MsgProtocol msgEncoder = new MsgProtocol(length, bytes);
out.add(msgEncoder);
}
}服务端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class NettyServer {
private final static int PORT = 6666;
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入自定义编码器
ch.pipeline().addLast(new MsgEncoder());
// 加入自定义解码器
ch.pipeline().addLast(new MsgDecoder());
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class NettyServerHandler extends SimpleChannelInboundHandler<MsgProtocol> {
private int count;
// 异常处理
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) throws Exception {
// 接收对象并处理
int length = msg.getLength();
byte[] bytes = msg.getMsg();
log.info("服务端接收到消息长度:{}", length);
log.info("服务端接收到的消息:{}", new String(bytes, CharsetUtil.UTF_8));
log.info("服务端接收到数据包的次数:{}", ++count);
// 回复消息
String response = String.format("%s=%s", "服务端回复消息", UUID.randomUUID().toString());
byte[] responseBytes = response.getBytes(CharsetUtil.UTF_8);
int responseLength = responseBytes.length;
MsgProtocol responseMsg = new MsgProtocol(responseLength, responseBytes);
ctx.writeAndFlush(responseMsg);
}
}客户端源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class NettyClient {
public static void main(String[] args) throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 加入自定义编码器
ch.pipeline().addLast(new MsgEncoder());
// 加入自定义解码器
ch.pipeline().addLast(new MsgDecoder());
ch.pipeline().addLast(new NettyClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync();
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<MsgProtocol> {
private int count;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 发送10条数据
for (int j = 0; j < 5; j++) {
String msg = String.format("%s=%s", j + 1, UUID.randomUUID().toString());
byte[] bytes = msg.getBytes(CharsetUtil.UTF_8);
int length = bytes.length;
MsgProtocol msgProtocol = new MsgProtocol(length, bytes);
ctx.writeAndFlush(msgProtocol);
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MsgProtocol msg) throws Exception {
int length = msg.getLength();
byte[] bytes = msg.getMsg();
String message = new String(bytes, CharsetUtil.UTF_8);
log.info("客户端收到服务端消息长度:{}", length);
log.info("客户端收到服务端消息:{}", message);
log.info("客户端接收消息次数:{}", ++count);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}由输出的消息可知,当告知接收方数据包的大小后,数据就避免了多读和少读问题,从而避免了
粘包
和拆包
的产生