人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

  1. 创建 network

    1
    docker network create --driver bridge --subnet 172.22.0.0/16 --gateway 172.22.0.1  op_net
  2. 创建 elasticsearch.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    version: '3.6'

    services:
    cerebro:
    image: lmenezes/cerebro
    restart: always
    container_name: cerebro
    ports:
    - 9000:9000
    command:
    - -Dhosts.0.host=http://elasticsearch:9200
    networks:
    default:
    ipv4_address: 172.22.0.24
    kibana:
    image: kibana
    restart: always
    container_name: kibana
    environment:
    - I18N_LOCALE=zh-CN
    - TIMELION_ENABLED=true
    - XPACK_GRAPH_ENABLED=true
    - XPACK_MONITORING_COLLECTION_ENABLED="true"
    ports:
    - 5601:5601
    networks:
    default:
    ipv4_address: 172.22.0.25
    elasticsearch:
    image: elasticsearch
    restart: always
    container_name: es_hot
    environment:
    - cluster.name=es_cluster
    - node.name=es_hot
    - node.attr.box_type=hot
    - bootstrap.memory_lock=true
    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    - discovery.seed_hosts=es_hot,es_warm,es_cold
    - cluster.initial_master_nodes=es_hot,es_warm,es_cold
    ulimits:
    memlock:
    soft: -1
    hard: -1
    volumes:
    - es_data_hot:/usr/share/elasticsearch/data
    ports:
    - 9200:9200
    networks:
    default:
    ipv4_address: 172.22.0.21
    elasticsearch2:
    image: elasticsearch
    restart: always
    container_name: es_warm
    environment:
    - cluster.name=es_cluster
    - node.name=es_warm
    - node.attr.box_type=warm
    - bootstrap.memory_lock=true
    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    - discovery.seed_hosts=es_hot,es_warm,es_cold
    - cluster.initial_master_nodes=es_hot,es_warm,es_cold
    ulimits:
    memlock:
    soft: -1
    hard: -1
    volumes:
    - es_data_warm:/usr/share/elasticsearch/data
    networks:
    default:
    ipv4_address: 172.22.0.22
    elasticsearch3:
    image: elasticsearch
    restart: always
    container_name: es_cold
    environment:
    - cluster.name=es_cluster
    - node.name=es_cold
    - node.attr.box_type=cold
    - bootstrap.memory_lock=true
    - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    - discovery.seed_hosts=es_hot,es_warm,es_cold
    - cluster.initial_master_nodes=es_hot,es_warm,es_cold
    ulimits:
    memlock:
    soft: -1
    hard: -1
    volumes:
    - es_data_cold:/usr/share/elasticsearch/data
    networks:
    default:
    ipv4_address: 172.22.0.23


    volumes:
    es_data_hot:
    driver: local
    es_data_warm:
    driver: local
    es_data_cold:
    driver: local

    networks:
    default:
    external:
    name: op_net
  3. 启动 Elasticsearch 集群

    1
    docker-compose -f elasticsearch.yml up -d

何为心跳

顾名思义,所谓心跳,即在 TCP 长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保 TCP 连接的有效性。

为什么需要心跳

因为网络的不可靠性,有可能在 TCP 保持长连接的过程中,由于某些突发情况,例如网线被拔出,突然掉电等,会造成服务器和客户端的连接中断。
在这些突发情况下,如果恰好服务器和客户端之间没有交互的话,那么它们是不能在短时间内发现对方已经掉线的。
为了解决这个问题,我们就需要引入心跳机制。
心跳机制的工作原理是:在服务器和客户端之间一定时间内没有数据交互时,即处于 idle 状态时,客户端或服务器会发送一个特殊的数据包给对方,当接收方收到这个数据报文之后,也立即发送一个特殊的数据报文,回应发送方,此即一个 PING-PONMG 交互。自然地,当某一端收到心跳消息后,就知道了对方仍然在线,这就确保 TCP 连接的有效性。

如何实现心跳

我们可以通过两种方式实现心跳机制:

  • 使用 TCP 协议层里面的 keepalive 机制
  • 在应用层上实现自定义的心跳机制(在 Netty 中即为 Idle check)

虽然在 TCP 协议层上,提供了 keepalive 保活机制,但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议,并且默认时关闭的。
  2. TCP keepalive 机制依赖于操作系统的实现,默认的 keepalive 心跳时间是两个小时,并且对 keepalive 的修改需要系统调用(或者修改系统配置),灵活性不够。
  3. TCP keepalive 与 TCP 协议绑定,因此如果需要更换为 UDP 协议时,keepalive 机制就失效了。

虽然使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量, 但是基于上面的几点缺点, 一般的实践中, 人们大多数都是选择在应用层上实现自定义的心跳。

TCP keepalive

TCP keepalive 核心参数:

  • net.ipv4.tcp_keepalive_time = 7200
  • net.ipv4.tcp_keepalive_intl = 75
  • net.ipv4.tcp_keepalive_probes = 9
    当启用(默认关闭)keepalive 时,TCP 在连接没有数据通过的 7200 秒后发送 keepalive 消息,当探测没有回复时按 75 秒的重试频率重发,一直发 9 个探测包都没有确认时连接失败,所以总耗时一般为:2 小时 11 分(7200 秒 + 75 * 9)。

Idle check

Idle 监测,只是负责诊断,诊断后,做出不同的行为,决定 Idle 监测的最终用途。

  • 发送 keepalive(区别于 TCP keepalive):在有其它数据传输的时候,不发送 keepalive,无数据传输超过一定时间,判定为 Idle,再发 keepalive。
  • 直接关闭连接:快速释放损坏的、恶意的、很久不用的连接,让系统时刻保持最好的状态;简单粗暴,客户端可能需要重连。
    实际应用中:结合起来使用。按需 keepalive,保证不会空闲,如果空闲,关闭连接。

Netty 中开启心跳监测

Server 端开启 TCP keepalive

  • bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true)
  • bootstrap.childOption(NioChannelOption.SO_KEEPALIVE, true)

注意,是 .childOption(ChannelOption.SO_KEEPALIVE, true) 而不是 .option(ChannelOption.SO_KEEPALIVE, true),虽然可以设置后者,但是无效。

开启 Idle Check

要开启 Idle Check,关键是继承 IdleStateHandler。实例化一个 IdleStateHandler 需要提供 3 个参数:

  • readerIdleTimeSeconds:读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件。
  • writerIdleTimeSeconds:写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件。
  • allIdleTimeSeconds:读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件。

参考资料

  1. 浅析 Netty 实现心跳机制与断线重连

为什么需要二次编解码

