创建 network
1
docker network create --driver bridge --subnet 172.22.0.0/16 --gateway 172.22.0.1 op_net
创建 elasticsearch.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107version: '3.6'
services:
cerebro:
image: lmenezes/cerebro
restart: always
container_name: cerebro
ports:
- 9000:9000
command:
- -Dhosts.0.host=http://elasticsearch:9200
networks:
default:
ipv4_address: 172.22.0.24
kibana:
image: kibana
restart: always
container_name: kibana
environment:
- I18N_LOCALE=zh-CN
- TIMELION_ENABLED=true
- XPACK_GRAPH_ENABLED=true
- XPACK_MONITORING_COLLECTION_ENABLED="true"
ports:
- 5601:5601
networks:
default:
ipv4_address: 172.22.0.25
elasticsearch:
image: elasticsearch
restart: always
container_name: es_hot
environment:
- cluster.name=es_cluster
- node.name=es_hot
- node.attr.box_type=hot
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.seed_hosts=es_hot,es_warm,es_cold
- cluster.initial_master_nodes=es_hot,es_warm,es_cold
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es_data_hot:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
default:
ipv4_address: 172.22.0.21
elasticsearch2:
image: elasticsearch
restart: always
container_name: es_warm
environment:
- cluster.name=es_cluster
- node.name=es_warm
- node.attr.box_type=warm
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.seed_hosts=es_hot,es_warm,es_cold
- cluster.initial_master_nodes=es_hot,es_warm,es_cold
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es_data_warm:/usr/share/elasticsearch/data
networks:
default:
ipv4_address: 172.22.0.22
elasticsearch3:
image: elasticsearch
restart: always
container_name: es_cold
environment:
- cluster.name=es_cluster
- node.name=es_cold
- node.attr.box_type=cold
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.seed_hosts=es_hot,es_warm,es_cold
- cluster.initial_master_nodes=es_hot,es_warm,es_cold
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- es_data_cold:/usr/share/elasticsearch/data
networks:
default:
ipv4_address: 172.22.0.23
volumes:
es_data_hot:
driver: local
es_data_warm:
driver: local
es_data_cold:
driver: local
networks:
default:
external:
name: op_net启动 Elasticsearch 集群
1
docker-compose -f elasticsearch.yml up -d
Netty心跳机制
何为心跳
顾名思义,所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。
为什么需要心跳
因为网络的不可靠性,有可能在 TCP 保持长连接的过程中,由于某些突发情况,例如网线被拔出,突然掉电等,会造成服务器和客户端的连接中断。
在这些突发情况下,如果恰好服务器和客户端之间没有交互的话,那么它们是不能在短时间内发现对方已经掉线的。
为了解决这个问题,我们就需要引入心跳机制。
心跳机制的工作原理是:在服务器和客户端之间一定时间内没有数据交互时,即处于 idle 状态时,客户端或服务器会发送一个特殊的数据包给对方,当接收方收到这个数据报文之后,也立即发送一个特殊的数据报文,回应发送方,此即一个 PING-PONMG 交互。自然地,当某一端收到心跳消息后,就知道了对方仍然在线,这就确保 TCP 连接的有效性。
如何实现心跳
我们可以通过两种方式实现心跳机制:
- 使用 TCP 协议层里面的 keepalive 机制
- 在应用层上实现自定义的心跳机制(在 Netty 中即为 Idle check)
虽然在 TCP 协议层上,提供了 keepalive 保活机制,但是使用它有几个缺点:
- 它不是 TCP 的标准协议,并且默认时关闭的。
- TCP keepalive 机制依赖于操作系统的实现,默认的 keepalive 心跳时间是两个小时,并且对 keepalive 的修改需要系统调用(或者修改系统配置),灵活性不够。
- TCP keepalive 与 TCP 协议绑定,因此如果需要更换为 UDP 协议时,keepalive 机制就失效了。
虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳。
TCP keepalive
TCP keepalive 核心参数:
- net.ipv4.tcp_keepalive_time = 7200
- net.ipv4.tcp_keepalive_intl = 75
- net.ipv4.tcp_keepalive_probes = 9
当启用(默认关闭)keepalive 时,TCP 在连接没有数据通过的 7200 秒后发送 keepalive 消息,当探测没有回复时按 75 秒的重试频率重发,一直发 9 个探测包都没有确认时连接失败,所以总耗时一般为:2 小时 11 分(7200 秒 + 75 * 9)。
Idle check
Idle 监测,只是负责诊断,诊断后,做出不同的行为,决定 Idle 监测的最终用途。
- 发送 keepalive(区别于 TCP keepalive):在有其它数据传输的时候,不发送 keepalive,无数据传输超过一定时间,判定为 Idle,再发 keepalive。
- 直接关闭连接:快速释放损坏的、恶意的、很久不用的连接,让系统时刻保持最好的状态;简单粗暴,客户端可能需要重连。
实际应用中:结合起来使用。按需 keepalive,保证不会空闲,如果空闲,关闭连接。
Netty 中开启心跳监测
Server 端开启 TCP keepalive
- bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)
- bootstrap.childOption(NioChannelOption.SO_KEEPALIVE, true)
注意,是 .childOption(ChannelOption.SO_KEEPALIVE, true) 而不是 .option(ChannelOption.SO_KEEPALIVE, true),虽然可以设置后者,但是无效。
开启 Idle Check
要开启 Idle Check,关键是继承 IdleStateHandler。实例化一个 IdleStateHandler 需要提供 3 个参数:
- readerIdleTimeSeconds:读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件。
- writerIdleTimeSeconds:写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
- allIdleTimeSeconds:读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件。
参考资料
Netty 序列化/反序列化
为什么需要二次编解码
假设我们把解决粘包半包问题的常用三种解码器叫做一次解码器,那么我们在项目中,除了可选的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转换,方便使用,这层解码器可以成为“二次解码器”,相应的,对应的编码器是为了将 Java 对象转化成字节流方便存储或传输。
一次解码器:ByteToMessageDecoder
io.netty.buffer.ByteBuf(原始数据流) -> io.netty.buffer.ByteBuf(用户数据)二次解码器:
MessageToMessageDecoder<I>
io.netty.buffer.ByteBuf(用户数据) -> Java Object
常用二次编码码方式
- Java序列化:基本已经没人使用,因为它占用的空间比较大,且只有在 Java 中能够使用
- Marshaling
- XML:占用空间比较大
- JSON
- MessagePack
- ProtoBuf:灵活的、高效的用于序列化数据的协议;相较于 XML 和 JSON 格式,Protobuf 更小、更快、更便捷;Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、Python、C++ 等代码,不需要再写其它代码。
- 其它
编解码方式选择依据
- 空间:编码后占用空间,需要比较不同的数据大小情况
- 时间:编解码速度,需要比较不同的数据大小情况
- 可读性
- 多语言的支持:重要
Netty 对常用编解码方式的支持
- 相关类都在包 io.netty.handler.codec
- 支持 base64、bytes、compression、json、marshaling、protobuf、serialization、string、xml等
- 编码器继承自
MessageToMessageEncoder<I>
,解码器继承自MessageToMessageDecoder<I>
Netty 粘贴/半包处理
什么是粘包/半包
比如客户端向服务器端发送了两条消息,分别是“ABC”、“DEF”,由于 TCP 是流式协议,服务器端可能会一次性接收到这两条消息“ABCDEF”,也可能会分 3、4次才接收到这两条消息,如分 3 次分别接收“AB”、“CD”、“EF”,前面一次性接收就是粘包,后面分 3、4 接收就是半包。
粘包/半包主要原因
粘包的主要原因:
- 发送方每次写入数据 < 套接字缓冲区大小
- 接收方读取套接字缓冲区数据不够及时
半包的主要原因:
- 发送方每次写入数据 > 套接字缓冲区大小
- 发送的数据大于协议的 MTU(Maximum Transmission Unit,最大传输单元),必须拆包
从收发角度看:一次发送可能被多次接收(半包),多个发送可能被一次接收(粘包)
从传输角度看:一个发送可能占用多个传输包(半包),多个发送可能公用一个传输包(粘包)
根本原因:TCP 协议是流式协议,消息无边界。(UDP 像邮寄的包裹,虽然一次运输多个,但每个包裹都有“界限”,是一个一个签收的,所以使用 UDP 协议不会出现粘包/半包问题)
粘包/半包解决思路
要解决 TCP 粘包/半包问题,根本手段在于找出消息边界,下表列举了一些常见的解决方式。
方式\比较 | 寻找消息边界的方式 | 优点 | 缺点 | 推荐度 |
---|---|---|---|---|
TCP 连接改成短连接,一个请求一个短连接 | 建立连接到释放连接之间的信息即为传输信息 | 简单 | 效率低下 | 不推荐 |
封装成帧(Framing):固定长度 | 满足固定长度即可 | 简单 | 空间浪费 | 不推荐 |
封装成帧(Framing):分隔符 | 分隔符之间 | 空间不浪费,也比较简单 | 内容本身出现分隔符时需转义,所以需要扫描内容 | 推荐 |
封装成帧(Framing):固定长度字段存内容的长度信息 | 先解析固定长度的字段获取长度,然后读取后续内容 | 精确定位用户数据,内容也不用转义 | 长度理论上有限制,需提前预知可能的最大长度从而定义长度占用字节数 | 推荐 |
封装成帧(Framing):其它方式 | 每种都不同,例如 JSON 可以看 {} 是否已经成对 |
Netty 对三种常用封帧方式的支持
方式\支持 | 解码 | 编码 |
---|---|---|
固定长度 | FixedLengthFrameDecoder | 简单 |
分隔符 | DelimiterBasedFrameDecoder | 简单 |
固定长度字段存内容的长度信息 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
上面三种解码器都继承自 ByteToMessageDecoder。
下面以“固定长度字段存内容的长度信息”这种方式来演示如何在 Netty 中使用:
1 | /** |
Netty开发实践-下
在Netty开发实践-上中,我们通过一些示例介绍了如何使用 Netty 进行服务器端/客户端编程,虽然我在示例代码中都做了注释,但是为了更深入的掌握 Netty 编程,接下来我们再做一些知识点梳理,其中有些知识点在前面已经用到了,有些知识点没有用到但仍是我们需要了解的。
IO 模型切换
我们知道,IO 模型分为:BIO、NIO 和 AIO 三种。在 Netty 中,AIO 相关代码已经移除,BIO 相关代码虽然保留,但是已经不再推荐,也就是说,在以后的 Netty 中,我们可能只能以 NIO 的方式进行编程。
在之前的示例代码中,我们选择的都是 NIO 模型,如下面这样:
服务器端:
1 | EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
客户端:
1 | EventLoopGroup group = new NioEventLoopGroup(); |
那如果我们一定要用 BIO 模型,又该怎么切换呢?跟简单,只需要调整相关类名即可:
服务器端:
1 | EventLoopGroup bossGroup = new OioEventLoopGroup(1); |
客户端:
1 | EventLoopGroup group = new OioEventLoopGroup(); |
Reactor 模式切换
在 NIO 模型中,它使用开发模式我们称之为 Reactor 模式。Reactor 模式分为:单线程 Reactor 模式、多线程 Reactor 模式 和 主从 Reactor 模式 三种。
在之前的示例代码中,我们选择的都是 主从 Reactor 模式,如下面这样:
1 | EventLoopGroup bossGroup = new NioEventLoopGroup(1); |
那如果我们想要使用单线程 Reactor 模式或 多线程 Reactor 模式,又该怎么切换呢?跟简单,只需要调整相关参数即可:
使用单线程 Reactor 模式:
1 | EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); |
使用多线程 Reactor 模式:
1 | EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); |
粘包/半包处理
为了处理 TCP 的粘包/半包问题,Netty 提供了三种常用的封帧方式:
方式\支持 | 解码 | 编码 |
---|---|---|
固定长度 | FixedLengthFrameDecoder | 简单 |
分隔符 | DelimiterBasedFrameDecoder | 简单 |
固定长度字段存内容的长度信息 | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
上面三种解码器都继承自 ByteToMessageDecoder。
在之前的示例代码中,我们选择的都是“固定长度字段存内容的长度信息”方式,如下面这样:
1 | /** |
由于另外两种都不常用,这里就不演示怎么切换了。
序列化/反序列化
Netty 对一些常用的序列化/反序列化方式都提供了支持,如 base64、bytes、compression、json、marshaling、protobuf、serialization、string、xml 等,编码器都继承自 MessageToMessageEncoder<I>
,解码器都继承自 MessageToMessageDecoder<I>
。
在之前的示例代码中,我们选择的都是 json 方式,如下面这样:
1 | /** |
在其它官方示例中,演示了如果使用其它方式的编解码,如在 “Word Clock”示例中,演示了如何使用 protobuf,大家可以自行查看相关代码。
空闲监测
在应用程序运行中,为了避免一些不怀好意或者无所事事的客户端一直和我们的应用程序保持连接,从而给我们造成资源浪费,因此一般我们都会进行空闲监测。
在 Netty 中,实现空闲监测的核心类是 IdleStateHandler。在前面的订单示例中,我们已经用到了这个类,现在再来回顾下是如何使用它的。
服务端读监测:
1 | /** |
客户端写监测:
1 | /** |
客户端写超时,发送 keepalive:
1 | /** |
调优参数
System 参数
ulimit
Linux Option。进行 TCP 连接时,系统为每个 TCP 连接创建一个 socket 句柄,也就是一个文件句柄,但是 Linux 对每个进程打开的文件句柄数量做了限制,如果超出,报错“Too many open file”。
注意:ulimit 命令修改的数值只对当前登录用户的目前使用环境有效,系统重启或者用户退出后就会失效,所以可以作为程序启动脚本的一部分,让它在程序启动前执行。TCP_NODELAY
SocketChannel 参数。设置是否启用 Nagle 算法:通过将小的碎片数据连接成更大的报文来提高发送效率。如果需要发送一些较小的报文,则需要禁用该算法。默认不开启。SO_BACKLOG
ServerSocketChannel 参数。最大的等待连接数量。当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。默认值 128。SO_REUSEADDR
SocketChannel/ServerSocketChannel 参数。地址重用,解决“Address already in use”。常用开启场景:多网卡(IP)绑定相同端口;让关闭连接释放的端口更早可使用。默认不开启。
Netty 参数
CONNECT_TIMEOUT_MILLIS
SocketChannel 参数。客户端连接服务器最大允许时间。默认值 30s。io.netty.eventLoopThreads
System 参数。IO Thread 数量。默认值 availableProcessors * 2。io.netty.availableProcessors
System 参数。指定 availableProcessors。考虑 Docker/VM 等情况。io.netty.leakDetection.level
System 参数。内存泄漏检测级别:DISABLED/SIMPLE 等。默认值 SIMPLE。
跟踪诊断
设置线程名
1 | EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boss")); |
设置 Handler 名
1 | pipeline.addLast("frameDecoder", new OrderFrameDecoder()); |
使用日志
1 | // 不同位置输出的内容不同 |
优化使用
整改线程模型
对于 IO 密集型应用,我们通常需要整改线程模型:独立出“线程池”来处理业务(处理业务的线程不再与 NioEventLoop 共享)。
1 | // 对于 IO 密集型应用,独立出“线程池”来处理业务(这里不能使用 NioEventLoopGroup,不然只会使用到 1 个线程) |
增强写,延迟与吞吐量的抉择
在前面的点单示例中,我们调用的是ctx.writeAndFlush(msg)
:写数据之后立刻发送出去,这样虽然延迟降低了,但是吞吐量又会受影响下降。如果我们是要吞吐量优先,那么有下面两种改进方式。
- 利用 channelReadComplete
就像我们在 echo 示例中演示的这样:
1 | /** |
不过使用 channelReadComplete 也有弊端:
不适合异步业务线程(不复用 Nio event loop)处理
channelRead 中的业务处理结果的 write 很可能发生在 channelReadComplete 之后。不适合更精细的控制
例如连续读 16 次时,第 16 次是 flush,但是如果保持连续的次数不变,如何做到 3 次就 flush ?
- 使用 flushConsolidationHandler
1
2
3
4
5// 读 5 次之后才 flush,并开启异步增强
pipeline.addLast(new FlushConsolidationHandler(5, true));
// 业务处理 Handler 放到最后添加
pipeline.addLast(eventExecutorGroup, new OrderServerProcessHandler());
流量整形
1 | // 全局流量整形 |
为不同平台开启 native
Netty 针对不同平台都做了一定的优化,如果我们想切换到特定平台,也是非常方便的。
- 修改代码
- NioServerSocketChannel -> [Prefix]ServerSocketChannel
- NioEventLopGroup -> [Prefix]EventLopGroup
- NioChannelOption -> [Prefix]ChannelOption
- 准备好 native 库
黑白名单
1 | // 黑白名单过滤 |
自定义授权
1 | /** |
Server 端:
1 | // 认证处理 |
Client 端:
1 | // 做了认证处理,要求第一个操作是认证操作 |
Netty开发实践-上
Maven 依赖
1 | <dependency> |
简单示例
Echo 示例
我们先用官方示例 echo 来简单了解下如何进行 Netty 开发。该示例类似我们打兵乓球一样,把一个消息从客户端发送到服务器,服务器接收消息后,再把该消息返回给客户端,循环往复。
服务器端代码
1 | /** |
客户端代码
1 | /** |
Hello Word 示例
完成入门级示例后,我们再来了解下,如何使用 Netty 来实现类似 Spring MVC 的 Web 服务。下面的示例同样来自官方,其实现功能为当我们访问“http://127.0.0.1:8080/”时,服务器会返回“Hello,Word”给浏览器。
服务器端代码
1 | /** |
可能大家注意到了,这里没有对客户端编程,确实,因为这里的客户端就是我们的浏览器。
网络应用程序编程基本步骤
上面两个示例都非常简单,接下来我们会模拟真实订单场景(当然还是很简单的),以此来了解 Netty 更多的特性。但是在具体示例之前,我们有必要先了解下网络应用程序编程基本步骤。
点单示例
Maven 依赖
由于用到了 Json 编解码,和 guava 中的一些工具类,因此除了 Netty 依赖外,还需要添加以下依赖:
1 | <dependency> |
案例介绍
在本示例中,我们会模拟饭店点单:在哪一桌,要吃什么菜。因为示例本身是为了让我们了解 Netty 而非业务,所以本示例的业务非常简单,就是发送客户端一个订单操作给服务器,服务器收到请求后,再返回订单结果给客户端。
如上图,我们处理 OrderOperation 之外,还定义了 AuthOperation 和 KeepOperation,这是为了演示 Netty 的高级特性。
数据结构设计
消息对象数据结构:
- 消息体由消息头和消息体组成
- 消息头由版本号(version)、操作码(opCode)和消息id(streamId)组成
- 消息体使用 Json 编解码,既可以是操作对象(operation),也可以是操作结果对象(operation result)
- 传输协议使用 TCP,因此要解决粘包/半包问题,因此消息对象需要指定 length
UML:
- 消息对象基类 BaseMessage 有两个属性:消息头对象 MessageHeader 和 消息体对象 BaseMessageBody 基类,消息体对象具体类型由泛型 T 指定
- 操作对象基类 BaseOperation 和操作结果对象基类 BaseOperationResult 都继承自消息体对象 BaseMessageBody 基类
- 操作对象基类 BaseOperation 有 3 个子类:点单操作对象 OrderOperation、认证操纵对象 OrderOperation 和心跳检测操作对象 KeepaliveOperation
- 操作结果对象 BaseOperationResult 基类有 3 个子类:点单操作结果对象 OrderOperationResult、认证操纵结果对象 OrderOperationResult 和心跳检测操作结果对象 KeepaliveOperationResult,分别对应各自的操作对象
- 消息对象基类 BaseMessage 有 2 个子类:请求对象 RequestMessage 和响应对象 ResponseMessage。
服务器端编程
编解码
一次解码器 OrderFrameDecoder:处理粘包/半包
1
2
3
4
5
6
7
8
9/**
* 一次解码器:处理粘包/半包
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}一次编码器 OrderFrameEncoder:处理粘包/半包
1
2
3
4
5
6
7
8
9/**
* 一次编码码器:处理粘包/半包
*/
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}二次解码器 OrderProtocolDecoder:将 ByteBuf 解析为 RequestMessage
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 二次解码器:将 {@link ByteBuf} 解析为 {@link RequestMessage}
*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(msg);
out.add(requestMessage);
}
}二次编码器 OrderProtocolEncoder:将 ResponseMessage 解析为 ByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 二次编码器:将 {@link ResponseMessage} 解析为 {@link ByteBuf}
*/
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage msg, List<Object> out) {
ByteBuf byteBuf = ctx.alloc().buffer();
msg.encode(byteBuf);
out.add(byteBuf);
}
}
业务处理
1 | /** |
读监测
1 | /** |
认证处理
1 | /** |
服务器 Server
1 | /** |
客户端编程
编解码
一次解码器 OrderFrameDecoder:处理粘包/半包
1
2
3
4
5
6
7
8
9/**
* 一次解码器:处理粘包/半包
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}一次编码器 OrderFrameEncoder:处理粘包/半包
1
2
3
4
5
6
7
8
9/**
* 一次编码码器:处理粘包/半包
*/
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}二次解码器 OrderProtocolDecoder:将 ByteBuf 解析为 ResponseMessage
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 二次解码器:将 {@link ByteBuf} 解析为 {@link ResponseMessage}
*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.decode(msg);
out.add(responseMessage);
}
}二次编码器 OrderProtocolEncoder:将 RequestMessage 解析为 ByteBuf
1
2
3
4
5
6
7
8
9
10
11
12
13/**
* 二次码码器:将 {@link RequestMessage} 解析为 {@link ByteBuf}
*/
public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestMessage msg, List<Object> out) {
ByteBuf byteBuf = ctx.alloc().buffer();
msg.encode(byteBuf);
out.add(byteBuf);
}
}二次编码器 OperationToRequestMessageEncoder,将 BaseOperation 转换为 RequestMessage,以实现自动添加消息 id
1
2
3
4
5
6
7
8
9
10
11/**
* 二次编码器:将 {@link BaseOperation} 转换为 {@link RequestMessage},以实现自动添加消息 id
*/
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder<BaseOperation> {
@Override
protected void encode(ChannelHandlerContext ctx, BaseOperation msg, List<Object> out) {
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), msg);
out.add(requestMessage);
}
}
业务处理
1 | /** |
写监测
1 | /** |
客户端 Client
1 | /** |
基于 Docker 搭建 Mysql 集群
1 | docker run -d ` |
Netty IO模型
IO 模型
IO 模型主要分类:
同步(synchronous)IO 和异步(asynchronous)IO
阻塞(blocking)IO 和非阻塞(non-blocking)IO
同步阻塞(blocking-IO)简称 BIO
同步非阻塞(non-blocking-IO)简称 NIO
异步非阻塞(synchronous-non-blocking-IO)简称 AIO
同步 VS 异步
同步:发送一个请求,等待返回,再发送下一个请求,同步可以避免出现死锁,脏读的发生。
异步:发送一个请求,不等待返回,随时可以再发送下一个请求,可以提高效率,保证并发。
简述:数据就绪后需要自己去读是同步,数据就绪直接读好再回调给程序是异步。
阻塞 VS 非阻塞
阻塞:传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read()或者 write()方法时,该线程将被阻塞,直到有一些数据读读取或者被写入,在此期间,该线程不能执行其他任何任务。在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量的客户端时,性能急剧下降。
非阻塞:Java NIO 是非阻塞式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程会去执行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此 NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。
简述:没有数据传过来时,读会阻塞直到有数据;缓冲区满时,写操作也会阻塞,这就是阻塞。非粗赛遇到这些情况,都是直接返回。
举例说明
以我们去饭店吃饭为例:
- BIO:食堂排队打饭模式,排队在窗口,打好才走;
- NIO:等待被叫模式,等待被叫,好了自己去端;
- AIO:包厢模式,点单后菜直接被端上桌。
IO VS NIO
IO | NIO |
---|---|
面向流(Stream Oriented) | 面向缓冲区(Buffer Oriented) |
阻塞 IO(Blocking IO) | 非阻塞 IO(Non Blocking IO) |
无 | 选择器(Selectors) |
BIO VS NIO VS AIO
BIO | NIO | AIO |
---|---|---|
Thread-Per-Connection | Reactor | Proactor |
BIO、NIO、AIO 适用场景
- BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择。
- NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂。
- AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。
多路复用 IO(IO Multiplexing)
即经典的 Reactor 设计模式,有时也称为异步阻塞 IO,Java 中的 Selector 和 Linux 中的 epoll 都是这种模型。
Reactor 模式
Reactor 模式的核心流程:注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 -> 事件发生后做出响应的处理。
Client/Server | SocketChannel/ServerSocketChannel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
client | SocketChannel | Y | Y | Y | |
server | ServerSocketChannel | Y | |||
server | SocketChannel | Y | Y |
Reactor 模式有三种实现方式:Reactor 单线程、Reactor 多线程模式、Reactor 主从模式。
- Reactor 单线程模式
每个客户端发起连接请求都会交给 acceptor,acceptor 根据事件类型交给线程 handler 处理,注意 acceptor 处理和 handler 处理都在一个线程中处理,所以其中某个 handler 阻塞时,会导致其他所有的 client 的 handler 都得不到执行,并且更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷,因此单线程 Reactor 模型用的比较少。
- Reactor 多线程模式
有专门一个线程,即 Acceptor 线程用于监听客户端的TCP连接请求。
客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责。每个客户端连接都与一个特定的 NIO 线程绑定,因此在这个客户端连接中的所有 IO 操作都是在同一个线程中完成的。
客户端连接有很多,但是 NIO 线程数是比较少的,因此一个 NIO 线程可以同时绑定到多个客户端连接中。
缺点:如果我们的服务器需要同时处理大量的客户端连接请求或我们需要在客户端连接时,进行一些权限的检查,那么单线程的 Acceptor 很有可能就处理不过来,造成了大量的客户端不能连接到服务器。
- Reactor 主从模式
举例说明:
以饭店规模变化为例:
- Reactor 单线程模式:一个人包揽所有,迎宾、点菜、做饭、上菜、送客等;
- Reactor 多线程模式:多招几个伙计,大家一起做上面的事情;
- Reactor 主从模式:进一步分工,搞一个或多个人专门做迎宾。
核心概念
- 缓冲区 Buffer
Buffer 是一个对象。它包含一些要写入或者读出的数据。
在面向流的 I/O 中,可以将数据写入或者将数据直接读到 Stream 对象中。在 NIO 中,所有的数据都是用缓冲区处理。这也就是很多博客说,IO 是面向流的,NIO 是面向缓冲区的。
缓冲区实质是一个数组,通常它是一个字节数组(ByteBuffer),也可以使用其他类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息。
最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能于操作 byte 数组。除了 ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean)都对应一种缓冲区,具体如下:
- ByteBuffer:字节缓冲区
- CharBuffer:字符缓冲区
- ShortBuffer:短整型缓冲区
- IntBuffer:整型缓冲区
- LongBuffer:长整型缓冲区
- FloatBuffer:浮点型缓冲区
- DoubleBuffer:双精度浮点型缓冲区
- 通道 Channel
Channel 是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过 Channel 读取和写入。
通道和流不同之处在于通道是双向的,流只是在一个方向移动,而且通道可以用于读,写或者同时用于读写。因为 Channel 是全双工的,所以它比流更好地映射底层操作系统的 API,特别是在 UNIX 网络编程中,底层操作系统的通道都是全双工的,同时支持读和写。
Channel 有四种实现:
- FileChannel:文件中读取数据。
- DatagramChannel:从 UDP 网络中读取或者写入数据。
- SocketChannel:从 TCP 网络中读取或者写入数据。
- ServerSocketChannel:允许你监听来自 TCP 的连接,就像服务器一样。每一个连接都会有一个 SocketChannel 产生。这句话的意思就是,ServerSocketChannel 用于服务端,SocketChannel 用于客户端。
- 多路复用器 Selector
Selector 会不断轮询注册在上面的 Channel,如果某个 Channel 上面有新的 TCP 连接接入、读或写事件,这个channel就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取 Channel 的集合,进行后续的 I/O 操作。一个多路复用器 Selector 可以同时轮询多个 Channel,由于 JDK 使用了 epoll() 代替传统的 select 实现,所以它没有最大连接句柄 1024/2048 的限制。这就意味着只需一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。
Netty IO 模型
目前 Netty 只支持 NIO 模型。
参考资料
Netty 架构设计
Netty 简介
Netty 是一个广受欢迎的异步的、事件驱动的、基于NIO实现的 Java 开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。
Netty 框架的优势
- API使用简单,开发门槛低;
- 功能强大,预置了多种编解码功能,支持多种主流协议;
- 定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展;
- 性能高,通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优;
- 成熟、稳定,Netty 修复了已经发现的所有 JDK NIO BUG,业务开发人员不需要再为 NIO 的 BUG 而烦恼;
- 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;
- 经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用。
Netty 的应用场景
互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。
游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。
大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
企业软件:企业和 IT 集成需要 ESB,Netty 对多协议支持、私有协议定制的简洁性和高性能是 ESB RPC 框架的首选通信组件。事实上,很多企业总线厂商会选择 Netty 作为基础通信组件,用于企业的 IT 集成。
通信行业:Netty 的异步高性能、高可靠性和高成熟度的优点,使它在通信行业得到了大量的应用。
Netty 在开源框架中的应用
- 数据库:Cassandra
- 大数据处理:Spark、Hadoop
- 消息队列:RocketMQ
- 搜索引擎:Elasticsearch
- 框架:gRPC、Apache Dubbo、Spring5(WebFlux)
- 分布式协调器:ZooKeeper
- 工具类:async-http-client
- 其它参考:https://netty.io/wiki/adopters.html