人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

ZooKeeper 是一个开源的分布式协同服务系统。ZooKeeper 的设计目标是将那些复杂且容易出错的分布式协同服务封装起来,抽象出一个高效可靠的原语集,并以一系列简单的接口提供给用户使用。

特性

最终一致性

客户端不论连接到哪个 Zookeeper 的哪一个节点,都会收到同一份状态。这是zookeeper最重要的性能。

可靠性

Zookeeper 集群具有简单、健壮、良好的性能,如果消息被到一台 server 接受,那么它将被所有的 server 接受。

实时性

Zookeeper 保证 client 将在一个时间间隔范围内获得 server 的更新信息,或者 server 失效的信息。但由于网络延时等原因,Zookeeper 不能保证两个 client 能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用 sync() 接口。

等待无关(wait-free)

慢的或者失效的 client 不得干预快速的 client 的请求,使得每个 client 都能有效的等待。

原子性

更新只能成功或者失败,没有中间状态。

顺序性

包括全局有序和偏序两种:全局有序是指如果在一台 server 上消息 a 在消息 b 前发布,则在所有 Server 上消息 a 都将在消息 b 前被发布;偏序是指如果一个消息 b 在消息 a 后被同一个发送者发布,a 必将排在 b 前面。

数据模型

ZooKeeper 的数据模型是层次模型。层次模型常见于文件系统。层次模型和 key-value 模型是两种主流的数据模型。ZooKeeper 使用文件系统模型主要基于以下两点考虑:

  1. 文件系统的树形结构便于表达数据之间的层次关系
  2. 文件系统的属性结构便于为不同的应用分配独立的命名空间(namespace)

ZooKeeper数据模型

ZooKeeper 的层次模型称作 Data tree。Data tree 的每个节点叫做 znode。不同于文件系统,每个节点都可以保存数据。每个节点都有一个版本(version)。版本从 0 开始计数。

节点类型

一个 znode 可以是持久性的,也可以是临时性的;可以是顺序性的,也可以是非顺序性的。

  1. PERSISTENT
    持久性的 znode,在创建之后即使发生 ZooKeeper 集群宕机或者 client 宕机也不会丢失。
  2. EPHEMERAL
    临时性的 znode,client 宕机或者 client 在指定的 timeout 时间内没有给 ZooKeeper 集群发消息,这样的节点就会消失。
  3. PERSISTENT_SEQUENTIAL
    持久顺序性的 znode,除了具备持久性 znode 的特点之外,znode 的名字具备顺序性。
  4. EPHEMERAL_SEQUENTIAL
    临时顺序性的 znode,除了具备临时性 znode 的特点之外,znode 的名字具备顺序性。

每一个顺序性的 znode 关联一个唯一的单调递增整数。这个单调递增整数是 znode 名字的后缀。

其实除了以上 4 中节点类型,还有另外一类节点:container 节点。
container 节点是一种新引入的 znode,目的在于下挂子节点。当一个 container 节点的所有子节点被删除之后,ZooKeeper 会删除掉这个 container 节点。服务发现的 base path 节点和服务节点就是 container 节点。

Data tree

接口
ZooKeeper 对外提供一个用来访问 Data tree 的简化文件系统 API:

  • 使用 UNIX 风格的路径名来定位 znode,例如 /A/X 表示 znode A 的子节点 X。
  • znode 的数据只支持全量写入和读取,没有像通用文件系统那样支持部分写入和读取。
  • Data tree 的所有 API 都是 wait-free 的。正在执行中的 API 调用不会影响其他 API 的完成。
  • Data tree 的 API 都是对文件系统的 wait-free 操作,不直接提供锁这样的分布式协同机制。但是 Data tree 的 API 非常强大,可以用来实现多种分布式协同机制。

持久化原理

Kafka 依赖文件系统来存储和缓存消息。
对于硬盘的传统观念是硬盘总是很慢,基于文件系统的架构能否提供优异的性能?实际上硬盘的快慢完全取决于使用方式。
为了提高性能,现代操作系统往往使用内存作为磁盘的缓存,所有的磁盘读写操作都会经过这个缓存,所以如果程序在线程中缓存了一份数据,实际在操作系统的缓存中还有一份,这等于存了两份数据。

同时 Kafka 基于 JVM 内存有以下缺点:

  • 对象的内存开销非常高,通常是要存储的数据的两倍甚至更高
  • 随着堆内数据的增加,GC 的速度越来越慢