假设我们把解决粘包半包问题的常用三种解码器叫做一次解码器,那么我们在项目中,除了可选的压缩解压缩之外,还需要一层解码,因为一次解码的结果是字节,需要和项目中所使用的对象做转换,方便使用,这层解码器可以成为“二次解码器”,相应的,对应的编码器是为了将 Java 对象转化成字节流方便存储或传输。

  • 一次解码器:ByteToMessageDecoder
    io.netty.buffer.ByteBuf(原始数据流) -> io.netty.buffer.ByteBuf(用户数据)

  • 二次解码器:MessageToMessageDecoder<I>
    io.netty.buffer.ByteBuf(用户数据) -> Java Object

常用二次编码码方式

  • Java序列化:基本已经没人使用,因为它占用的空间比较大,且只有在 Java 中能够使用
  • Marshaling
  • XML:占用空间比较大
  • JSON
  • MessagePack
  • ProtoBuf:灵活的、高效的用于序列化数据的协议;相较于 XML 和 JSON 格式,Protobuf 更小、更快、更便捷;Protobuf 是跨语言的,并且自带了一个编译器(protoc),只需要用它进行编译,可以自动生成 Java、Python、C++ 等代码,不需要再写其它代码。
  • 其它

编解码方式选择依据

  • 空间:编码后占用空间,需要比较不同的数据大小情况
  • 时间:编解码速度,需要比较不同的数据大小情况
  • 可读性
  • 多语言的支持:重要

Netty 对常用编解码方式的支持

  1. 相关类都在包 io.netty.handler.codec
  2. 支持 base64、bytes、compression、json、marshaling、protobuf、serialization、string、xml等
  3. 编码器继承自 MessageToMessageEncoder<I>,解码器继承自 MessageToMessageDecoder<I>

什么是粘包/半包

比如客户端向服务器端发送了两条消息,分别是“ABC”、“DEF”,由于 TCP 是流式协议,服务器端可能会一次性接收到这两条消息“ABCDEF”,也可能会分 3、4次才接收到这两条消息,如分 3 次分别接收“AB”、“CD”、“EF”,前面一次性接收就是粘包,后面分 3、4 接收就是半包

粘包/半包主要原因

粘包的主要原因:

  • 发送方每次写入数据 < 套接字缓冲区大小
  • 接收方读取套接字缓冲区数据不够及时

半包的主要原因:

  • 发送方每次写入数据 > 套接字缓冲区大小
  • 发送的数据大于协议的 MTU(Maximum Transmission Unit,最大传输单元),必须拆包

从收发角度看:一次发送可能被多次接收(半包),多个发送可能被一次接收(粘包)
从传输角度看:一个发送可能占用多个传输包(半包),多个发送可能公用一个传输包(粘包)

根本原因:TCP 协议是流式协议,消息无边界。(UDP 像邮寄的包裹,虽然一次运输多个,但每个包裹都有“界限”,是一个一个签收的,所以使用 UDP 协议不会出现粘包/半包问题)

粘包/半包解决思路

要解决 TCP 粘包/半包问题,根本手段在于找出消息边界,下表列举了一些常见的解决方式。

方式\比较 寻找消息边界的方式 优点 缺点 推荐度
TCP 连接改成短连接,一个请求一个短连接 建立连接到释放连接之间的信息即为传输信息 简单 效率低下 不推荐
封装成帧(Framing):固定长度 满足固定长度即可 简单 空间浪费 不推荐
封装成帧(Framing):分隔符 分隔符之间 空间不浪费,也比较简单 内容本身出现分隔符时需转义,所以需要扫描内容 推荐
封装成帧(Framing):固定长度字段存内容的长度信息 先解析固定长度的字段获取长度,然后读取后续内容 精确定位用户数据,内容也不用转义 长度理论上有限制,需提前预知可能的最大长度从而定义长度占用字节数 推荐
封装成帧(Framing):其它方式 每种都不同,例如 JSON 可以看 {} 是否已经成对

Netty 对三种常用封帧方式的支持

方式\支持 解码 编码
固定长度 FixedLengthFrameDecoder 简单
分隔符 DelimiterBasedFrameDecoder 简单
固定长度字段存内容的长度信息 LengthFieldBasedFrameDecoder LengthFieldPrepender

上面三种解码器都继承自 ByteToMessageDecoder。

下面以“固定长度字段存内容的长度信息”这种方式来演示如何在 Netty 中使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 一次解码器:处理粘包/半包
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {

public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}

/**
* 一次编码器:处理粘包/半包
*/
public class OrderFrameEncoder extends LengthFieldPrepender {

public OrderFrameEncoder() {
super(2);
}
}

Netty开发实践-上中,我们通过一些示例介绍了如何使用 Netty 进行服务器端/客户端编程,虽然我在示例代码中都做了注释,但是为了更深入的掌握 Netty 编程,接下来我们再做一些知识点梳理,其中有些知识点在前面已经用到了,有些知识点没有用到但仍是我们需要了解的。

IO 模型切换

我们知道,IO 模型分为:BIO、NIO 和 AIO 三种。在 Netty 中,AIO 相关代码已经移除,BIO 相关代码虽然保留,但是已经不再推荐,也就是说,在以后的 Netty 中,我们可能只能以 NIO 的方式进行编程。
在之前的示例代码中,我们选择的都是 NIO 模型,如下面这样:

服务器端:

1
2
3
4
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap().channel(NioServerSocketChannel.class)...

客户端:

1
2
3
EventLoopGroup group = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class)...

那如果我们一定要用 BIO 模型,又该怎么切换呢?跟简单,只需要调整相关类名即可:

服务器端:

1
2
3
4
EventLoopGroup bossGroup = new OioEventLoopGroup(1);
EventLoopGroup workerGroup = new OioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap().channel(OioServerSocketChannel.class)...

客户端:

1
2
3
EventLoopGroup group = new OioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap().channel(OioSocketChannel.class)...

Reactor 模式切换

在 NIO 模型中,它使用开发模式我们称之为 Reactor 模式。Reactor 模式分为:单线程 Reactor 模式、多线程 Reactor 模式 和 主从 Reactor 模式 三种。
在之前的示例代码中,我们选择的都是 主从 Reactor 模式,如下面这样:

1
2
3
4
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup)...

那如果我们想要使用单线程 Reactor 模式或 多线程 Reactor 模式,又该怎么切换呢?跟简单,只需要调整相关参数即可:

使用单线程 Reactor 模式:

1
2
3
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);

ServerBootstrap serverBootstrap = new ServerBootstrap().group(eventLoopGroup)...

使用多线程 Reactor 模式:

1
2
3
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap().group(eventLoopGroup)...

粘包/半包处理

为了处理 TCP 的粘包/半包问题,Netty 提供了三种常用的封帧方式:

方式\支持 解码 编码
固定长度 FixedLengthFrameDecoder 简单
分隔符 DelimiterBasedFrameDecoder 简单
固定长度字段存内容的长度信息 LengthFieldBasedFrameDecoder LengthFieldPrepender

上面三种解码器都继承自 ByteToMessageDecoder。

在之前的示例代码中,我们选择的都是“固定长度字段存内容的长度信息”方式,如下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 一次解码器:处理粘包/半包
*/
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {

public OrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}

/**
* 一次编码器:处理粘包/半包
*/
public class OrderFrameEncoder extends LengthFieldPrepender {

public OrderFrameEncoder() {
super(2);
}
}

由于另外两种都不常用,这里就不演示怎么切换了。

序列化/反序列化

Netty 对一些常用的序列化/反序列化方式都提供了支持,如 base64、bytes、compression、json、marshaling、protobuf、serialization、string、xml 等,编码器都继承自 MessageToMessageEncoder<I>,解码器都继承自 MessageToMessageDecoder<I>

在之前的示例代码中,我们选择的都是 json 方式,如下面这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 二次解码器:将 {@link ByteBuf} 解析为 {@link RequestMessage}
*/
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(msg);

out.add(requestMessage);
}
}

/**
* 二次编码器:将 {@link ResponseMessage} 解析为 {@link ByteBuf}
*/
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {

@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage msg, List<Object> out) {
ByteBuf byteBuf = ctx.alloc().buffer();
msg.encode(byteBuf);

out.add(byteBuf);
}
}

/**
* 消息对象基类
*/
@Data
public abstract class BaseMessage<T extends BaseMessageBody> {
private MessageHeader header;
private T body;

/**
* 编码消息
*
* @param byteBuf 存储编码后的消息
*/
public void encode(ByteBuf byteBuf) {
byteBuf.writeInt(header.getVersion());
byteBuf.writeLong(header.getStreamId());
byteBuf.writeInt(header.getOpcode());
byteBuf.writeBytes(JsonUtil.toJson(body).getBytes());
}

/**
* 根据 opcode 获得对应的 MessageBody
*
* @param opcode 操作码
* @return 消息体
*/
public abstract Class<T> getMessageBodyDecodeClass(int opcode);

/**
* 解码
*
* @param msg 解码前的消息
*/
public void decode(ByteBuf msg) {
int version = msg.readInt();
long streamId = msg.readLong();
int opcode = msg.readInt();

this.header = MessageHeader.builder()
.version(version)
.streamId(streamId)
.opcode(opcode)
.build();

this.body = JsonUtil.fromJson(msg.toString(StandardCharsets.UTF_8), getMessageBodyDecodeClass(opcode));
}
}

在其它官方示例中,演示了如果使用其它方式的编解码,如在 “Word Clock”示例中,演示了如何使用 protobuf,大家可以自行查看相关代码。

空闲监测

在应用程序运行中,为了避免一些不怀好意或者无所事事的客户端一直和我们的应用程序保持连接,从而给我们造成资源浪费,因此一般我们都会进行空闲监测。
在 Netty 中,实现空闲监测的核心类是 IdleStateHandler。在前面的订单示例中,我们已经用到了这个类,现在再来回顾下是如何使用它的。

服务端读监测:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Read Idle Handler
* 服务器 10s 接收不到 channel 的请求就断掉连接:保护自己、瘦身(及时清理空闲的连接)
*/
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {

public ServerIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 如果是第一次 read idle,就断掉连接
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
log.info("Idle check happen, so close the connection");
ctx.close();

// 因此做了自定义处理,就不再触发该事件
return;
}

// 如果是其它事件,保持触发
super.channelIdle(ctx, evt);
}
}

客户端写监测:

1
2
3
4
5
6
7
8
9
10
11
/**
* Write Idle Handler
* 客户端 5s 不发送数据就触发一个写空闲事件
* 配合{@link KeepaliveHandler}使用,以避免连接被断,同时启用不频繁的 keepalive
*/
public class ClientIdleCheckHandler extends IdleStateHandler {

public ClientIdleCheckHandler() {
super(0, 5, 0, TimeUnit.SECONDS);
}
}

客户端写超时,发送 keepalive:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 捕捉写空闲事件:发一个 keepalive
* 配合{@link ClientIdleCheckHandler}使用,以避免连接被断,同时启用不频繁的 keepalive
*
* 因为不涉及内存共享,所以设置为可共享的{@link io.netty.channel.ChannelHandler.Sharable}
*/
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 捕捉第一次写空闲事件,发送 keepalive
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
log.info("Write Idle happen, so need to send keepalive to keep connection not closed by server");
KeepaliveOperation operation = new KeepaliveOperation();
RequestMessage message = new RequestMessage(IdUtil.nextId(), operation);
ctx.writeAndFlush(message);
}
super.userEventTriggered(ctx, evt);
}
}

调优参数

System 参数

  • ulimit
    Linux Option。进行 TCP 连接时,系统为每个 TCP 连接创建一个 socket 句柄,也就是一个文件句柄,但是 Linux 对每个进程打开的文件句柄数量做了限制,如果超出,报错“Too many open file”。
    注意:ulimit 命令修改的数值只对当前登录用户的目前使用环境有效,系统重启或者用户退出后就会失效,所以可以作为程序启动脚本的一部分,让它在程序启动前执行。

  • TCP_NODELAY
    SocketChannel 参数。设置是否启用 Nagle 算法:通过将小的碎片数据连接成更大的报文来提高发送效率。如果需要发送一些较小的报文,则需要禁用该算法。默认不开启。

  • SO_BACKLOG
    ServerSocketChannel 参数。最大的等待连接数量。当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。默认值 128。

  • SO_REUSEADDR
    SocketChannel/ServerSocketChannel 参数。地址重用,解决“Address already in use”。常用开启场景:多网卡(IP)绑定相同端口;让关闭连接释放的端口更早可使用。默认不开启。

Netty 参数

  • CONNECT_TIMEOUT_MILLIS
    SocketChannel 参数。客户端连接服务器最大允许时间。默认值 30s。

  • io.netty.eventLoopThreads
    System 参数。IO Thread 数量。默认值 availableProcessors * 2。

  • io.netty.availableProcessors
    System 参数。指定 availableProcessors。考虑 Docker/VM 等情况。

  • io.netty.leakDetection.level
    System 参数。内存泄漏检测级别:DISABLED/SIMPLE 等。默认值 SIMPLE。

跟踪诊断

设置线程名

1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));

设置 Handler 名

1
pipeline.addLast("frameDecoder", new OrderFrameDecoder());

使用日志

1
2
// 不同位置输出的内容不同
pipeline.addLast(new LoggingHandler(LogLevel.INFO));

优化使用

整改线程模型

