在Netty开发实践-上 中,我们通过一些示例介绍了如何使用 Netty 进行服务器端/客户端编程,虽然我在示例代码中都做了注释,但是为了更深入的掌握 Netty 编程,接下来我们再做一些知识点梳理,其中有些知识点在前面已经用到了,有些知识点没有用到但仍是我们需要了解的。
IO 模型切换 我们知道,IO 模型分为:BIO、NIO 和 AIO 三种。在 Netty 中,AIO 相关代码已经移除,BIO 相关代码虽然保留,但是已经不再推荐,也就是说,在以后的 Netty 中,我们可能只能以 NIO 的方式进行编程。 在之前的示例代码中,我们选择的都是 NIO 模型,如下面这样:
服务器端:
1 2 3 4 EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap().channel(NioServerSocketChannel.class )...
客户端:
1 2 3 EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class )...
那如果我们一定要用 BIO 模型,又该怎么切换呢?跟简单,只需要调整相关类名即可:
服务器端:
1 2 3 4 EventLoopGroup bossGroup = new OioEventLoopGroup(1 ); EventLoopGroup workerGroup = new OioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap().channel(OioServerSocketChannel.class )...
客户端:
1 2 3 EventLoopGroup group = new OioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap().channel(OioSocketChannel.class )...
Reactor 模式切换 在 NIO 模型中,它使用开发模式我们称之为 Reactor 模式。Reactor 模式分为:单线程 Reactor 模式、多线程 Reactor 模式 和 主从 Reactor 模式 三种。 在之前的示例代码中,我们选择的都是 主从 Reactor 模式,如下面这样:
1 2 3 4 EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)...
那如果我们想要使用单线程 Reactor 模式或 多线程 Reactor 模式,又该怎么切换呢?跟简单,只需要调整相关参数即可:
使用单线程 Reactor 模式:
1 2 3 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1 ); ServerBootstrap serverBootstrap = new ServerBootstrap().group(eventLoopGroup)...
使用多线程 Reactor 模式:
1 2 3 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap().group(eventLoopGroup)...
粘包/半包处理 为了处理 TCP 的粘包/半包问题,Netty 提供了三种常用的封帧方式:
方式\支持
解码
编码
固定长度
FixedLengthFrameDecoder
简单
分隔符
DelimiterBasedFrameDecoder
简单
固定长度字段存内容的长度信息
LengthFieldBasedFrameDecoder
LengthFieldPrepender
上面三种解码器都继承自 ByteToMessageDecoder。
在之前的示例代码中,我们选择的都是“固定长度字段存内容的长度信息”方式,如下面这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder { public OrderFrameDecoder () { super (Integer.MAX_VALUE, 0 , 2 , 0 , 2 ); } } public class OrderFrameEncoder extends LengthFieldPrepender { public OrderFrameEncoder () { super (2 ); } }
由于另外两种都不常用,这里就不演示怎么切换了。
序列化/反序列化 Netty 对一些常用的序列化/反序列化方式都提供了支持,如 base64、bytes、compression、json、marshaling、protobuf、serialization、string、xml 等,编码器都继承自 MessageToMessageEncoder<I>
,解码器都继承自 MessageToMessageDecoder<I>
。
在之前的示例代码中,我们选择的都是 json 方式,如下面这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 public class OrderProtocolDecoder extends MessageToMessageDecoder <ByteBuf > { @Override protected void decode (ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) { RequestMessage requestMessage = new RequestMessage(); requestMessage.decode(msg); out.add(requestMessage); } } public class OrderProtocolEncoder extends MessageToMessageEncoder <ResponseMessage > { @Override protected void encode (ChannelHandlerContext ctx, ResponseMessage msg, List<Object> out) { ByteBuf byteBuf = ctx.alloc().buffer(); msg.encode(byteBuf); out.add(byteBuf); } } @Data public abstract class BaseMessage <T extends BaseMessageBody > { private MessageHeader header; private T body; public void encode (ByteBuf byteBuf) { byteBuf.writeInt(header.getVersion()); byteBuf.writeLong(header.getStreamId()); byteBuf.writeInt(header.getOpcode()); byteBuf.writeBytes(JsonUtil.toJson(body).getBytes()); } public abstract Class<T> getMessageBodyDecodeClass (int opcode) ; public void decode (ByteBuf msg) { int version = msg.readInt(); long streamId = msg.readLong(); int opcode = msg.readInt(); this .header = MessageHeader.builder() .version(version) .streamId(streamId) .opcode(opcode) .build(); this .body = JsonUtil.fromJson(msg.toString(StandardCharsets.UTF_8), getMessageBodyDecodeClass(opcode)); } }
在其它官方示例中,演示了如果使用其它方式的编解码,如在 “Word Clock”示例中,演示了如何使用 protobuf,大家可以自行查看相关代码。
空闲监测 在应用程序运行中,为了避免一些不怀好意或者无所事事的客户端一直和我们的应用程序保持连接,从而给我们造成资源浪费,因此一般我们都会进行空闲监测。 在 Netty 中,实现空闲监测的核心类是 IdleStateHandler。在前面的订单示例中,我们已经用到了这个类,现在再来回顾下是如何使用它的。
服务端读监测:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Slf 4jpublic class ServerIdleCheckHandler extends IdleStateHandler { public ServerIdleCheckHandler () { super (10 , 0 , 0 , TimeUnit.SECONDS); } @Override protected void channelIdle (ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) { log.info("Idle check happen, so close the connection" ); ctx.close(); return ; } super .channelIdle(ctx, evt); } }
客户端写监测:
1 2 3 4 5 6 7 8 9 10 11 public class ClientIdleCheckHandler extends IdleStateHandler { public ClientIdleCheckHandler () { super (0 , 5 , 0 , TimeUnit.SECONDS); } }
客户端写超时,发送 keepalive:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf 4j@ChannelHandler .Sharablepublic class KeepaliveHandler extends ChannelDuplexHandler { @Override public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception { if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) { log.info("Write Idle happen, so need to send keepalive to keep connection not closed by server" ); KeepaliveOperation operation = new KeepaliveOperation(); RequestMessage message = new RequestMessage(IdUtil.nextId(), operation); ctx.writeAndFlush(message); } super .userEventTriggered(ctx, evt); } }
调优参数 System 参数
ulimit Linux Option。进行 TCP 连接时,系统为每个 TCP 连接创建一个 socket 句柄,也就是一个文件句柄,但是 Linux 对每个进程打开的文件句柄数量做了限制,如果超出,报错“Too many open file”。 注意:ulimit 命令修改的数值只对当前登录用户 的目前使用环境有效,系统重启或者用户退出后就会失效,所以可以作为程序启动脚本的一部分,让它在程序启动前执行。
TCP_NODELAY SocketChannel 参数。设置是否启用 Nagle 算法:通过将小的碎片数据连接成更大的报文来提高发送效率。如果需要发送一些较小的报文,则需要禁用该算法。默认不开启。
SO_BACKLOG ServerSocketChannel 参数。最大的等待连接数量。当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。默认值 128。
SO_REUSEADDR SocketChannel/ServerSocketChannel 参数。地址重用,解决“Address already in use”。常用开启场景:多网卡(IP)绑定相同端口;让关闭连接释放的端口更早可使用。默认不开启。
Netty 参数
CONNECT_TIMEOUT_MILLIS SocketChannel 参数。客户端连接服务器最大允许时间。默认值 30s。
io.netty.eventLoopThreads System 参数。IO Thread 数量。默认值 availableProcessors * 2。
io.netty.availableProcessors System 参数。指定 availableProcessors。考虑 Docker/VM 等情况。
io.netty.leakDetection.level System 参数。内存泄漏检测级别:DISABLED/SIMPLE 等。默认值 SIMPLE。
跟踪诊断 设置线程名 1 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1 , new DefaultThreadFactory("boss" )); EventLoopGroup workerGroup = new NioEventLoopGroup(0 , new DefaultThreadFactory("worker" ));
设置 Handler 名 1 pipeline.addLast("frameDecoder" , new OrderFrameDecoder());
使用日志 1 2 pipeline.addLast(new LoggingHandler(LogLevel.INFO));
优化使用 整改线程模型 对于 IO 密集型应用,我们通常需要整改线程模型:独立出“线程池”来处理业务(处理业务的线程不再与 NioEventLoop 共享)。
1 2 3 4 5 EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(1 , new DefaultThreadFactory("business" )); pipeline.addLast(eventExecutors, new OrderServerProcessHandler());
增强写,延迟与吞吐量的抉择 在前面的点单示例中,我们调用的是ctx.writeAndFlush(msg)
:写数据之后立刻发送出去,这样虽然延迟降低了,但是吞吐量又会受影响下降。如果我们是要吞吐量优先,那么有下面两种改进方式。
利用 channelReadComplete 就像我们在 echo 示例中演示的这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead (ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete (ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
不过使用 channelReadComplete 也有弊端:
不适合异步业务线程(不复用 Nio event loop)处理 channelRead 中的业务处理结果的 write 很可能发生在 channelReadComplete 之后。
不适合更精细的控制 例如连续读 16 次时,第 16 次是 flush,但是如果保持连续的次数不变,如何做到 3 次就 flush ?
使用 flushConsolidationHandler1 2 3 4 5 pipeline.addLast(new FlushConsolidationHandler(5 , true )); pipeline.addLast(eventExecutorGroup, new OrderServerProcessHandler());
流量整形 1 2 3 4 5 6 7 GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler( new NioEventLoopGroup(), 100 * 1024 * 1024 , 100 * 1024 * 1024 ); pipeline.addLast(globalTrafficShapingHandler);
为不同平台开启 native Netty 针对不同平台都做了一定的优化,如果我们想切换到特定平台,也是非常方便的。
修改代码
NioServerSocketChannel -> [Prefix]ServerSocketChannel
NioEventLopGroup -> [Prefix]EventLopGroup
NioChannelOption -> [Prefix]ChannelOption
准备好 native 库
黑白名单 1 2 3 4 5 6 7 IpSubnetFilterRule ipSubnetFilterRule =new IpSubnetFilterRule("127.0.0.1" , 8 , IpFilterRuleType.REJECT); RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);ine(); pipeline.addLast(ruleBasedIpFilter);
自定义授权 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf 4j@ChannelHandler .Sharablepublic class AuthHandler extends SimpleChannelInboundHandler <RequestMessage > { @Override protected void channelRead0 (ChannelHandlerContext ctx, RequestMessage msg) { try { BaseOperation operation = msg.getBody(); if (operation instanceof AuthOperation) { AuthOperation authOperation = (AuthOperation) operation; AuthOperationResult authOperationResult = authOperation.execute(); if (authOperationResult.isPassAuth()) { log.info("Pass auth" ); } else { log.error("Fail to auth" ); ctx.close(); } } else { log.error("Expect first msg is auth" ); ctx.close(); } } catch (Exception e) { log.error("Exception happen" ); ctx.close(); } finally { ctx.pipeline().remove(this ); } } }
Server 端:
1 2 3 4 5 AuthHandler authHandler = new AuthHandler(); pipeline.addLast(authHandler);
Client 端:
1 2 3 4 channelFuture.channel().writeAndFlush(new RequestMessage(IdUtil.nextId(), new AuthOperation("admin" , "password" ))); channelFuture.channel().writeAndFlush(requestMessage);