实际上磁盘的线性写入的性能远远大于任意位置写的性能,线性读写由操作系统进行了大量优化(read-ahead、write-behind 等技术),甚至比随机的内存读写更快。所以与常见的数据缓存在内存中然后刷到磁盘的设计不同,Kafka 直接将数据写到了文件系统的日志中:

  • 写操作:将数据顺序追加到文件中
  • 读操作:从文件中读取

这样实现的好处:

  • 读操作不会阻塞写操作和其他操作,数据大小不对性能产生影响
  • 硬盘空间相对于内存空间容量限制更小
  • 线性访问磁盘,速度快,可以保存更长的时间,更稳定

持久化文件

一个 Topic 被分成多 Partition,每个 Partition 在存储层面是一个 append-only 日志文件,属于一个 Partition 的消息都会被直接追加到日志文件的尾部,每条消息在文件中的位置称为 offset(偏移量)。

Kafka日志文件

日志文件由“日志条目(log entries)”序列组成,每一个日志条目包含一个 4 字节整型数(值为 N),其后跟 N 个字节的消息体。每条消息都有一个当前 Partition 下唯一的 64 字节的 offset,标识这条消息的起始位置。消息格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
On-disk format of a message

offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes

Kafka 持久化日志视图:
Kafka持久化日志视图

日志文件允许串行附加,并且总是附加到最后一个文件。当文件达到配置指定的大小(log.segment.bytes = 1073741824 (bytes))时,就会被滚动到一个新文件中(每个文件称为一个 segment file)。
日志有两个配置参数:

  • M 强制操作系统将文件刷新到磁盘之前写入的消息数
  • S 强制操作系统将文件刷新到磁盘之前的时间(秒)
    在系统崩溃的情况下,最多会丢失 M 条消息或 S 秒的数据。

通过给出消息的偏移量(offset)和最大块大小(S)来读取数据。返回一个缓冲区为 S 大小的消息迭代器,S 应该大于任何单个消息的大小,如果消息异常大,则可以多次重试读取,每次都将缓冲区大小加倍,直到成功读取消息为止。可以指定最大消息大小和缓冲区大小,以使服务器拒绝大于某个大小的消息。读取缓冲区可能以部分消息结束,这很容易被大小分隔检测到。

读取指定偏移量的数据时,需要首先找到存储数据的 segment file,由全局偏移量计算 segment file 中的偏移量,然后从此位置开始读取。

删除

消息数据随着 segment file 一起被删除。Log manager 允许可插拔的删除策略来选择哪些文件符合删除条件。当前策略为删除修改时间超过 N 天前的任何日志,或者是保留最近的 N GB 的数据。

为了避免在删除时阻塞读操作,采用了 copy-on-write 技术:删除操作进行时,读取操作的二分查找功能实际是在一个静态的快照副本上进行的。

文件索引

上面提到日志文件非由一个文件构成,而是分成多个 segment(文件达到一定大小时进行滚动),每个 segment 名为该 segment 第一条消息的 offset 和 “.kafka” 组成。另外会有一个索引文件,标明了每个 segment 下包含的日志条目的 offset 范围。

1
2
3
4
5
|- 000000000000000400.index
|- 000000000000000400.log
|-|- 100000000000.kafka
|-|- 200000000000.kafka
|-|- ...

有了索引文件,消费者可以从 Kafka 的任意可用偏移量位置开始读取消息。索引也被分成片段,所以在删除消息时,也可以删除相应的索引。Kafka 不维护索引的校验和,如果索引出现损坏,Kafka 会通过重新读取消息来重新生成索引。

参考资料

  1. Kafka 消息持久化

RabbitMQ有两种部署模式:普通模式、镜像模式。

普通模式

多台机器,每台机器上启动一个 RabbitMQ 实例。
但是创建的 Queue 只会放在一个 RabbitMQ 实例上,其它实例都同步该实例上的 Queue 的元数据。
当我们消费时 Queue 中的消息时,如果连接到了另外一个实例,那么那个实例会从 Queue 所在的实例上拉取数据。

RabbitMQ普通集群模式

这种方式其实就是个普通集群,没有实现所谓的分布式。
因为要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 Queue 所在的实例,前者有数据拉取的开销,后者到hi单实例性能瓶颈。
而且如果那个 Queue 所在的实例宕机了,会导致接下来其它实例就无法从那个实例拉取数据,如果开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,但是得等到那个实例恢复后,其它实例才可以继续从那个 Queue 拉取数据。