对于 IO 密集型应用,我们通常需要整改线程模型:独立出“线程池”来处理业务(处理业务的线程不再与 NioEventLoop 共享)。

1
2
3
4
5
// 对于 IO 密集型应用,独立出“线程池”来处理业务(这里不能使用 NioEventLoopGroup,不然只会使用到 1 个线程)
EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(1, new DefaultThreadFactory("business"));

// 业务处理 Handler 放到最后添加
pipeline.addLast(eventExecutors, new OrderServerProcessHandler());

增强写,延迟与吞吐量的抉择

在前面的点单示例中,我们调用的是ctx.writeAndFlush(msg):写数据之后立刻发送出去,这样虽然延迟降低了,但是吞吐量又会受影响下降。如果我们是要吞吐量优先,那么有下面两种改进方式。

  • 利用 channelReadComplete
    就像我们在 echo 示例中演示的这样:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

/**
* 收到消息时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 返回消息给客户端
ctx.write(msg);
}

/**
* 消息读取完毕后触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

/**
* 发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

不过使用 channelReadComplete 也有弊端:

  1. 不适合异步业务线程(不复用 Nio event loop)处理
    channelRead 中的业务处理结果的 write 很可能发生在 channelReadComplete 之后。

  2. 不适合更精细的控制
    例如连续读 16 次时,第 16 次是 flush,但是如果保持连续的次数不变,如何做到 3 次就 flush ?

  • 使用 flushConsolidationHandler
    1
    2
    3
    4
    5
    // 读 5 次之后才 flush,并开启异步增强
    pipeline.addLast(new FlushConsolidationHandler(5, true));

    // 业务处理 Handler 放到最后添加
    pipeline.addLast(eventExecutorGroup, new OrderServerProcessHandler());

流量整形

1
2
3
4
5
6
7
// 全局流量整形
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
new NioEventLoopGroup(), 100 * 1024 * 1024, 100 * 1024 * 1024);

// 启用流量整形
// 只会处理 ByteBuf,因此要注意放置的位置
pipeline.addLast(globalTrafficShapingHandler);

为不同平台开启 native

Netty 针对不同平台都做了一定的优化,如果我们想切换到特定平台,也是非常方便的。

  • 修改代码
    • NioServerSocketChannel -> [Prefix]ServerSocketChannel
    • NioEventLopGroup -> [Prefix]EventLopGroup
    • NioChannelOption -> [Prefix]ChannelOption
  • 准备好 native 库

黑白名单

1
2
3
4
5
6
7
// 黑白名单过滤
IpSubnetFilterRule ipSubnetFilterRule =new IpSubnetFilterRule("127.0.0.1", 8,
IpFilterRuleType.REJECT);
RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);ine();

// 黑白名单过滤
pipeline.addLast(ruleBasedIpFilter);

自定义授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 认证处理 Handler
*/
@Slf4j
@ChannelHandler.Sharable
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) {
try {
BaseOperation operation = msg.getBody();
if (operation instanceof AuthOperation) {
AuthOperation authOperation = (AuthOperation) operation;
AuthOperationResult authOperationResult = authOperation.execute();
if (authOperationResult.isPassAuth()) {
log.info("Pass auth");
} else {
log.error("Fail to auth");
ctx.close();
}
} else {
log.error("Expect first msg is auth");
ctx.close();
}
} catch (Exception e) {
log.error("Exception happen");
ctx.close();
}
// 只处理一次,处理后移除
finally {
ctx.pipeline().remove(this);
}
}
}

Server 端:

1
2
3
4
5
// 认证处理
AuthHandler authHandler = new AuthHandler();

// 认证处理
pipeline.addLast(authHandler);

Client 端:

1
2
3
4
// 做了认证处理,要求第一个操作是认证操作
channelFuture.channel().writeAndFlush(new RequestMessage(IdUtil.nextId(),
new AuthOperation("admin", "password")));
channelFuture.channel().writeAndFlush(requestMessage);

Maven 依赖

1
2
3
4
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

简单示例

Echo 示例

我们先用官方示例 echo 来简单了解下如何进行 Netty 开发。该示例类似我们打兵乓球一样,把一个消息从客户端发送到服务器,服务器接收消息后,再把该消息返回给客户端,循环往复。

服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {

public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap()
// 主从 Reactor 模式
.group(bossGroup, workerGroup)
// ServerSocketChannel IO 模式
.channel(NioServerSocketChannel.class)
// 最大等待数量:当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
.option(ChannelOption.SO_BACKLOG, 100)
// ServerSocketChannel Handler
.handler(new LoggingHandler(LogLevel.INFO))
// SocketChannel Handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
ChannelFuture f = b.bind(8007).sync();

// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

/**
* 收到消息时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 返回消息给客户端
ctx.write(msg);
}

/**
* 消息读取完毕后触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

/**
* 发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* Sends one message when a connection is open and echoes back any received
* data to the server. Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public final class EchoClient {

public static void main(String[] args) throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap()
.group(group)
// SocketChannel IO 模式
.channel(NioSocketChannel.class)
// 是否启用 Nagle 算法(通过将小的碎片数据连接成更大的报文来提高发送效率)
// 如果需要发送一些较小的报文,则需要禁用该算法
.option(ChannelOption.TCP_NODELAY, true)
// SocketChannel Handler
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});

// Start the client.
ChannelFuture f = b.connect("127.0.0.1", 8007).sync();

// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

/**
* Handler implementation for the echo client. It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

private final ByteBuf firstMessage;

/**
* Creates a client-side handler.
*/
public EchoClientHandler() {
firstMessage = Unpooled.wrappedBuffer("I am echo message".getBytes());
}

/**
* 客户端连接时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 连接之后立马发送消息
ctx.writeAndFlush(firstMessage);
}

/**
* 收到消息时触发
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 返回消息给服务器
ctx.write(msg);
}

/**
* 消息读取完毕后触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws InterruptedException {
TimeUnit.SECONDS.sleep(3);
ctx.flush();
}

/**
* 发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

Hello Word 示例

完成入门级示例后,我们再来了解下,如何使用 Netty 来实现类似 Spring MVC 的 Web 服务。下面的示例同样来自官方,其实现功能为当我们访问“http://127.0.0.1:8080/”时,服务器会返回“Hello,Word”给浏览器。

服务器端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* An HTTP server that sends back the content of the received HTTP request
* in a pretty plaintext form.
*/
public final class HttpHelloWorldServer {

public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap()
// 主从 Reactor 模式
.group(bossGroup, workerGroup)
// ServerSocketChannel IO 模式
.channel(NioServerSocketChannel.class)
// 最大等待数量:当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
.option(ChannelOption.SO_BACKLOG, 1024)
// ServerSocketChannel Handler
.handler(new LoggingHandler(LogLevel.INFO))
// SocketChannel Handler
.childHandler(new HttpHelloWorldServerInitializer());

// Start the server.
Channel ch = b.bind(8080).sync().channel();

System.err.println("Open your web browser and navigate to http://127.0.0.1:8080/");

// Wait until the server socket is closed.
ch.closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

/**
* 在 Channel 注册到 EventLoop 后,对这个 Channel 执行一些初始化操作。
* ChannelInitializer 虽然会在一开始会被注册到 Channel 相关的 pipeline 里,但是在初始化完成之后,ChannelInitializer 会将自己从 pipeline 中移除,
* <p>
* 使用场景:
* 1. 在 ServerBootstrap 初始化时,为监听端口 accept 事件的 Channel 添加 ServerBootstrapAcceptor
* 2. 在有新链接进入时,为监听客户端 read/write 事件的 Channel 添加用户自定义的 ChannelHandler
*/
public class HttpHelloWorldServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new HttpServerCodec());
p.addLast(new HttpServerExpectContinueHandler());
p.addLast(new HttpHelloWorldServerHandler());
}
}

