1 netty工作原理
流程
Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写
BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroup
NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是 NioEventLoop
NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 Selector , 用于监听绑定在其上的 Socket 的网络通讯
NioEventLoopGroup(BossGroup、WorkerGroup) 可以有多个线程, 即可以含有多个 NioEventLoop
每个Boss 的 NioEventLoop 循环执行的步骤有3步
- 轮询accept 事件
- 处理accept 事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到 Worker 的 NIOEventLoop 上的 Selector
- 处理任务队列的任务 , 即 runAllTasks
每个 Worker 的 NIOEventLoop 循环执行的步骤
- 轮询read, write 事件
- 处理i/o事件, 即read , write 事件,在对应NioScocketChannel 处理
- 处理任务队列的任务 , 即 runAllTasks
每个Worker NIOEventLoop 处理业务时,会使用 Pipeline(管道), Pipeline 中包含了 Channel , 即通过 Pipeline 可以获取到对应通道, 管道中维护了很多的处理器。管道可以使用 Netty 提供的,也可以自定义
2 快速入门
实现
- Netty 服务器在 6666 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
- 服务器可以回复消息给客户端 “hello, 客户端~”
依赖
1 2 3 4 5 6
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.52.Final</version> </dependency>
|
服务端NettyServer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| package netty.test;
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer { public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new NettyServerHandler()); } }); System.out.println("服务器准备好..."); ChannelFuture channelFuture = bootstrap.bind(6666).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
|
自定义Netty服务端处理器:NettyServerHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| package netty.test;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
import java.nio.charset.Charset;
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("[server]:ctx" + ctx);
ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送的msg:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + ctx.channel().remoteAddress()); }
@Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端", CharsetUtil.UTF_8)); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().close(); } }
|
客户端NettyClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package netty.test;
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyClientHandler()); } }); System.out.println("客户端准备好了..."); ChannelFuture channelFuture = bootstrap.connect("localhost", 6666).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
|
客户端处理器NettyClientHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package netty.test;
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("[client]:ctx" + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server", CharsetUtil.UTF_8)); }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器发送的msg:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址:" + ctx.channel().remoteAddress()); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.channel().close(); } }
|
3 分析
3.1 BossGroup 和 WorkGroup 怎么确定自己有多少个 NIOEventLoop
通过源码发现含有的子线程数默认为CPU核数*2
我的CPU核数为8,所以有16个进程,每个进程的类型都是NioEventLoop
3.2 WorkerGroup是如何分配这些进程的
设置BossGroup进程数和WorkerGroup进程数
重复运行5次客户端可发现
所以,WorkerGroup
分配的逻辑是按顺序分配的
3.3 BossGroup和WorkerGroup中的Selector和TaskQueue
3.4 CTX上下文,Channel,Pipeline之间关系
三者关系图