这种方案并没有实现高可用性,主要是提高吞吐量的,就是说让集群中多个节点来服务某个 Queue 的读写操作。

镜像模式

该模式才是所谓的 RabbitMQ 的高可用模式,跟普通模式不一样的是:创建的 Queue,无论是元数据还是 Queue 里的消息都会存在于多个实例上,然后每次写消息到 Queue 的时候,都会自动把消息同步写到其它实例的 Queue 中。

RabbitMQ镜像集群模式

这样的好处在于,当集群中任何一个机器宕机了,其它的机器都可以使用。
坏处在于,将消息同步到所有机器,会导致网络带宽压力和消耗很重!另外也不再有扩展性可言:如果某个 Queue负载很重,那么当我们新增机器时,新增的机器也会包含这个 Queue 的所有数据,从而不能线性扩展 Queue。

怎么开启镜像集群模式呢?RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候可以要求数据同步到所有节点的,也可以要求就同步到指定数量的节点,然后再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。

业务逻辑处理

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。

那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

利用数据库的唯一约束实现幂等

以转账为例:将账户 X 的余额加 100 元。
在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。
首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,我们可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。
这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
比如,刚刚我们说过,“将账户 X 的余额增加 100元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500元,将余额加 100元”,这个操作就具备了幂等性。
对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?
更加通用的方法是,给我们的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于我们的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。
首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。

同一个 Queue,如果有多个 Consumer,那么是做不到消息的有序性的。

因此,我们可以基于同一规则(对唯一标识进行 hash),将单个 Queue 拆分成多个 Queue,再将拆分后的 Queue 与 Consumer 一一对应。

同时,我们应该关闭 auto ack,改为 manual ack,并设置 prefetchCount = 1(默认),即 channel.basicQos(1)。

不过这样做的话,吞吐量又可能过低,我们可以在消费端内部用内存队列做排队,然后分发给底层不同的 worker 来处理。这样又可能会存在消息丢失。

消息持久化

  • Exchange 要持久化

    1
    2
    3
    4
    @Bean
    public DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE); // = new DirectExchange(DIRECT_EXCHANGE, true, false)
    }
  • Queue 要持久化

    1
    2
    3
    4
    @Bean
    public Queue queue() {
    return new Queue(QUEUE); // = return new Queue(QUEUE, true, false)
    }
  • Message 要持久化

    1
    2
    3
    4
    5
    6
     MessageProperties properties = new MessageProperties();
    properties.setContentType("UTF-8");
    properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

    Message messageObj = new Message(message.getBytes(), properties);
    rabbitTemplate.send(EXCHANGE, null, messageObj);

Exchange、Queue 和 Message 在默认情况下都是开启了持久化的。

消息确认

RabbitMQ 的消息确认有两种:

  • 消息发送确认
    用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

  • 消费接收确认
    确认消费者是否成功消费了队列中的消息

消息发送确认

其实这个也不能叫确认机制,只是起到一个监听的作用,监听生产者是否发送消息到 Exchange 和 Queue。

确认是否到达交换器

设置属性 publisher-confirm-type,并实现接口 RabbitTemplate.ConfirmCallback。

1
2
3
spring:
rabbitmq:
publisher-confirm-type: simple
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Component
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback {
private final RabbitTemplate rabbitTemplate;

public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

/**
* 确认消息是否到达 Exchange
* 到达 Exchange -> callback, ack: true
* 未到达 Exchange -> callback, ack: false
*/
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("Message send to exchange successful, correlationData: {}, ack: {}, cause: {}",
correlationData, ack, cause);
}
}

确认是否到达队列

设置属性 publisher-returns,并实现 RabbitTemplate.ReturnCallback。