/**
* 业务处理:接收 Http 请求,返回 Hello,Word
* <p>
* 继承自{@link SimpleChannelInboundHandler},好处在于:
* 1. 能自动释放{@link io.netty.buffer.ByteBuf}
* 2. 增加消息类型匹配,当消息类型不匹配时什么都不做
*/
public class HttpHelloWorldServerHandler extends SimpleChannelInboundHandler<HttpObject> {
private static final byte[] CONTENT = {'H', 'e', 'l', 'l', 'o', ' ', 'W', 'o', 'r', 'l', 'd'};

/**
* 消息读取完毕后触发
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

/**
* 收到消息时触发
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
// 只处理 Http 请求
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;

boolean keepAlive = HttpUtil.isKeepAlive(req);
FullHttpResponse response = new DefaultFullHttpResponse(req.protocolVersion(), OK,
Unpooled.wrappedBuffer(CONTENT));
response.headers()
.set(CONTENT_TYPE, TEXT_PLAIN)
.setInt(CONTENT_LENGTH, response.content().readableBytes());

if (keepAlive) {
if (!req.protocolVersion().isKeepAliveDefault()) {
response.headers().set(CONNECTION, KEEP_ALIVE);
}
} else {
// Tell the client we're going to close the connection.
response.headers().set(CONNECTION, CLOSE);
}

ChannelFuture f = ctx.write(response);

if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}

/**
* 发生异常时触发
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

可能大家注意到了,这里没有对客户端编程,确实,因为这里的客户端就是我们的浏览器。

网络应用程序编程基本步骤

上面两个示例都非常简单,接下来我们会模拟真实订单场景(当然还是很简单的),以此来了解 Netty 更多的特性。但是在具体示例之前,我们有必要先了解下网络应用程序编程基本步骤。

网络应用程序编程基本步骤1

网络应用程序编程基本步骤2

点单示例

Maven 依赖

由于用到了 Json 编解码,和 guava 中的一些工具类,因此除了 Netty 依赖外,还需要添加以下依赖:

1
2
3
4
5
6
7
8
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

案例介绍

在本示例中,我们会模拟饭店点单:在哪一桌,要吃什么菜。因为示例本身是为了让我们了解 Netty 而非业务,所以本示例的业务非常简单,就是发送客户端一个订单操作给服务器,服务器收到请求后,再返回订单结果给客户端。

点单案例介绍

如上图,我们处理 OrderOperation 之外,还定义了 AuthOperation 和 KeepOperation,这是为了演示 Netty 的高级特性。

数据结构设计

消息对象数据结构:
消息对象数据结构

  1. 消息体由消息头和消息体组成
  2. 消息头由版本号(version)、操作码(opCode)和消息id(streamId)组成
  3. 消息体使用 Json 编解码,既可以是操作对象(operation),也可以是操作结果对象(operation result)
  4. 传输协议使用 TCP,因此要解决粘包/半包问题,因此消息对象需要指定 length

UML:
点单示例UML

  1. 消息对象基类 BaseMessage 有两个属性:消息头对象 MessageHeader 和 消息体对象 BaseMessageBody 基类,消息体对象具体类型由泛型 T 指定
  2. 操作对象基类 BaseOperation 和操作结果对象基类 BaseOperationResult 都继承自消息体对象 BaseMessageBody 基类
  3. 操作对象基类 BaseOperation 有 3 个子类:点单操作对象 OrderOperation、认证操纵对象 OrderOperation 和心跳检测操作对象 KeepaliveOperation
  4. 操作结果对象 BaseOperationResult 基类有 3 个子类:点单操作结果对象 OrderOperationResult、认证操纵结果对象 OrderOperationResult 和心跳检测操作结果对象 KeepaliveOperationResult,分别对应各自的操作对象
  5. 消息对象基类 BaseMessage 有 2 个子类:请求对象 RequestMessage 和响应对象 ResponseMessage。

服务器端编程

编解码

  • 一次解码器 OrderFrameDecoder:处理粘包/半包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 一次解码器:处理粘包/半包
    */
    public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {

    public OrderFrameDecoder() {
    super(Integer.MAX_VALUE, 0, 2, 0, 2);
    }
    }
  • 一次编码器 OrderFrameEncoder:处理粘包/半包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 一次编码码器:处理粘包/半包
    */
    public class OrderFrameEncoder extends LengthFieldPrepender {

    public OrderFrameEncoder() {
    super(2);
    }
    }
  • 二次解码器 OrderProtocolDecoder:将 ByteBuf 解析为 RequestMessage

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 二次解码器:将 {@link ByteBuf} 解析为 {@link RequestMessage}
    */
    public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    RequestMessage requestMessage = new RequestMessage();
    requestMessage.decode(msg);

    out.add(requestMessage);
    }
    }
  • 二次编码器 OrderProtocolEncoder:将 ResponseMessage 解析为 ByteBuf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 二次编码器:将 {@link ResponseMessage} 解析为 {@link ByteBuf}
    */
    public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, ResponseMessage msg, List<Object> out) {
    ByteBuf byteBuf = ctx.alloc().buffer();
    msg.encode(byteBuf);

    out.add(byteBuf);
    }
    }

