netty入门(二)

1 netty工作原理

image-20220313160909453

流程

  1. Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写

  2. BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup

  3. NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop

  4. NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 Selector , 用于监听绑定在其上的 Socket 的网络通讯

  5. NioEventLoopGroup(BossGroup、WorkerGroup) 可以有多个线程, 即可以含有多个 NioEventLoop

  6. 每个Boss 的 NioEventLoop 循环执行的步骤有3步

    1. 轮询accept 事件
    2. 处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到 Worker 的 NIOEventLoop 上的 Selector
    3. 处理任务队列的任务 , 即 runAllTasks
  7. 每个 Worker 的 NIOEventLoop 循环执行的步骤

    1. 轮询read, write 事件
    2. 处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
    3. 处理任务队列的任务 , 即 runAllTasks
  8. 每个Worker NIOEventLoop 处理业务时,会使用 Pipeline(管道), Pipeline 中包含了 Channel , 即通过 Pipeline 可以获取到对应通道, 管道中维护了很多的处理器。管道可以使用 Netty 提供的,也可以自定义

2 快速入门

实现

  1. Netty 服务器在 6666 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
  2. 服务器可以回复消息给客户端 “hello, 客户端~”

依赖

1
2
3
4
5
6
<!--netty依赖-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.52.Final</version>
</dependency>

服务端NettyServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package netty.test;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {
public static void main(String[] args) {
/**
* 创建BossGroup和 WorkerGroup
* 1. 创建两个线程组BossGroup和 WorkerGroup
* 2. BossGroup只处理请求
* 3. WorkerGroup 处理真正客户端的业务
* 4. 运行时,这两个都是无限循环
*/
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务端启动对象,并配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
// 链式编程
bootstrap.group(bossGroup, workerGroup) // 设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioServerSocketChannel作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接的个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置连续保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 给workerGroup的NioEventLoop对应的管道设置处理器
/**
* 创建一个通道初始化对象
* 给workerGroup对应的管道设置处理器
* @param socketChannel
* @throws Exception
*/
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline() // 获得这个socketChannel对应的pipeline
.addLast(new NettyServerHandler());
}
});
System.out.println("服务器准备好...");
// 绑定一个端口,并且同步,生成一个ChannelFuture对象
// 这里就已经启动了服务器
ChannelFuture channelFuture = bootstrap.bind(6666).sync();
// 对关闭通道进行监听
// 这里只是监听,只有关闭通道才进行处理,不是直接关闭通道
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

自定义Netty服务端处理器:NettyServerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package netty.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.nio.charset.Charset;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("[server]:ctx" + ctx);
/**
* 将 msg 转换成 ByteBuffer
* 说明 :
* 1. 注意这个是 ByteBuf ,是 io.netty.buffer 包下的,不是 NIO 下的 Buffer
* 2. ByteBuf 比 Buffer 的性能更高一点
*/
ByteBuf buf = (ByteBuf) msg;
// 把 buf 转成 UTF8 格式的字符串
System.out.println("客户端发送的msg:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
}

/**
* 数据读取完毕后,返回消息给客户端
* @param ctx
* @throws Exception
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
/**
* 把数据写入缓冲区,并刷新缓冲区
* 一般来说,需要对这个发送的消息进行编码
*/
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8));
}

/**
* 处理异常
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 关闭通道
ctx.channel().close();
}
}

客户端NettyClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package netty.test;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
public static void main(String[] args) {
// 客户端需要一个事件循环组
NioEventLoopGroup group = new NioEventLoopGroup();
try {
// 客户端启动对象——Bootstrap,不是服务端的ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(group)
.channel(NioSocketChannel.class) // 设置客户端通道的实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler());
}
});
System.out.println("客户端准备好了...");
// 启动客户端连接服务器端
ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync();
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}

客户端处理器NettyClientHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package netty.test;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道准备就绪时,就会触发该方法,就可以发信息了
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("[client]:ctx" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server", CharsetUtil.UTF_8));
}

/**
* 当通道有读取事件时,会触发
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("服务器发送的msg:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
}

/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}

image-20220313165849947

image-20220313165908316

3 分析

3.1 BossGroup 和 WorkGroup 怎么确定自己有多少个 NIOEventLoop

image-20220313172412321

image-20220313172427404

image-20220313172443477

通过源码发现含有的子线程数默认为CPU核数*2

image-20220313172635660

​ 我的CPU核数为8,所以有16个进程,每个进程的类型都是NioEventLoop

3.2 WorkerGroup是如何分配这些进程的

设置BossGroup进程数和WorkerGroup进程数

image-20220313173204342

重复运行5次客户端可发现

image-20220313173257839

所以,WorkerGroup分配的逻辑是按顺序分配的

3.3 BossGroup和WorkerGroup中的Selector和TaskQueue

image-20220313173537472

3.4 CTX上下文,Channel,Pipeline之间关系

  • CTX

    image-20220313173950859

  • pipeline

    image-20220313174040186

  • Pipeline

    image-20220313174136871

三者关系图

image-20220313174150117


netty入门(二)
https://2w1nd.github.io/2022/03/13/netty/netty入门(二)/
作者
w1nd
发布于
2022年3月13日
许可协议