1
2
3
4
spring:
rabbitmq:
# 确认消息发送到 Exchange 对应的 Queue
publisher-returns: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
@Component
public class RabbitTemplateConfig implements RabbitTemplate.ReturnCallback {
private final RabbitTemplate rabbitTemplate;

public RabbitTemplateConfig(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
}

/**
* 确认消息是否到达 Queue
* 到达 Queue -> no callback
* 未到达 Queue -> callback, eg: 根据发送消息时指定的 routingKey 找不到队列
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("Message send to queue failed, message: {}, replyCode: {}, replyText: {}, exchange: {}, routingKey: {}",
message, replyCode, replyText, exchange, routingKey);
}
}

消息接收确认

消费者确认分两种:自动确认和手动确认。默认为自动确认。
一旦设置成 manual 手动确认,一定要对消息做出应答,否则 RabbitMQ 会认为当前队列没有消费完成,将不再继续向该队列发送消息。

自动确认

在自动确认模式中,消息在发送到消费者后即被认为”成功消费”。因此通常被称为“即发即忘”。
这种模式可以降低吞吐量(只要消费者可以跟上),以降低交付和消费者处理的安全性。

与手动确认模型不同,如果消费者的 TCP 连接或通道在真正的”成功消费”之前关闭,则服务器发送的消息将丢失。因此,自动消息确认应被视为不安全,并不适用于所有工作负载。

使用自动确认模式时需要考虑的另一件事是消费者过载。
手动确认模式通常与有界信道预取(BasicQos 方法)一起使用,该预取限制了信道上未完成(“进行中”)的消息的数量。
但是,自动确认没有这种限制。
因此,消费者可能会被消息的发送速度所淹没,可能会导致消息积压并耗尽堆或使操作系统终止其进程。某些客户端库将应用 TCP 反压(停止从套接字读取,直到未处理的交付积压超过某个限制)。
因此,仅建议能够以稳定的速度有效处理消息的消费者使用自动确认模式。

手动确认

手动确认又分两种:肯定确认和否定确认。

要开启手动确认,需要设置 acknowledge-mode:

1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
# 消费者手动确认消息
acknowledge-mode: manual
  • 肯定确认
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @RabbitListener(queues = QUEUE)
    public void receive(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Receive message: {}", message);
    try {
    channel.basicAck(tag, false);
    } catch (IOException e) {
    log.error("Confirm message error", e);
    }
    }

channel.basicAck(tag, false):确认收到消息,消息将被队列移除,false 只确认当前 consumer 一个消息收到,true 确认所有 consumer 获得的消息。

  • 否定确认
    否定确认的场景不多,但有时候某个消费者因为某种原因无法立即处理某条消息时,就需要否定确认了。

否定确认时,需要指定是丢弃掉这条消息,还是让这条消息重新排队,过一会再来,又或者是让这条消息重新排队,并尽快让另一个消费者接收并处理它。

channel.basicNack(long, boolean,boolean):确认否定消息,第一个 boolean 表示一个 consumer 还是所有,第二个 boolean 表示 requeue 是否重新回到队列,true 重新入队。
channel.basicReject(long, boolean):拒绝消息,requeue=false 表示不再重新入队,如果配置了死信队列则进入死信队列。

当消息回滚到消息队列时,这条消息不会回到队列尾部,而是仍是在队列头部,这时消费者会又接收到这条消息,如果想消息进入队尾,须确认消息后再次发送消息。

参考资料:
RabbitMQ的消息确认机制
RabbitMQ (十一) 消息确认机制 - 消费者确认
SpringBoot+RabbitMQ发送确认和消费手动确认机制

Maven 依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置属性

1
2
3
4
spring:
rabbitmq:
host: localhost
port: 5672

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Configuration
public class RabbitConfiguration {
public static final String QUEUE = "queue";

public static final String DIRECT_EXCHANGE = "direct-exchange";
public static final String DIRECT_QUEUE = "direct-queue";
public static final String DIRECT_ROUTING_KEY = "direct-routing-key";

public static final String TOPIC_EXCHANGE = "topic-exchange";
public static final String TOPIC_QUEUE = "topic-queue";
public static final String TOPIC_ROUTING_KEY = "topic-routing-key.*";

public static final String FANOUT_EXCHANGE = "fanout-exchange";
public static final String FANOUT_QUEUE = "fanout-queue";

public static final String HEADERS_EXCHANGE = "headers-exchange";
public static final String HEADERS_QUEUE = "headers-queue";
public static final String HEADERS_HEADER_KEY = "headers-header";

public static final String CUSTOM_EXCHANGE = "custom-exchange";
public static final String CUSTOM_QUEUE = "custom-queue";
public static final String CUSTOM_ROUTING_KEY = "custom-routing-key";

@Bean
public Queue queue() {
return new Queue(QUEUE);
}

@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}

@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}

@Bean
public Binding directQueueBinding() {
return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY);
}

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean
public Queue topicQueue() {
return new Queue(TOPIC_QUEUE);
}

@Bean
public Binding topicQueueBinding() {
return BindingBuilder.bind(topicQueue()).to(topicExchange()).with(TOPIC_ROUTING_KEY);
}

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

@Bean
public Queue fanoutQueue() {
return new Queue(FANOUT_QUEUE);
}

@Bean
public Binding fanoutQueueBinding() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}

@Bean
public HeadersExchange headersExchange() {
return new HeadersExchange(HEADERS_EXCHANGE);
}

@Bean
public Queue headersQueue() {
return new Queue(HEADERS_QUEUE);
}

@Bean
public Binding headersQueueBinding() {
return BindingBuilder.bind(headersQueue()).to(headersExchange()).where(HEADERS_HEADER_KEY).exists();
}
}

生产方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
@Component
public class MessageSender {
private final RabbitTemplate rabbitTemplate;

public MessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void send(String message) {
rabbitTemplate.convertAndSend(QUEUE, message);
}

public void sendToDirectQueue(String message) {
rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, DIRECT_ROUTING_KEY, message);
}

public void sendToTopicQueue(String message) {
String keyPrefix = TOPIC_ROUTING_KEY.substring(0, TOPIC_ROUTING_KEY.length() - 1);
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, keyPrefix + "xx", message);
}

public void sendToFanoutQueue(String message) {
rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, null, message);
}

public void sendToHeadersQueue(String message) {
MessageProperties properties = new MessageProperties();
properties.setContentType("UTF-8");
properties.getHeaders().put(HEADERS_HEADER_KEY, "header-xx");

Message messageObj = new Message(message.getBytes(), properties);
rabbitTemplate.send(HEADERS_EXCHANGE, null, messageObj);
}
}

消费方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
@Component
public class MessageReceiver {

@RabbitListener(queues = QUEUE)
public void receive(String message) {
log.info("Receive message: {}", message);
}

@RabbitListener(queues = DIRECT_QUEUE)
public void receiveByDirectQueue(String message) {
log.info("Receive message by direct-queue: {}", message);
}

@RabbitListener(queues = TOPIC_QUEUE)
public void receiveByTopicQueue(String message) {
log.info("Receive message by topic-queue: {}", message);
}

@RabbitListener(queues = FANOUT_QUEUE)
public void receiveByFanoutQueue(String message) {
log.info("Receive message by fanout-queue: {}", message);
}

@RabbitListener(queues = HEADERS_QUEUE)
public void receiveByHeadersQueue(String message) {
log.info("Receive message by headers-queue: {}", message);
}
}

高级配置

序列化/反序列化

当我们发送对象类型消息数据时,在消费者方默认接收到的数据类型是字符串,如果我们希望接收到的是对象类型,可以在生产者/消费者两方都添加以下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter jsonConverter = new Jackson2JsonMessageConverter();
jsonConverter.setClassMapper(classMapper());
return jsonConverter;
}

@Bean
public DefaultClassMapper classMapper() {
DefaultClassMapper classMapper = new DefaultClassMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>(1);
idClassMapping.put("message", Message.class);
classMapper.setIdClassMapping(idClassMapping);
return classMapper;
}

之后,我们就能够像下面这样发送/接收 Message 参数的消息了:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void sendObject(samples.dto.Message message) {
rabbitTemplate.convertAndSend(OBJECT_QUEUE, message);
}

@RabbitListener(queues = QUEUE)
public void receive(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
log.info("Receive message: {}", message);
try {
channel.basicAck(tag, false);
} catch (IOException e) {
log.error("Confirm message error", e);
}
}

使用事务

  1. 设置 RabbitTemplate

    1
    rabbitTemplate.setChannelTransacted(true);
  2. 注入 RabbitTransactionManager Bean

    1
    2
    3
    4
    @Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
    }
  3. 添加 @Transactional

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
    * 发送消息,并开启事务,使用 @Transactional
    */
    @Transactional(rollbackFor = Exception.class)
    public void sendInTransactionByAnnotation(String message) {
    rabbitTemplate.convertAndSend(QUEUE, message);
    // 与 Kafka 的是,即使后面抛出异常,下面这条日志还是会输出
    log.info("Send in transaction by annotation success");
    throw new RuntimeException("fail");
    }

需要注意的是,在生产者开启事务之后,属性 publisher-confirm-type 要设置为 NONE (默认)。

获取消息回复

生产者在发送消息之后,可以同时等待获取消费者接收并处理消息之后的回复,就像传统的 RPC 交互那样。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    /**
    * 发送消息,同时等待消息结果
    */
    public void sendAndReceive(String message) {
    Object result = rabbitTemplate.convertSendAndReceive(RECEIVE_QUEUE, message);
    log.info("Receive reply success, result: {}", result);
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 接收消息,在处理完消息之后,将处理结果返回给生产方
    */
    @RabbitListener(queues = RECEIVE_QUEUE)
    public String receiveAndReply(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Receive message: {}", message);
    try {
    channel.basicAck(tag, false);
    } catch (IOException e) {
    log.error("Confirm message error", e);
    }
    return "ok";
    }

多方法处理消息

组合使用 @RabbitListener 和 @RabbitHandler,能够让我们在传递消息时,根据转换后的消息有效负载类型来确定调用哪个方法。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 发送消息,消息类型为字符串,消费者根据消息类型决定执行哪个方法
    */
    public void sendToMultipleHandlers(String message) {
    rabbitTemplate.convertSendAndReceive(MULTIPLE_QUEUE, message);
    }

    /**
    * 发送消息,消息类型为 Message,消费者根据消息类型决定执行哪个方法
    */
    public void sendToMultipleHandlers(samples.dto.Message message) {
    rabbitTemplate.convertSendAndReceive(MULTIPLE_QUEUE, message);
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    @Slf4j
    @RabbitListener(queues = MULTIPLE_QUEUE)
    @Component
    public class MessageReceiverMultipleMethods {

    /**
    * 处理字符串类型的消息
    */
    @RabbitHandler
    public void handlerStr(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Receive message: {}, type of String", message);
    try {
    channel.basicAck(tag, false);
    } catch (IOException e) {
    log.error("Confirm message error", e);
    }
    }

    /**
    * 处理 Message 类型的消息
    */
    @RabbitHandler
    public void handlerMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Receive message: {}, type of Message", message);
    try {
    channel.basicAck(tag, false);
    } catch (IOException e) {
    log.error("Confirm message error", e);
    }
    }

    /**
    * 处理其它类型的消息
    */
    @RabbitHandler(isDefault = true)
    public void handlerUnknown(Object message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    log.info("Receive message: {}, type of Unknown", message);
    try {
    channel.basicAck(tag, false);
    } catch (IOException e) {
    log.error("Confirm message error", e);
    }
    }
    }

异常处理

对于消费者在处理消息过程中抛出的异常,我们可以设置 errorHandler,然后在 errorHandler 中统一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 接收消息,处理消息时抛出异常,异常由 errorHandler 进行处理
*/
@RabbitListener(queues = EXCEPTION_QUEUE, errorHandler = "customErrorHandler")
public void receiveWithException(Message message) {
log.info("Receive message: {}", message);
throw new RuntimeException("error");
}

@Slf4j
@Service("customErrorHandler")
public class CustomRabbitListenerErrorHandler implements RabbitListenerErrorHandler {

@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) {
log.error("Handle message with exception, message: {}", message.getPayload().toString());
return null;
}
}

并发接收消息

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 并发接收消息
*/
@RabbitListener(queues = CONCURRENT_QUEUE, concurrency = "3")
public void receiveConcurrent(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
log.info("Receive message: {}", message);
try {
channel.basicAck(tag, false);
} catch (IOException e) {
log.error("Confirm message error", e);
}
}