业务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 业务处理 Handler
* <p>
* 继承自{@link SimpleChannelInboundHandler},好处在于:
* 1. 能自动释放{@link io.netty.buffer.ByteBuf}
* 2. 增加消息类型匹配,当消息类型不匹配时什么都不做
*/
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {

/**
* 读取并处理数据
* 接收{@link RequestMessage}类型数据,如果数据类型不为{@link RequestMessage},则什么都不做
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) {
BaseOperation operation = msg.getBody();
BaseOperationResult operationResult = operation.execute();

ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setHeader(msg.getHeader());
responseMessage.setBody(operationResult);

// 判断是否可写:避免 OOM
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
ctx.writeAndFlush(responseMessage);
} else {
// 这里只是简单丢弃掉,也可以选择存起来
log.error("Message dropped");
}
}
}

读监测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Read Idle Handler
* 服务器 10s 接收不到 channel 的请求就断掉连接:保护自己、瘦身(及时清理空闲的连接)
*/
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {

public ServerIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}

@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 如果是第一次 read idle,就断掉连接
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
log.info("Idle check happen, so close the connection");
ctx.close();

// 因此做了自定义处理,就不再触发该事件
return;
}

// 如果是其它事件,保持触发
super.channelIdle(ctx, evt);
}
}

认证处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 认证处理 Handler
*/
@Slf4j
@ChannelHandler.Sharable
public class AuthHandler extends SimpleChannelInboundHandler<RequestMessage> {

@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) {
try {
BaseOperation operation = msg.getBody();
if (operation instanceof AuthOperation) {
AuthOperation authOperation = (AuthOperation) operation;
AuthOperationResult authOperationResult = authOperation.execute();
if (authOperationResult.isPassAuth()) {
log.info("Pass auth");
} else {
log.error("Fail to auth");
ctx.close();
}
} else {
log.error("Expect first msg is auth");
ctx.close();
}
} catch (Exception e) {
log.error("Exception happen");
ctx.close();
}
// 只处理一次,处理后移除
finally {
ctx.pipeline().remove(this);
}
}
}

服务器 Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/**
* 服务器
*/
public class Server {

public static void main(String[] args) throws InterruptedException {
// 即使这里设置了多线程,实际还是只会用到一个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

// 全局流量整形
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(
new NioEventLoopGroup(), 100 * 1024 * 1024, 100 * 1024 * 1024);

// 认证处理
AuthHandler authHandler = new AuthHandler();

// 对于 IO 密集型应用,独立出“线程池”来处理业务(这里不能使用 NioEventLoopGroup,不然只会使用到 1 个线程)
EventExecutorGroup eventExecutorGroup = new UnorderedThreadPoolEventExecutor(10,
new DefaultThreadFactory("business"));

try {
ServerBootstrap serverBootstrap = new ServerBootstrap()
// 设置 ServerSocketChannel IO 模式
.channel(NioServerSocketChannel.class)
// 设置主从 Reactor 模式
.group(bossGroup, workerGroup)
// 最大等待数量:当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
.option(NioChannelOption.SO_BACKLOG, 1024)
// 增加日志输出
.handler(new LoggingHandler(LogLevel.INFO))
// 是否启用 Nagle 算法:通过将小的碎片数据连接成更大的报文来提高发送效率
// 如果需要发送一些较小的报文,则需要禁用该算法,默认不开启
.childOption(NioChannelOption.TCP_NODELAY, true)
// SocketChannel Handler
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();

// 启用流量整形
// 只会处理 ByteBuf,因此要注意放置的位置
pipeline.addLast(globalTrafficShapingHandler);

// 启用读空闲监测:及时清理空闲的连接
pipeline.addLast(new ServerIdleCheckHandler());

// 特别需要注意编解码顺序
// 先接收后发送 -> 先解码(一次解码 -> 二次解码)后编码(二次编码 -> 一次编码)
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());

// 认证处理
pipeline.addLast(authHandler);

// 增加日志输出
// 不同位置输出的内容不同
pipeline.addLast(new LoggingHandler(LogLevel.INFO));

// 读 5 次之后才 flush,并开启异步增强
pipeline.addLast(new FlushConsolidationHandler(5, true));

// 业务处理 Handler 放到最后添加
pipeline.addLast(eventExecutorGroup, new OrderServerProcessHandler());
}
});
// 启动服务器,且保持阻塞
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();

// 等待直到 Server Socket 关闭
channelFuture.channel().closeFuture().sync();
} finally {
// 注意先后顺序:先关闭 bossGroup(不再接收新请求),后关闭 workerGroup
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

客户端编程

编解码

  • 一次解码器 OrderFrameDecoder:处理粘包/半包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 一次解码器:处理粘包/半包
    */
    public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {

    public OrderFrameDecoder() {
    super(Integer.MAX_VALUE, 0, 2, 0, 2);
    }
    }
  • 一次编码器 OrderFrameEncoder:处理粘包/半包

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 一次编码码器:处理粘包/半包
    */
    public class OrderFrameEncoder extends LengthFieldPrepender {

    public OrderFrameEncoder() {
    super(2);
    }
    }
  • 二次解码器 OrderProtocolDecoder:将 ByteBuf 解析为 ResponseMessage

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 二次解码器:将 {@link ByteBuf} 解析为 {@link ResponseMessage}
    */
    public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
    ResponseMessage responseMessage = new ResponseMessage();
    responseMessage.decode(msg);

    out.add(responseMessage);
    }
    }
  • 二次编码器 OrderProtocolEncoder:将 RequestMessage 解析为 ByteBuf

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 二次码码器:将 {@link RequestMessage} 解析为 {@link ByteBuf}
    */
    public class OrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, RequestMessage msg, List<Object> out) {
    ByteBuf byteBuf = ctx.alloc().buffer();
    msg.encode(byteBuf);

    out.add(byteBuf);
    }
    }
  • 二次编码器 OperationToRequestMessageEncoder,将 BaseOperation 转换为 RequestMessage,以实现自动添加消息 id

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 二次编码器:将 {@link BaseOperation} 转换为 {@link RequestMessage},以实现自动添加消息 id
    */
    public class OperationToRequestMessageEncoder extends MessageToMessageEncoder<BaseOperation> {

    @Override
    protected void encode(ChannelHandlerContext ctx, BaseOperation msg, List<Object> out) {
    RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), msg);
    out.add(requestMessage);
    }
    }

业务处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 响应分发 Handler
* <p>
* 继承自{@link SimpleChannelInboundHandler},好处在于:
* 1. 能自动释放{@link io.netty.buffer.ByteBuf}
* 2. 增加消息类型匹配,当消息类型不匹配时什么都不做
*/
@AllArgsConstructor
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private RequestPendingCenter requestPendingCenter;

@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) {
// 将响应分发到对应的 streamId
requestPendingCenter.set(msg.getHeader().getStreamId(), msg.getBody());
}
}

public class RequestPendingCenter {
/**
* 记录 streamId 与 OperationResultFuture 的对应关系
*/
private Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

/**
* 添加 entry
*/
public void add(long streamId, OperationResultFuture future) {
this.map.put(streamId, future);
}

/**
* 设置 future 对象的值,并从 map 中移除 streamId
*/
public void set(long streamId, BaseOperationResult result) {
OperationResultFuture future = this.map.get(streamId);
if (future != null) {
future.setSuccess(result);
this.map.remove(streamId);
}
}
}