  1. 创建 network

    1
    docker network create --driver bridge --subnet 172.22.0.0/16 --gateway 172.22.0.1  op_net
  2. 创建 rabbitmq.yml

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    version: '3.6'

    services:
    rabbitmq-1:
    image: rabbitmq:3-management
    restart: always
    hostname: rabbitmq-1
    container_name: rabbitmq-1
    ports:
    - 5672:5672
    - 15672:15672
    environment:
    RABBITMQ_NODENAME: rabbitmq-1
    RABBITMQ_ERLANG_COOKIE: secret-cookie
    networks:
    default:
    ipv4_address: 172.22.0.17
    rabbitmq-2:
    image: rabbitmq:3-management
    restart: always
    hostname: rabbitmq-2
    container_name: rabbitmq-2
    ports:
    - 5673:5672
    - 15673:15672
    environment:
    RABBITMQ_NODENAME: rabbitmq-2
    RABBITMQ_ERLANG_COOKIE: secret-cookie
    networks:
    default:
    ipv4_address: 172.22.0.18
    rabbitmq-3:
    image: rabbitmq:3-management
    restart: always
    hostname: rabbitmq-3
    container_name: rabbitmq-3
    ports:
    - 5674:5672
    - 15674:15672
    environment:
    RABBITMQ_NODENAME: rabbitmq-3
    RABBITMQ_ERLANG_COOKIE: secret-cookie
    networks:
    default:
    ipv4_address: 172.22.0.19