/**
* Future 对象,其结果为 BaseOperationResult
*/
public class OperationResultFuture extends DefaultPromise<BaseOperationResult> {

}

写监测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Write Idle Handler
* 客户端 5s 不发送数据就触发一个写空闲事件
* 配合{@link KeepaliveHandler}使用,以避免连接被断,同时启用不频繁的 keepalive
*/
public class ClientIdleCheckHandler extends IdleStateHandler {

public ClientIdleCheckHandler() {
super(0, 5, 0, TimeUnit.SECONDS);
}
}

/**
* 捕捉写空闲事件:发一个 keepalive
* 配合{@link ClientIdleCheckHandler}使用,以避免连接被断,同时启用不频繁的 keepalive
*
* 因为不涉及内存共享,所以设置为可共享的{@link io.netty.channel.ChannelHandler.Sharable}
*/
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 捕捉第一次写空闲事件,发送 keepalive
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {
log.info("Write Idle happen, so need to send keepalive to keep connection not closed by server");
KeepaliveOperation operation = new KeepaliveOperation();
RequestMessage message = new RequestMessage(IdUtil.nextId(), operation);
ctx.writeAndFlush(message);
}
super.userEventTriggered(ctx, evt);
}
}

客户端 Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
/**
* 客户端
*/
@Slf4j
public class Client {

public static void main(String[] args) throws InterruptedException, ExecutionException {
EventLoopGroup group = new NioEventLoopGroup();

RequestPendingCenter requestPendingCenter = new RequestPendingCenter();

try {
Bootstrap bootstrap = new Bootstrap()
// 设置 SocketChannel IO 模式
.channel(NioSocketChannel.class)
// 设置 EventLoopGroup
.group(group)
// 是否启用 Nagle 算法:通过将小的碎片数据连接成更大的报文来提高发送效率
// 如果需要发送一些较小的报文,则需要禁用该算法,默认不开启
.option(NioChannelOption.TCP_NODELAY, true)
// 客户端连接服务器最大允许时间
.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 10 * 1000)
// SocketChannel Handler
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();

// 启用写空闲监测
pipeline.addLast(new ClientIdleCheckHandler());

// 特别需要注意编解码顺序
// 先接收后发送 -> 先解码(一次解码 -> 二次解码)后编码(二次编码 -> 一次编码)
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());

// 增加日志输出
// 不同位置输出的内容不同
pipeline.addLast(new LoggingHandler(LogLevel.INFO));

// 将响应结果记录到 streamId 对应的 future 中
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));

// 将响应结果记录到 streamId 对应的 future 中
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));

}
});
// 启动客户端,且保持阻塞
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8090).sync();

// 发送消息
long streamId = IdUtil.nextId();
RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "土豆"));
OperationResultFuture future = new OperationResultFuture();
// 记录 streamId 与 future 的对应关系
requestPendingCenter.add(streamId, future);

// 做了认证处理,要求第一个操作是认证操作
channelFuture.channel().writeAndFlush(new RequestMessage(IdUtil.nextId(),
new AuthOperation("admin", "password")));
channelFuture.channel().writeAndFlush(requestMessage);

// 阻塞,等待获取结果
BaseOperationResult result = future.get();
log.info("Result: {}", result);

// 等待直到 Server Socket 关闭
channelFuture.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}

1
2
3
4
5
6
7
8
9
docker run -d `
--network op_net `
--hostname mysql `
--name mysql `
--restart=always `
-e MYSQL_ROOT_PASSWORD=root `
-p 3306:3306 `
mysql `
--lower_case_table_names=1

IO 模型

IO 模型主要分类:

  • 同步(synchronous)IO 和异步(asynchronous)IO

  • 阻塞(blocking)IO 和非阻塞(non-blocking)IO

  • 同步阻塞(blocking-IO)简称 BIO

  • 同步非阻塞(non-blocking-IO)简称 NIO

  • 异步非阻塞(synchronous-non-blocking-IO)简称 AIO

同步 VS 异步

同步:发送一个请求,等待返回,再发送下一个请求,同步可以避免出现死锁,脏读的发生。

异步:发送一个请求,不等待返回,随时可以再发送下一个请求,可以提高效率,保证并发。

简述:数据就绪后需要自己去读是同步,数据就绪直接读好再回调给程序是异步。

阻塞 VS 非阻塞

阻塞:传统的 IO 流都是阻塞式的。也就是说,当一个线程调用 read()或者 write()方法时,该线程将被阻塞,直到有一些数据读读取或者被写入,在此期间,该线程不能执行其他任何任务。在完成网络通信进行 IO 操作时,由于线程会阻塞,所以服务器端必须为每个客户端都提供一个独立的线程进行处理,当服务器端需要处理大量的客户端时,性能急剧下降。

非阻塞:Java NIO 是非阻塞式的。当线程从某通道进行读写数据时,若没有数据可用时,该线程会去执行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。因此 NIO 可以让服务器端使用一个或有限几个线程来同时处理连接到服务器端的所有客户端。

简述:没有数据传过来时,读会阻塞直到有数据;缓冲区满时,写操作也会阻塞,这就是阻塞。非粗赛遇到这些情况,都是直接返回。

举例说明

以我们去饭店吃饭为例:

  • BIO:食堂排队打饭模式,排队在窗口,打好才走;
  • NIO:等待被叫模式,等待被叫,好了自己去端;
  • AIO:包厢模式,点单后菜直接被端上桌。

IO VS NIO

IO NIO
面向流(Stream Oriented) 面向缓冲区(Buffer Oriented)
阻塞 IO(Blocking IO) 非阻塞 IO(Non Blocking IO)
选择器(Selectors)

BIO VS NIO VS AIO

BIO NIO AIO
Thread-Per-Connection Reactor Proactor

BIO、NIO、AIO 适用场景

  • BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择。
  • NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂。
  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

多路复用 IO(IO Multiplexing)

即经典的 Reactor 设计模式,有时也称为异步阻塞 IO,Java 中的 Selector 和 Linux 中的 epoll 都是这种模型。

Reactor 模式

Reactor 模式的核心流程:注册感兴趣的事件 -> 扫描是否有感兴趣的事件发生 -> 事件发生后做出响应的处理。

Client/Server SocketChannel/ServerSocketChannel OP_ACCEPT OP_CONNECT OP_WRITE OP_READ
client SocketChannel Y Y Y
server ServerSocketChannel Y
server SocketChannel Y Y

Reactor 模式有三种实现方式:Reactor 单线程、Reactor 多线程模式、Reactor 主从模式。

  1. Reactor 单线程模式