    networks:
    default:
    external:
    name: op_net
  3. 启动 RabbitMQ 集群

    1
    docker-compose -f rabbitmq.yml up -d
  4. 将 rabbitmq-2 添加到集群(RAM 类型)

    1
    2
    3
    4
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbitmq-1@rabbitmq-1
    rabbitmqctl start_app
  5. 将 rabbitmq-3 添加到集群

    1
    2
    3
    4
    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl join_cluster --ram rabbitmq-3@rabbitmq-3
    rabbitmqctl start_app

参考资料:

  1. docker-compose配置rabbitmq集群服务器
  2. 使用 docker-compose 安装搭建 RabbitMQ 集群

Kafka 设置

设置 enable-auto-commit

Kafka 默认会自动提交消费者位移,但是这样容易出现重复消费。
在默认情况下,Consumer 每 5 秒自动提交一次位移。
现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
虽然我们能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。
因此要设置 enable-auto-commit = false 来关闭位移自动提交。

合理设置 max.poll.interval.ms 与 max.poll.records

如果消费者两次 poll 的时间间隔超出设置值,Kafka 服务端会进行 Rebalance 操作,导致客户端连接失效,无法提交 offset 信息,从而引发重复消费。

业务逻辑处理

一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。

那么如何实现幂等操作呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具备幂等性的操作。但是,不是所有的业务都能设计成天然幂等的,这里就需要一些方法和技巧来实现幂等。

利用数据库的唯一约束实现幂等

以转账为例:将账户 X 的余额加 100 元。
在这个例子中,我们可以通过改造业务逻辑,让它具备幂等性。
首先,我们可以限定,对于每个转账单每个账户只可以执行一次变更操作,在分布式系统中,这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单 ID、账户 ID 和变更金额,然后给转账单 ID 和账户 ID 这两个字段联合起来创建一个唯一约束,这样对于相同的转账单 ID 和账户 ID,表里至多只能存在一条记录。
这样,我们消费消息的逻辑可以变为:“在转账流水表中增加一条转账记录,然后再根据转账记录,异步操作更新用户余额即可。”在转账流水表增加一条转账记录这个操作中,由于我们在这个表中预先定义了“账户 ID 转账单 ID”的唯一约束,对于同一个转账单同一个账户只能插入一条记录,后续重复的插入操作都会失败,这样就实现了一个幂等的操作。我们只要写一个 SQL,正确地实现它就可以了。
基于这个思路,不光是可以使用关系型数据库,只要是支持类似“INSERT IF NOT EXIST”语义的存储类系统都可以用于实现幂等,比如,我们可以用 Redis 的 SETNX 命令来替代数据库中的唯一约束,来实现幂等消费。

为更新的数据设置前置条件

另外一种实现幂等的思路是,给数据变更设置一个前置条件,如果满足条件就更新数据,否则拒绝更新数据,在更新数据的时候,同时变更前置条件中需要判断的数据。
这样,重复执行这个操作时,由于第一次更新数据的时候已经变更了前置条件中需要判断的数据,不满足前置条件,则不会重复执行更新数据操作。
比如,刚刚我们说过,“将账户 X 的余额增加 100元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户 X 当前的余额为 500元,将余额加 100元”,这个操作就具备了幂等性。
对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?
更加通用的方法是,给我们的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号 +1,一样可以实现幂等更新。

记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于我们的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token 机制或者 GUID(全局唯一ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。
首先,给每个消息指定一个全局唯一的 ID 就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现 Bug。

Kafka 只能保证分区级别的消息顺序性。

要实现全局性的消息顺序性,我们或者只定义一个分区,或者通过指定消息的 key,从而将 key 值相同的消息都路由到同一个分区。并且每个分区对应的消费者组里应该只有一个消费者。

同时,我们应该关闭位移自动提交,改为位移手动提交,即设置 enable.auto.commit = false。

不过这样做的话,吞吐量又可能过低,我们可以在消费端内部用内存队列做排队,然后分发给底层不同的 worker 来处理。这样又可能会存在消息丢失。