Reactor 单线程模式

每个客户端发起连接请求都会交给 acceptor,acceptor 根据事件类型交给线程 handler 处理,注意 acceptor 处理和 handler 处理都在一个线程中处理,所以其中某个 handler 阻塞时,会导致其他所有的 client 的 handler 都得不到执行,并且更严重的是,handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷,因此单线程 Reactor 模型用的比较少。

  1. Reactor 多线程模式

Reactor 多线程模式

有专门一个线程,即 Acceptor 线程用于监听客户端的TCP连接请求。

客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责。每个客户端连接都与一个特定的 NIO 线程绑定,因此在这个客户端连接中的所有 IO 操作都是在同一个线程中完成的。

客户端连接有很多,但是 NIO 线程数是比较少的,因此一个 NIO 线程可以同时绑定到多个客户端连接中。

缺点:如果我们的服务器需要同时处理大量的客户端连接请求或我们需要在客户端连接时,进行一些权限的检查,那么单线程的 Acceptor 很有可能就处理不过来,造成了大量的客户端不能连接到服务器。

  1. Reactor 主从模式

Reactor 主从模式

举例说明:
以饭店规模变化为例:

  • Reactor 单线程模式:一个人包揽所有,迎宾、点菜、做饭、上菜、送客等;
  • Reactor 多线程模式:多招几个伙计,大家一起做上面的事情;
  • Reactor 主从模式:进一步分工,搞一个或多个人专门做迎宾。

核心概念

  1. 缓冲区 Buffer
    Buffer 是一个对象。它包含一些要写入或者读出的数据。
    在面向流的 I/O 中,可以将数据写入或者将数据直接读到 Stream 对象中。在 NIO 中,所有的数据都是用缓冲区处理。这也就是很多博客说,IO 是面向流的,NIO 是面向缓冲区的。
    缓冲区实质是一个数组,通常它是一个字节数组(ByteBuffer),也可以使用其他类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息。
    最常用的缓冲区是 ByteBuffer,一个 ByteBuffer 提供了一组功能于操作 byte 数组。除了 ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean)都对应一种缓冲区,具体如下:
  • ByteBuffer:字节缓冲区
  • CharBuffer:字符缓冲区
  • ShortBuffer:短整型缓冲区
  • IntBuffer:整型缓冲区
  • LongBuffer:长整型缓冲区
  • FloatBuffer:浮点型缓冲区
  • DoubleBuffer:双精度浮点型缓冲区
  1. 通道 Channel
    Channel 是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过 Channel 读取和写入。
    通道和流不同之处在于通道是双向的,流只是在一个方向移动,而且通道可以用于读,写或者同时用于读写。因为 Channel 是全双工的,所以它比流更好地映射底层操作系统的 API,特别是在 UNIX 网络编程中,底层操作系统的通道都是全双工的,同时支持读和写。
    Channel 有四种实现:
  • FileChannel:文件中读取数据。
  • DatagramChannel:从 UDP 网络中读取或者写入数据。
  • SocketChannel:从 TCP 网络中读取或者写入数据。
  • ServerSocketChannel:允许你监听来自 TCP 的连接,就像服务器一样。每一个连接都会有一个 SocketChannel 产生。这句话的意思就是,ServerSocketChannel 用于服务端,SocketChannel 用于客户端。
  1. 多路复用器 Selector
    Selector 会不断轮询注册在上面的 Channel,如果某个 Channel 上面有新的 TCP 连接接入、读或写事件,这个channel就处于就绪状态,会被 Selector 轮询出来,然后通过 SelectionKey 可以获取 Channel 的集合,进行后续的 I/O 操作。一个多路复用器 Selector 可以同时轮询多个 Channel,由于 JDK 使用了 epoll() 代替传统的 select 实现,所以它没有最大连接句柄 1024/2048 的限制。这就意味着只需一个线程负责 Selector 的轮询,就可以接入成千上万的客户端。

Netty IO 模型

目前 Netty 只支持 NIO 模型。

参考资料

  1. 高并发编程系列:NIO、BIO、AIO的区别,及NIO的应用和框架选型
  2. NIO相关概念介绍:缓冲区Buffer,通道Channel,多路复用器Selector

逻辑架构

Netty 采用了比较典型的三层网络架构进行设计,逻辑架构图如下所示:

Netty架构设计

Netty架构设计-中文

  1. Core 核心:可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。
  2. 传输服务:支持 BIO 和 NIO。
  3. 协议支持:HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议。
  4. 容器集成:支持 OSGI、JBossMC、Spring、Guice 容器。

参考资料

  1. 高并发架构系列:Netty的实现原理、特点与优势、以及适用场景

Netty 是一个广受欢迎的异步的、事件驱动的、基于NIO实现的 Java 开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端

JDK 原生 NIO 程序的问题

JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:

  • NIO 的类库和 API 繁杂,使用麻烦:你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。

  • 需要具备其他的额外技能做铺垫:例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。

  • 可靠性能力补齐,开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。

  • JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。官方声称在 JDK 1.6 版本的 update 18 修复了该问题,但是直到 JDK 1.7 版本该问题仍旧存在,只不过该 Bug 发生概率降低了一些而已,它并没有被根本解决。

Netty 框架的优势

  • API使用简单,开发门槛低;
  • 功能强大,预置了多种编解码功能,支持多种主流协议;
  • 定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展;
  • 性能高,通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优;
  • 成熟、稳定,Netty 修复了已经发现的所有 JDK NIO BUG,业务开发人员不需要再为 NIO 的 BUG 而烦恼;
  • 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会加入;
  • 经历了大规模的商业应用考验,质量得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它已经完全能够满足不同行业的商业应用。

Netty 的应用场景

  • 互联网行业:在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。

  • 游戏行业:无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。

  • 大数据领域:经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。

  • 企业软件:企业和 IT 集成需要 ESB,Netty 对多协议支持、私有协议定制的简洁性和高性能是 ESB RPC 框架的首选通信组件。事实上,很多企业总线厂商会选择 Netty 作为基础通信组件,用于企业的 IT 集成。

  • 通信行业:Netty 的异步高性能、高可靠性和高成熟度的优点,使它在通信行业得到了大量的应用。

Netty 在开源框架中的应用

  • 数据库:Cassandra
  • 大数据处理:Spark、Hadoop
  • 消息队列:RocketMQ
  • 搜索引擎:Elasticsearch
  • 框架:gRPC、Apache Dubbo、Spring5(WebFlux)
  • 分布式协调器:ZooKeeper
  • 工具类:async-http-client
  • 其它参考:https://netty.io/wiki/adopters.html

参考资料

  1. 高并发架构系列:Netty的实现原理、特点与优势、以及适用场景