人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

检测消息丢失的方法

如果是 IT 基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,那么我们可以利用消息队列的有序性来验证消息丢失

原理非常简单,在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。

如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检查到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是那条消息,方便进一步排查原因。

大多数消息队列的客户端都支持拦截器机制,我们可以利用这个拦截器机制,在 Producer 发送消息之前的拦截器中将序号注入到消息中,在 Consumer 收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到我们的业务代码中,待我们的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。

如果是在一个分布式系统中实现这个检测方法,有几个问题需要我们注意。

首先,像 Kafka 这样的消息队列,它是不保证在 Topic 上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。

然后,如果我们系统中 Producer 是多实例的,由于并不好协调多个 Producer 之间的发送顺序,所以也需要每个 Producer 分别生成各自的消息序号,并且需要附加上 Producer 的标识,在 Consumer 端按照每个 Producer 分别来检测序号的连续性。

另外,Consumer 实例的数量最好和分区数量一致,做到 Consumer 和分区一一对应,这样会比较方便地在 Consumer 内检测消息序号的连续性。

确保消息可靠传递

从消息的生产到消费,可以划分以下三个阶段:
消息生产消费三阶段

  • 生产阶段:从消息在 Producer 创建出来,经过网络传输发送到 Broker 端。
  • 存储阶段:消息在 Broker 端存储,如果是集群,消息会在这个阶段被复制到其它的副本上。
  • 消费阶段:Consumer 从 Broker 上拉取消息,经过网络传输发送到 Consumer 上。

生产阶段

在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当我们的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到相应后,完成了一次正常消息的发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。

在生产方可以设置 retries 属性,这样当生产者长时间没收到发送确认响应后进行自动重试,如果重试后依然失败,就会以返回值或者异常的方式告知用户。
如果要严格保证消息不丢失,还需要设置 acks = all,并保证集群中 broker 的数目大于 2。

我们在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 发送消息,同时同步获取消息发送结果
*/
public void sendAndGetResultSync(Message message) {
ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC, message);
try {
SendResult<Object, Object> result = future.get();
log.info("Sync get send result success, result: {}", result);
} catch (Throwable e) {
log.error("Sync get send result failure", e);
}
}

/**
* 发送消息,同时异步获取消息发送结果
*/
public void sendAndGetResultAsync(Message message) {
kafkaTemplate.send(TOPIC, message).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {

@Override
public void onSuccess(SendResult<Object, Object> result) {
log.info("Async get send result success, result: {}", result);

}

@Override
public void onFailure(Throwable e) {
log.error("Async get send result failure", e);
}
});
}

存储阶段

在存储阶段,正常情况下,只要 Broker 正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还可以可能会丢失消息的。

对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。(不过这种同步刷盘的方式效率很低)

如果 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其它 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

另外,在 Broker 端,我们还需要设置以下参数的值:

  • unclean.leader.election.enable = false
    它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个Broker落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

  • replication.factor >= 3
    它控制的是将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。

  • min.insync.replicas > 1
    它控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。

消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从 Broker 拉去消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。

我们在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。这就需要设置 enable.auto.commit = false 来关闭位移自动提交。

位移主题

位移主题,即 __consumer_offsets,是 Kafka中 针对 Consumer Group,用于记录消费者的消费位置信息。

老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。
但是,ZooKeeper 其实并不适用于这种高频的写操作,因此,Kafka 最终在新版本 Consumer 中正式推出了全新的位移管理机制。

新版本 Consumer 的位移管理机制其实很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 _ _consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka的主题设计天然就满足这两个条件。

需要强调的是,和我们创建的其他主题一样,位移主题就是普通的 Kafka 主题。我们可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,我们其实并不需要“搭理”它,也不用花心思去管理它,把它丢给Kafka就完事了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说我们不能随意地向这个主题写消息,因为一旦我们写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮我们提交位移,也就是向位移主题写消息。我们千万不要自己写个 Producer 随意向该主题发送消息。

消息格式

对于 Consumer Group而言,它是一组 KV 对,key 包含3部分内容:<Group ID,主题名,分区号>,value 对应 Consumer 消费该分区的最新位移(其实也可以适用于保存 Consumer Group 信息的消息,或用于删除 Group 过期位移甚至是删除 Group 的消息)。

创建位移主题

位移主题既可以由 Kafka 自动创建,也可以由我们手动创建。
默认位移主题是由 Kafka 自动创建的,此时该主题的默认分区数是 50,分区的默认副本数是 3。
我们也可以通过设置 Broker 端的参数 offsets.topic.num.partitions 和 offsets.topic.replication.factor 来改变分区数或副本数。

通常还是让 Kafka 自动创建比较好。目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果我们自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。

提交位移

目前 Kafka Consumer 提交位移的方式有两种:自动提交位移(默认)和手动提交位移。

自动提交

Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为我们定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。
自动提交位移有一个显著的优点,就是省事,我们不用操心位移提交的事情,就能保证消息消费不会丢失。
但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。
现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
虽然我们能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动提交

反观手动提交位移,它的好处就在于更加灵活,我们完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。
在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,我们可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供我们实现提交之后的逻辑,比如记录日志或处理异常等。

那么,commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。

混合提交

显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS。

概念

重平衡(Rebalance)本质上是一种协议,规定了一个 Consumer Group 下的所有 Consumer 如何达成一致,来分配订阅 Topic 的每个分区。在 Rebalance 过程中,所有 Consumer 实例共同参与,在协调者(Coordinator)组件的帮助下,完成订阅主题分区的分配。

触发条件

Rebalance 的触发条件有以下3个:

  • 组成员数发生变更: 比如有新的 Consumer 实例加入组或者离开组,抑或是有 Consumer 实例崩溃被“踢出”组。
  • 订阅主题数发生变更: Consumer Group 可以是用正则表达式的方式订阅主题,比如 consumer.subscribe(Pattern.compile(“t.*c”)) 就表明该 Group 订阅所有以字母t开头、字母 c 结尾的主题。在 Consumer Group 的运行过程中,一旦我们新创建了一个满足这样条件的主题,那么该 Group 就会发生 Rebalance。
  • 订阅主题的分区数发生变更: Kafka 当前只允许增加一个主题的分区数。当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。

分区分配策略

Rebalance 发生时,Group 下所有的 Consumer 实例都会协调在一起共同参与。那每个 Consumer 实例怎么知道应该消费订阅主题的哪些分区呢?这就需要Kafka 分区分配策略的协助了。

弊端

  • Rebalance 影响 Consumer 端 TPS: 这是因为在 Rebalance 过程中,所有 Consumer 实例都会停止消费,直到 Rebalance 完成。
  • Rebalance 很慢: 尤其是当我们的 Group 下成员很多的时候。
  • Rebalance 效率不高: 当前 Kafka 的设计机制决定了每次 Rebalance 时,Group 下的所有成员都要参与进来,而且通常不会考虑局部性原理,但局部性原理对提升系统性能是特别重要的。

避免 Rebalance

在真实的业务场景中,很多 Rebalance 都是计划外的或者说是不必要的。

要避免 Rebalance,还是要从 Rebalance 发生的时机入手。我们在前面说过,Rebalance 发生的时机有 3 个:

  • 组成员数量发生变化
  • 订阅主题数量发生变化
  • 订阅主题的分区数发生变化

后面两个通常都是运维的主动操作,所以它们引发的 Rebalance 大都是不可避免的。

至于组成员数量发生变化,如果也是我们的主动操作,那么它也不属于我们要规避的那类“不必要 Rebalance”。

我们需要重点关注以下两类导致 Rebalance 的场景:

  • Consumer 未能及时发送心跳
    当 Consumer Group 完成 Rebalance 之后,每个 Consumer 实例都会定期地向 Coordinator 发送心跳请求,表明它还存活着。如果某个 Consumer 实例不能及时地发送这些心跳请求,Coordinator 就会认为该 Consumer已经“死”了,从而将其从 Group 中移除,然后开启新一轮 Rebalance。
    Consumer 端有个参数,叫 session.timeout.ms,就是被用来表征此事的。该参数的默认值是10秒,即如果 Coordinator 在10秒之内没有收到 Group 下某 Consumer 实例的心跳,它就会认为这个 Consumer 实例已经挂了。可以这么说,session.timout.ms 决定了 Consumer 存活性的时间间隔。
    除了这个参数,Consumer 还提供了一个允许我们控制发送心跳请求频率的参数,就是 heartbeat.interval.ms。这个值设置得越小,Consumer 实例发送心跳请求的频率就越高。频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启 Rebalance。
    对于上面两个参数,在这里给出一些推荐数值,我们可以“无脑”地应用在你的生产环境中。

    • 设置 session.timeout.ms = 6s。
    • 设置 heartbeat.interval.ms = 2s。
    • 要保证Consumer实例在被判定为“dead”之前,能够发送至少3轮的心跳请求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
  • Consumer 消费时间过长
    Consumer 端还有一个参数,用于控制 Consumer 实际消费能力对 Rebalance 的影响,即 max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。
    如果消费逻辑很重,此时,max.poll.interval.ms 参数值的设置显得尤为关键。如果要避免非预期的 Rebalance,我们最好将该参数值设置得大一点,比我们的下游最大处理时间稍长一点。这样,Consumer 就不会因为处理这些消息的时间太长而引发 Rebalance。

如果上面提到的 3 个参数,我们已经设置了合理的数值,却发现还是出现了 Rebalance,那么需要关注一下 Consumer 端的 GC 表现,比如是否出现了频繁的 Full GC 导致的长时间停顿,实际应用中经常出现因为 GC 设置不合理导致程序频发 Full GC 而引发的非预期 Rebalance。

当前 Kafka 默认提供了 3 种分区分配策略:RangeAssignor、 RoundRobinAssignor、StickyAssignor。

Kafka 提供了消费者客户端参数 partition.assignment.strategy 用来设置消费者与订阅主题之间的分区分配策略。默认情况下,此参数的值为:org.apache.kafka.clients.consumer.RangeAssignor,即采用 RangeAssignor 分配策略。

RangeAssignor

RangeAssignor 策略的原理是按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配,以保证分区尽可能均匀地分配给所有的消费者。
对于每一个 Topic,RangeAssignor 策略会将消费组内所有订阅这个 Topic 的消费者按照名称的字典序排序,然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。
假设 n=分区数/消费者数量,m=分区数%消费者数量,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m)个消费者每个分配 n 个分区。

为了更加通俗的讲解 RangeAssignor 策略,我们不妨再举一些示例。假设消费组内有 2 个消费者 c0 和 c1,都订阅了主题 t0和 t1,并且每个主题都有 4 个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t0p3、t1p0、t1p1、t1p2、t1p3。最终的分配结果为:

1
2
消费者c0:t0p0、t0p1、t1p0、t1p1
消费者c1:t0p2、t0p3、t1p2、t1p3

这样分配的很均匀,那么此种分配策略能够一直保持这种良好的特性呢?我们再来看下另外一种情况。假设上面例子中 2 个主题都只有 3 个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

1
2
消费者c0:t0p0、t0p1、t1p0、t1p1
消费者c1:t0p2、t1p2

可以明显的看到这样的分配并不均匀,如果将类似的情形扩大,有可能会出现部分消费者过载的情况。

RoundRobinAssignor

RoundRobinAssignor 策略的原理是将消费组内所有消费者以及消费者所订阅的所有 Topic 的 Partition 按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者。

如果同一个消费组内所有的消费者的订阅信息都是相同的,那么 RoundRobinAssignor 策略的分区分配会是均匀的。

举例,假设消费组中有 2 个消费者 c0 和 c1,都订阅了主题 t0 和 t1,并且每个主题都有 3 个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

1
2
消费者c0:t0p0、t0p2、t1p1
消费者c1:t0p1、t1p0、t1p2

如果同一个消费组内的消费者所订阅的信息是不相同的,那么在执行分区分配的时候就不是完全的轮询分配,有可能会导致分区分配的不均匀。如果某个消费者没有订阅消费组内的某个 Topic,那么在分配分区的时候此消费者将分配不到这个 Topic 的任何分区。

举例,假设消费组内有 3 个消费者 c0、c1和c2,它们共订阅了3个主题:t0、t1、t2,这3个主题分别有1、2、3个分区,即整个消费组订阅了 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这 6 个分区。具体而言,消费者 c0 订阅的是主题 t0,消费者 c1 订阅的是主题 t0和 t1,消费者 c2 订阅的是主题 t0、t1和t2,那么最终的分配结果为:

1
2
3
消费者c0:t0p0
消费者c1:t1p0
消费者c2:t1p1、t2p0、t2p1、t2p2

可以看到 RoundRobinAssignor 策略也不是十分完美,这样分配其实并不是最优解,因为完全可以将分区 t1p1 分配给消费者 c1。

RoundRobinAssignor 策略对应的 partition.assignment.strategy 参数值为:org.apache.kafka.clients.consumer.RoundRobinAssignor。

StickyAssignor

“Sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

  • 分区的分配要尽可能的均匀。
  • 分区的分配尽可能的与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标。鉴于这两个目标,StickyAssignor 策略的具体实现要比 RangeAssignor 和 RoundRobinAssignor 这两种分配策略要复杂很多。

我们举例来看一下 StickyAssignor 策略的实际效果。
假设消费组内有 3 个消费者:c0、c1 和 c2,它们都订阅了4个主题:t0、t1、t2、t3,并且每个主题有 2 个分区,也就是说整个消费组订阅了 t0p0、t0p1、t1p0、t1p1、t2p0、t2p1、t3p0、t3p1 这 8 个分区。最终的分配结果如下:

1
2
3
消费者c0:t0p0、t1p1、t3p0
消费者c1:t0p1、t2p0、t3p1
消费者c2:t1p0、t2p1

这样初看上去似乎与采用 RoundRobinAssignor 策略所分配的结果相同,但事实是否真的如此呢?再假设此时消费者 c1 脱离了消费组,那么消费组就会执行再平衡操作,进而消费分区会重新分配。如果采用 RoundRobinAssignor 策略,那么此时的分配结果如下:

1
2
消费者c0:t0p0、t1p0、t2p0、t3p0
消费者c2:t0p1、t1p1、t2p1、t3p1

如分配结果所示,RoundRobinAssignor 策略会按照消费者 c0和 c2 进行重新轮询分配。

而如果此时使用的是 StickyAssignor 策略,那么分配结果为:

1
2
消费者c0:t0p0、t1p1、t3p0、t2p0
消费者c2:t1p0、t2p1、t0p1、t3p1

可以看到分配结果中保留了上一次分配中对于消费者 c0 和 c2 的所有分配结果,并将原来消费者 c1 的“负担”分配给了剩余的两个消费者 c0 和 c2,最终 c0 和 c2 的分配还保持了均衡。

如果发生分区重分配,那么对于同一个分区而言有可能之前的消费者和新指派的消费者不是同一个,对于之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,这显然很浪费系统资源。
StickyAssignor 策略如同其名称中的“sticky”一样,让分配策略具备一定的“粘性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗以及其它异常情况的发生。

到目前为止所分析的都是消费者的订阅信息都是相同的情况,我们来看一下订阅信息不同的情况下的处理。

举例,同样消费组内有 3 个消费者:c0、c1 和 c2,集群中有3个主题:t0、t1 和 t2,这3个主题分别有 1、2、3 个分区,也就是说集群中有 t0p0、t1p0、t1p1、t2p0、t2p1、t2p2 这 6 个分区。消费者 c0 订阅了主题 t0,消费者 c1 订阅了主题 t0 和 t1,消费者 c2 订阅了主题 t0、t1 和 t2。
如果此时采用 RoundRobinAssignor 策略,那么最终的分配结果如下所示(和讲述RoundRobinAssignor策略时的一样,这样不妨赘述一下):

1
2
3
消费者c0:t0p0
消费者c1:t1p0
消费者c2:t1p1、t2p0、t2p1、t2p2

如果此时采用的是 StickyAssignor 策略,那么最终的分配结果为:

1
2
3
消费者C0:t0p0
消费者C1:t1p0、t1p1
消费者C2:t2p0、t2p1、t2p2

可以看到这是一个最优解(消费者 c0 没有订阅主题 t1 和t2,所以不能分配主题 t1 和 t2 中的任何分区给它,对于消费者 c1 也可同理推断)。

假如此时消费者c0脱离了消费组,那么RoundRobinAssignor策略的分配结果为:

1
2
消费者c1:t0p0、t1p1
消费者c2:t1p0、t2p0、t2p1、t2p2

可以看到 RoundRobinAssignor 策略保留了消费者 c1 和 c2 中原有的 3 个分区的分配:t2p0、t2p1 和 t2p2(针对结果集1)。

而如果采用的是StickyAssignor策略,那么分配结果为:

1
2
消费者C1:t1p0、t1p1、t0p0
消费者C2:t2p0、t2p1、t2p2

可以看到 StickyAssignor 策略保留了消费者 c1 和 c2 中原有的 5 个分区的分配:t1p0、t1p1、t2p0、t2p1、t2p2。
从结果上看 StickyAssignor 策略比另外两者分配策略而言显得更加的优异,这个策略的代码实现也是异常复杂。

Hash 路由

这种策略依靠一定的策略生成的一个 key,然后按照 key 的哈希值选择分区。
比如按照业务、机器 ip 等方式去生成的,一旦 key 被定义了,那么所有该类 key 的消息都会被存放到相同的分区里。
其实在现在的 Kafka 版本中,如果指定了消息的 key 的话,就会使用这种策略,如果没指定的话就会使用轮询策略。
这个负载均衡有一个很大的用处,就是实现局部业务的顺序消息,比如我们有 3 个业务的消息需要顺序推送,如果只设置单分区,靠单分区来满足顺序性的话,Kafka 的优势就被限制住了,在这里就可以针对这 3 个业务线设置产生 key 策略,不同业务的 key 放到不同的分区上,相同的 key 在一个分区内是绝对顺序的。这样的话既保证了消息的顺序性,也利用了 Kafka 的高吞吐量的特性。(注意:之前提到过 Kafka 保证 Topic 下的消息顺序,会保证分区的消息顺序,是用追加文件的日志方式记录的消息)

参考资料

  1. Kafka分区分配策略-RangeAssignor、RoundRobinAssignor、StickyAssignor

概念

消费者组,即 Consumer Group,是 Kafka 提供的可扩展且具有容错性的消费者机制。
要理解 Consumer Group,我们需要记住下面这三个特性:

  1. Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
  2. Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一个进程下的线程。在实际场景中,使用进程更为常见一些。
  3. Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其它的 Group 消费。

对比传统消息模型

我们知道,JMS 规范定义了两种消息模型:点对点模型(消息队列)和发布/订阅模型,它们是各有优劣的。
传统的消息队列模型的缺陷在于消息一旦被消费,就会从队列中被删除,而且只能被下游的一个 Consumer 消费。严格来说,这一点不算是缺陷,只能算是它的一个特性。但是显然,这种模型的伸缩性(Scalability)很差,因为下游的多个 Consumer 都要“抢”这个共享消息队列的消息。
发布/订阅模型的缺陷同样是伸缩性不高,因为每个订阅者都必须要订阅主题的所有分区。这种全量订阅的方式既不灵活,也会影像消息的真实投递效果。

使用 Consumer Group 机制,Kafka 同时实现了传统消息引擎系统的两大模型(又避开了这两钟模型的缺陷):如果所有实例都属于同一个 group,那么它实现的就是消息队列模型;如果所有实例分别属于不同的 Group,那么它实现的就是 发布/订阅模型。

消费者实例数量

理想情况下,每个 Consumer Group 下的 Consumer 实例的数量应该等于该 Group 订阅主题的分区总数。一般不推荐设置大于总分区数的 Consumer 实例,设置多余的实例只会浪费资源,而没有任何好处。

Kafka 中的事务,它解决的问题是,确保在⼀个事务中发送的多条消息,要么都成功,要么都失败。注意,这⾥⾯的多条消息不⼀定要在同⼀个主题和分区中,可以是发往多个主题和分区的消息。
Kafka 的这种事务机制,单独来使⽤的场景不多。更多的情况下被⽤来配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once 语义。
当然,我们可以在 Kafka 的事务执⾏过程中,加⼊本地事务,来实现分布式事务。(但是不同于 RocketMQ,Kafka 是没有事务反查机制的)

实现分布式事务

我们以订单和购物车这个例子来介绍 Kafka 是如何用消息队列来实现分布式事务。

Kafka事务应用示例

如上图所示:
首先,订单系统在消息队列上开启一个事务。
然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功之后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。
如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息,这样就基本实现了“要么都成功,要么都失败”的一致性要求。

在上面的实现过程中,有一个问题其实是没有解决的:如果在第四步提交事务消息时失败了怎么办?Kafka 的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,知道提交成功,或者删除之前创建的订单进行补偿。

事务消息实现机制

Kafka 是基于两阶段提交来实现事务的。

在 Kafka 事务中,有两个重要角色:事务协调者和事务日志主题。

  • 事务协调者
    负责在服务端协调整个事务。事务协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。
  • 事务日志主题
    在 Kafka 集群中有一个特殊的用于记录事务日志的主题,这个事务日志主题的实例和普通的主题是一样的,里面记录的数据就是类似于“开启事务”、“提交事务”这样的事务日志。日志主题同样也包含了很多的分区。在 Kafka 集群中,可以存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区。

Kafka 事务的实现流程:
Kafka事务实现流程

如上图所示,首先,当我们开启事务的时候,生产者会给协调者发一个请求开启事务,协调者在事务日志中记录下事务ID。
然后,生产者在发送消息之前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志中。接下来,生产者就可以像发送普通消息一样来发送事务消息。需要注意的是,Kafka 在处理未提交的事务消息时,和普通消息是一样的,直接发给 Broker,保存在这些消息对应的分区中,Kafka 会在客户端的消费者中,暂时过滤未提交的事务消息。
消息发送完成之后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。之后便开始第二阶段,协调者在事务相关的素有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者,也就是客户端,督导这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。
最后,协调者记录最后一条事务日志,标识这个事务已经结束了。

整个事务的实现流程时序图如下:
Kafka事务实现流程时序图

总结⼀下 Kafka 这个两阶段的流程:
准备阶段,⽣产者发消息给协调者开启事务,然后消息发送到每个分区上。
提交阶段,⽣产者发消息给协调者提交事务,协调者给每个分区发⼀条“事务结束”的消息,完成分布式事务提交。

消息交付可靠性保障

在介绍精确一次语义之前,我们先了解下什么是消息交付可靠性保障。
所谓消息交付可靠性保障,是消息引擎对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  • 最多一次(at most once):消息可能会丢失,但绝不会重复消费。
  • 至少一次(at lease once):消息不会丢失,但有可能被重复发送。
  • 精确一次(exactly once):消息不会丢失,也不会被重复发送。

目前,Kafka 默认提供的交付可靠性保障是第二种,即至少一次。
在 Kafka 中,只有 Broker 成功“提交”消息且 Producer 接到 Broker 的应答才会认为该消息成功发送。不过倘若消息成功“提交”,但 Broker 的应答没有成功发送回 Producer 端(比如网络出现瞬时抖动),那么 Producer 就无法确定消息是否真的提交成功了。因此,它只能选择重试,也就是再次发送相同的消息。这就是 Kafka 默认提供至少一次可靠性保障的原因,不过这会导致消息重复发送。

Kafka 也能提供对最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重新发送。我们通常不会希望出现消息丢失的情况,但在一些场景里偶发的消息丢失其实是被允许的,相反,消息重复消费是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。

无论至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只被交付一次,这样,消息不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条。

那么问题来了,Kafka 是怎么做到精确一次的呢?简单来说,这是通过两种机制:幂等性(Idempotence)和事务(Transaction)。

幂等性 Producer

在 Kafka 中,Producer 默认不是幂等性的,但我们可以创建幂等性 Producer,只需要设置属性 enable.idempotence 为 true 即可。

不过,幂等性 Producer是有多用范围的。
首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区不出现重复消息,它无法实现多个分区的幂等性。
其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,我们可以理解为 Producer 进程的一次运行。当我们重启了 Producer 进程之后,这种幂等性保证就丧失了。

如果我们想实现对分区以及多会话上的消息无重复,则需要使用事务型 Producer。

事务型 Producer

事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。
f’f
设置事务型 Producer 的方法也很简单,只需配置以下两个属性:

  • 和幂等性 Producer 一样,开启 enable.idempotence = true。
  • 设置 Producer 端参数 transactional.id。最好为其设置一个有意义的名字。

实际上即使消息写入失败,Kafka 也会把它们写入到底层的日志中,也就是说 Consumer 还是会看到这些消息。因此在 Consumer 端,读取事务型 Producer 发送的消息也需要做一些变更:设置 isolation.level 参数的值。目前该参数有两个取值:

  • read_uncommitted
    这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。很显然,如果我们用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。
  • read_committed
    表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然,它也能看到非事务型 Producer 写入的所有消息。

尽管从交付语义上来看,事务型 Producer 能做的更多。但是事务型 Producer 的性能要更差,在实际使用过程中,我们需要仔细评估引入事务的开销,切不可无脑地启用事务。

Exactly Once

需要强调的时,Kafka 中的 Exactly Once,和我们通常理解的消息队列的服务⽔平中的 Exactly Once 是不⼀样的。
我们通常理解消息队列的服务⽔平中的 Exactly Once,它指的是,消息从⽣产者发送到 Broker,然后消费者再从 Broker 拉取消息,然后进⾏消费。这个过程中,确保每⼀条消息恰好传输⼀次,不重不丢。
而 Kafka 中的 Exactly Once,它解决的是,在流计算中,⽤ Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这样的过程中,保证每条消息都被恰好计算⼀次,确保计算结果正确。
也就是说,Kafka 中的 Exactly Once 机制,是为了解决在“读数据-计算-保存结果”这样的计算过程中数据不重不丢,⽽不是我们通常理解的使⽤消息队列进⾏消息⽣产消费过程中的 Exactly Once。

JMS 规范

JMS 规范定义了两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe, topic)。

点对点(point to point, queue)

生产者生产消息发送到 queue 中,然后消费者从 queue 中取出并且消费消息。 这里要注意:消息被消费以后,queue 中不再有存储, 所以消费者不可能消费已经被消费的消息。queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

点对点模型示意图如下:
消息模型-点对点

发布/订阅(publish/subscribe, topic)

在发布-订阅消息系统中,消息被持久化到一个 topic 中。与点对点消息系统不同的是,消费者可以订阅一个或多个 topic,消费者可以消费该 topic 中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者。

发布/订阅模型示意图如下:
消息模型-发布订阅

Kafka 消息模型

从 JMS规范上来说,Kafka 没有实现点对点模型,只实现了发布/订阅模型。不过在这种发布/订阅模型中,如果只有一个订阅者,那它和点对点模型就基本是一样的了。也就是说,发布/订阅模型在功能层面上是可以兼容点对点模型的。

在 Kafka 消息模型中,每个主题包含多个分区,通过多个分区来实现多实例并行生产和消费。需要注意的是,Kafka 只在分区上保证消息的有序性,主题层面是无法保证消息的严格顺序的

在 Kafka 中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给Consumer Group2消费。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

在 Topic 的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要 Kafka 为每个消费组在每个分区上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。

Kafka 消息模型示意图如下:
Kafka消息模型

在本文最后,需要再说明下为何 Kafka 会引入分区这个概念:
在生产端,生产者先将消息发送给服务端,也就是Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?
为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,Kafka 在主题下面增加了队列的概念。

Kafka

Apache Kafka 是一款开源的消息引擎系统,其主要功能是提供一套完备的消息发布与订阅解决方案。
在 Kafka 中,发布订阅的对象是主题(Topic),我们可以为每个业务、每个应用甚至是每类数据都创建专属的主题。

消息(Record)

我们已经知道,Kafka 是消息引擎,这里的消息就是指 Kafka 处理的主要对象。

主题(Topic)

主题是承载消息的逻辑容器,在实际使用中多用来区分具体的业务。

生产者(Producer)

向主题发布消息的客户端应用程序。

消费者(Consumer)

从主题订阅消息的应用程序。

客户端(Clients)

我们把生产者和消费者统称为客户端。

服务器端(Broker)

由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成,Broker 负责接收和处理客户端发来的请求,以及对消息进行持久化。

分区(Partition)

一个有序不变的消息队列。每个主题下可以有多个分区。
生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中。(Kafka 的分区编号从 0 开始递增)
分区的设计是为了解决分布式系统中的伸缩性问题:把数据分割成多份保存在不同的 Broker 上,从而避免数据太多以至于单台 Broker 都无法容纳。
Partition 与 MongoDB 和 Elasticsearch 中的 Sharding、HBase中的Region,其实它们都是相同的原理,只是 Partitioning 是最标准的名称。

副本(Replica)

Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余,这些地方就是所谓的副本。
副本是在分区层级下实现的,即每个分区可配置多个副本实现高可用。
副本可以分为领导者副本(Leader Replica)和追随者副本(Follower Replica)。前者对外提供服务,这里的对外指的是与客户端程序进行交互;而后者只是被动的追随前者而已,不能与外界进行交互。
副本的工作机制:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

我们知道,MySQL 的从库是可以处理读操作的,但是在 Kafka 中追随者副本不会对外提供服务。其原因有以下几点:

  • kafka 的分区已经能够让读写操作在多个 Broker 上,而不像 MySQL 的主从,压力都在主上;
  • kafka 保存的数据和数据库的性质有实质的区别:Kafka 中的数据是流数据,需要记录消费位移,而数据库是实体数据不存在这个概念,如果从 Kafka 的 Follower 读,消费端 offset 控制将更复杂;
  • 对于生产者来说,kafka 可以通过配置来控制是否等待 Follower 的消息确认,如果从上面读,也需要所有的 Follower 都确认了才可以回复生产者,造成性能下降,如果 Follower 出问题了也不好处理。

消费者组(Consumer Group)

多个消费者实例共同组成的一个组,同时消费多个分区以实现高吞吐。
主题中的每个分区都只会被组内的一个消费者实例消费,其它消费者实例不能消费它。一个消费者实例可以消费多个分区。分区数决定了消费者个数的上限,一般情况下消费者个数应与分区数一致,消费者个数过多会浪费系统资源,因为多出的消费者不会被分配到任何分区。

重平衡(Rebalance)

消费者组内某个实例挂掉后,其它消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 实现高可用的重要手段。

消息位移(Offset)

表示分区中每条消息的位置信息,是一个单调递增且不变的值。

消费者位移(Consumer Offset)

表示消费者的消费进度,每个消费者都有自己的消费者位移。

Maven 依赖

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

配置属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
spring:
kafka:
# kafka 集群地址
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
# 生产者
producer:
# 生产者每次批量发送数据的大小,默认值 16K
batch-size: 16384
# 生产者能够使用的缓存的大小,默认值 32M
buffer-memory: 33554432
# 生产者发送数据失败时重试的次数,默认值 0
retries: 0
# 指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。默认值 1
# 0:生产者往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
# 1:生产者往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 接收成功。
# all或-1:往集群发送数据需要所有的 ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 一个 batch 被创建之后,最多等待多久被发送出去,默认值 0
# 配合 batch-size 一起使用:满足 batch.size 和 ling.ms 之一,生产者便开始发送 batch
linger.ms: 0
# 消费者
consumer:
# 用来唯一标识 consumer 进程所在组的字符串,如果设置同样的 group id,表示这些 processes 都是属于同一个 consumer group
group-id: group-x
# 如果为真,消费者所 fetch 的消息的 offset 将会自动的同步到 zookeeper
enable-auto-commit: true
# 消费者向 zookeeper 提交 offset 的频率,当 enable-auto-commit 为 true 时生效,默认值 5s
auto-commit-interval: 5000
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none: 当各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Data
public class Message {
private Long id;
private String msg;
private Date sendTime;
}

@Configuration
public class KafkaConfiguration {
public static final String TOPIC = "topic";

/**
* 当主题不存在时才会创建新的主题
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name(TOPIC).partitions(1).replicas(1).build();
}
}

@Slf4j
@Component
public class MessageSender {
private final KafkaTemplate<Object, Object> kafkaTemplate;

public MessageSender(KafkaTemplate<Object, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}

/**
* 简单发送消息
*/
public void send(Message message) {
kafkaTemplate.send(TOPIC, message);
}
}

消费方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Configuration
public class KafkaConfiguration {
public static final String TOPIC = "topic";

/**
* 当主题不存在时才会创建新的主题
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name(TOPIC).partitions(1).replicas(1).build();
}
}

@Slf4j
@Component
public class MessageReceiver {

/**
* 接收消息,参数类型为 ConsumerRecord
*/
@KafkaListener(id = "group-2-1", topics = {TOPIC})
public void receive(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
Optional<Object> messageOptional = Optional.ofNullable(record.value());
if (messageOptional.isPresent()) {
Object message = messageOptional.get();
log.info("Receive message: {}", message);
}

// 设置了手动 ack 模式时,需要在消息处理完毕之后,手动调用 acknowledge
acknowledgment.acknowledge();
}

/**
* 接收消息,参数类型为 String
*/
@KafkaListener(id = "group-2-2", topics = {TOPIC})
public void receive(String message, Acknowledgment acknowledgment) {
log.info("Receive message: {}", message);
acknowledgment.acknowledge();
}
}

高级配置

序列化/反序列化

当我们发送对象类型消息数据时,在消费者方默认接收到的数据类型是字符串,如果我们希望接收到的是对象类型,可以在生产者/消费者两方都添加一点配置。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    spring:
    kafka:
    # 生产者
    producer:
    properties:
    # 建立映射:message 与 samples.dto.Message 对应
    spring.json.type.mapping: message:samples.dto.Message
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    @Configuration
    public class KafkaConfiguration {

    /**
    * 消息类型转换
    */
    @Bean
    public RecordMessageConverter converter() {
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(Jackson2JavaTypeMapper.TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("samples.dto");
    Map<String, Class<?>> mappings = new HashMap<>(10);
    mappings.put("message", Message.class);
    typeMapper.setIdClassMapping(mappings);

    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    converter.setTypeMapper(typeMapper);
    return converter;
    }
    }

之后,我们就能够像下面这样接收 Message 参数的消息了:

1
2
3
4
5
6
7
8
9
10
/**
* 接收消息,参数类型为 Message
*/
@KafkaListener(id = "group-2-3", topics = {TOPIC})
public void receive(Message message, Acknowledgment acknowledgment) {
log.info("Receive message: {}", message);

// 设置了手动 ack 模式时,需要在消息处理完毕之后,手动调用 acknowledge
acknowledgment.acknowledge();
}

在后续演示代码中,默认都会配置序列化/反序列化,就不再一一列举了。

获取消息发送结果

在发送消息之后,我们可以获取到消息发送结果,既可以以同步方式获取结果,也可以以异步方式获取结果。

  • 同步方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * 发送消息,同时同步获取消息发送结果
    */
    public void sendAndGetResultSync(Message message) {
    ListenableFuture<SendResult<Object, Object>> future = kafkaTemplate.send(TOPIC, message);
    try {
    SendResult<Object, Object> result = future.get();
    log.info("Sync get send result success, result: {}", result);
    } catch (Throwable e) {
    log.error("Sync get send result failure", e);
    }
    }
  • 异步方式

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * 发送消息,同时异步获取消息发送结果
    */
    public void sendAndGetResultAsync(Message message) {
    kafkaTemplate.send(TOPIC, message).addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {

    @Override
    public void onSuccess(SendResult<Object, Object> result) {
    log.info("Async get send result success, result: {}", result);

    }

    @Override
    public void onFailure(Throwable e) {
    log.error("Async get send result failure", e);
    }
    });
    }

使用事务

开启事务

当我们在生产者方配置了属性 transaction-id-prefix 后,Spring 会自动帮我们开启事务。
不过开启事务之后,retries 属性需要设置为大于 0,acks 属性需要设置为 all 或 -1。
另外,我们还需要将消费者方的 isolation.level 设置为 read_committed,这样对于未提交事务的消息,消费者就不会读取到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
kafka:
# 生产者
producer:
# 生产者发送数据失败时重试的次数,默认值 0
retries: 3
# 指定了必须要有多少个分区副本收到消息,生产者才会认为写入消息是成功的,这个参数对消息丢失的可能性有重大影响。默认值 1
# 0:生产者往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
# 1:生产者往集群发送数据只要 Leader 应答就可以发送下一条,只确保 Leader 接收成功。
# all或-1:往集群发送数据需要所有的 ISR Follower 都完成从 Leader 的同步才会发送下一条,确保 Leader 发送成功和所有的副本都成功接收。安全性最高,但是效率最低。
acks: -1
# 事务 id 前缀,设置该属性后会开启事务
transaction-id-prefix: tx.
# 消费者
consumer:
properties:
# 事务隔离级别(read_committed 和 read_uncommitted)
isolation.level: read_committed

使用事务

有两种方式
Spring 提供了两种方式使用事务:调用 KafkaTemplate 的 executeInTransaction 方法,或使用 @Transactional。

  • 调用 KafkaTemplate 的 executeInTransaction 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * 发送消息,并开启事务,使用 kafkaTemplate.executeInTransaction
    */
    public void sendInTransactionByMethod(Message message) {
    kafkaTemplate.executeInTransaction(kafkaTemplate -> {
    kafkaTemplate.send(TOPIC, message);
    log.info("Send in transaction by method success");
    //noinspection ConstantConditions
    return null;
    // throw new RuntimeException("fail");
    });
    }
  • 使用 @Transactional

    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
    * 发送消息,并开启事务,使用 @Transactional
    */
    @Transactional(rollbackFor = Exception.class)
    public void sendInTransactionByAnnotation(Message message) {
    kafkaTemplate.send(TOPIC, message);
    log.info("Send in transaction by annotation success");
    // throw new RuntimeException("fail");
    }

需要注意的是,在生产者开启事务之后,所有发送消息的地方都必须放在事务中执行。

转发消息

消费者在收到消息并对消息进行处理之后,可以再将新的消息发送出去。
在消费方,我们既可以使用 kafkaTemplate.send 实现手动发送消息,也可以使用 @Send 实现自动发送消息。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_RESEND = "topic-resend";
    public static final String TOPIC_RESEND_NEXT = "topic-resend-next";

    @Bean
    public NewTopic topicResend() {
    return TopicBuilder.name(TOPIC_RESEND).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicResendNext() {
    return TopicBuilder.name(TOPIC_RESEND_NEXT).partitions(1).replicas(1).build();
    }
    }

    /**
    * 消息生产方同时作为消费者,接收其对应的消费者返回的消息
    */
    @Slf4j
    @Component
    public class MessageReceiver {

    /**
    * 接收消息,参数类型为 ConsumerRecord
    */
    @KafkaListener(id = "group-1-1", topics = {TOPIC_RESEND_NEXT})
    public void receive(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
    Optional<Object> messageOptional = Optional.ofNullable(record.value());
    if (messageOptional.isPresent()) {
    Object message = messageOptional.get();
    log.info("Receive message: {}", message);
    }
    acknowledgment.acknowledge();
    }

    /**
    * 接收消息,参数类型为 String
    */
    @KafkaListener(id = "group-1-2", topics = {TOPIC_RESEND_NEXT})
    public void receive(String msg, Acknowledgment acknowledgment) {
    log.info("Receive msg: {}", msg);
    acknowledgment.acknowledge();
    }
    }

    @Slf4j
    @Component
    public class MessageReceiver {

    /**
    * 接收消息,参数类型为 ConsumerRecord
    */
    @KafkaListener(id = "group-1-1", topics = {TOPIC_RESEND_NEXT})
    public void receive(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
    Optional<Object> messageOptional = Optional.ofNullable(record.value());
    if (messageOptional.isPresent()) {
    Object message = messageOptional.get();
    log.info("Receive message: {}", message);
    }
    acknowledgment.acknowledge();
    }

    /**
    * 接收消息,参数类型为 String
    */
    @KafkaListener(id = "group-1-2", topics = {TOPIC_RESEND_NEXT})
    public void receive(String msg, Acknowledgment acknowledgment) {
    log.info("Receive msg: {}", msg);
    acknowledgment.acknowledge();
    }
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_RESEND = "topic-resend";
    public static final String TOPIC_RESEND_NEXT = "topic-resend-next";

    @Bean
    public NewTopic topicResend() {
    return TopicBuilder.name(TOPIC_RESEND).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicResendNext() {
    return TopicBuilder.name(TOPIC_RESEND_NEXT).partitions(1).replicas(1).build();
    }
    }

    @Slf4j
    @Component
    public class MessageReceiver {
    private final KafkaTemplate<Object, Object> kafkaTemplate;

    @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
    public MessageReceiver(KafkaTemplate<Object, Object> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
    }

    /**
    * 接收消息,在处理完消息之后,再将新的消息 手动 重新发送出去
    */
    @SendTo
    @KafkaListener(id = "group-2-4", topics = {TOPIC_RESEND})
    public void receiveAndResendManual(Message message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}", message);
    acknowledgment.acknowledge();
    kafkaTemplate.send(TOPIC_RESEND_NEXT, message.getMsg());
    }

    /**
    * 接收消息,在处理完消息之后,再将新的消息 自动 重新发送出去
    */
    @SendTo(TOPIC_RESEND_NEXT)
    @KafkaListener(id = "group-2-5", topics = {TOPIC_RESEND})
    public String receiveAndResendAuto(Message message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}", message);
    acknowledgment.acknowledge();
    return message.getMsg();
    }
    }

获取消息回复

生产者在发送消息之后,可以同时等待获取消费者接收并处理消息之后的回复,就像传统的 RPC 交互那样,要实现这个功能,我们需要使用 ReplyingKafkaTemplate。
ReplyingKafkaTemplate 是 KafkaTemplate 的一个子类,它除了继承父类的方法,还新增了方法 sendAndReceive ,该方法实现了消息发送/回复的语义。
Spring Boot 没有提供开箱即用的 ReplyingKafkaTemplate,我们需要做些额外的配置。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_RECEIVE = "topic-receive";

    public static final String TOPIC_REPLIES = "replies";
    public static final String GROUP_REPLIES = "repliesGroup";

    @Bean
    public NewTopic topicReceive() {
    return TopicBuilder.name(TOPIC_RECEIVE).partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topicReplies() {
    return TopicBuilder.name(TOPIC_REPLIES).partitions(1).replicas(1).build();
    }

    @Bean
    public ConcurrentMessageListenerContainer<Object, Object> concurrentMessageListenerContainer(
    ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory) {
    ConcurrentMessageListenerContainer<Object, Object> container =
    containerFactory.createContainer(TOPIC_REPLIES);
    container.getContainerProperties().setGroupId(GROUP_REPLIES);
    container.setAutoStartup(false);
    return container;
    }

    @Bean
    public ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate(
    ProducerFactory<Object, Object> producerFactory,
    ConcurrentMessageListenerContainer<Object, Object> concurrentMessageListenerContainer) {
    ReplyingKafkaTemplate<Object, Object, Object> kafkaTemplate =
    new ReplyingKafkaTemplate<>(producerFactory, concurrentMessageListenerContainer);
    // kafkaTemplate.setSharedReplyTopic(true);
    return kafkaTemplate;
    }

    /**
    * 如果不配置 kafkaTemplate,启动时会抛出循环依赖异常
    */
    @Bean
    public KafkaTemplate<Object, Object> kafkaTemplate(ProducerFactory<Object, Object> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
    }
    }

    @Slf4j
    @Component
    public class MessageSender {
    /**
    * KafkaTemplate 的子类,除了继承父类的方法之外,还提供了 sendAndReceive 方法,可以用来实现类似传统 RPC 那样的交互:即发送消息,并等待消息处理结果。
    */
    private final ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate;

    public MessageSender(ReplyingKafkaTemplate<Object, Object, Object> replyingKafkaTemplate) {
    this.replyingKafkaTemplate = replyingKafkaTemplate;
    }

    /**
    * 发送消息,同时等待消息结果
    */
    public void sendAndReceive(Message message) {
    ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>(TOPIC_RECEIVE, message);
    // producerRecord.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, TOPIC_REPLIES.getBytes()));
    RequestReplyFuture<Object, Object, Object> replyFuture = replyingKafkaTemplate.sendAndReceive(producerRecord);
    ConsumerRecord<Object, Object> consumerRecord;
    try {
    consumerRecord = replyFuture.get();
    log.info("Receive reply success, result: {}", consumerRecord.value());
    } catch (InterruptedException | ExecutionException e) {
    log.error("Receive reply failure", e);
    }
    }
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_RECEIVE = "topic-receive";

    @Bean
    public NewTopic topicReceive() {
    return TopicBuilder.name(TOPIC_RECEIVE).partitions(1).replicas(1).build();
    }
    }

    @Slf4j
    @Component
    public class MessageReceiver {

    /**
    * 接收消息,在处理完消息之后,将处理结果返回给生产方
    */
    @SendTo
    @KafkaListener(id = "group-2-6", topics = {TOPIC_RECEIVE})
    public String receiveAndReply(Message message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}", message);
    acknowledgment.acknowledge();
    return "successful";
    }
    }

多方法处理消息

组合使用 @KafkaListener 和 @KafkaHandler,能够让我们在传递消息时,根据转换后的消息有效负载类型来确定调用哪个方法。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_MULTIPLE = "topic-multiple";

    @Bean
    public NewTopic topicMultiple() {
    return TopicBuilder.name(TOPIC_MULTIPLE).partitions(1).replicas(1).build();
    }
    }

    @Slf4j
    @Component
    public class MessageSender {
    private final KafkaTemplate<Object, Object> kafkaTemplate;

    public MessageSender(KafkaTemplate<Object, Object> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
    }

    /**
    * 发送消息,消息类型为字符串,消费者根据消息类型决定执行哪个方法
    */
    public void sendToMultipleHandlers(String msg) {
    kafkaTemplate.send(TOPIC_MULTIPLE, msg);
    }

    /**
    * 发送消息,消息类型为 Message,消费者根据消息类型决定执行哪个方法
    */
    public void sendToMultipleHandlers(Message message) {
    kafkaTemplate.send(TOPIC_MULTIPLE, message);
    }
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    @Configuration
    public class KafkaConfiguration {
    public static final String TOPIC_MULTIPLE = "topic-multiple";

    @Bean
    public NewTopic topicMultiple() {
    return TopicBuilder.name(TOPIC_MULTIPLE).partitions(1).replicas(1).build();
    }
    }

    @Slf4j
    @KafkaListener(id = "group-2-7", topics = {TOPIC_MULTIPLE})
    @Component
    public class MessageReceiverMultipleMethods {

    /**
    * 处理字符串类型的消息
    */
    @KafkaHandler
    public void handlerStr(String message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}, type of String", message);
    acknowledgment.acknowledge();
    }

    /**
    * 处理 Message 类型的消息
    */
    @KafkaHandler
    public void handlerMessage(Message message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}, type of Message", message);
    acknowledgment.acknowledge();
    }

    /**
    * 处理其它类型的消息
    */
    @KafkaHandler(isDefault = true)
    public void handlerUnknown(Object message, Acknowledgment acknowledgment) {
    log.info("Receive message: {}, type of Unknown", message);
    acknowledgment.acknowledge();
    }
    }

手动提交 offset (ack)

默认情况下,Kafka 会自动帮我们提交 offset,但是这样做容易导致消息重复消费或消失丢失:

  • 在消费者收到消息之后,且 kafka 未自动提交 offset 之前,broker 宕机了,然后重启 broker,此时消费者会从原来的 offset 开始消费,于是出现了重复消费;
  • 在消费者收到消息之后,且消费者还没有处理完消息时,由于自动提交的间隔时间到了,于是 kafka 自动提交了 offset,但是之后消费者又挂掉了,那么当消费者重启之后,会从下一个 offset 开始消费,这样前面的消息就丢失了。
    我们可以改为使用手动提交 offset,只需要做两处调整:
  1. 修改 application.yml

    1
    2
    3
    4
    5
    6
    7
    8
    spring:
    kafka:
    # 消费方
    consumer:
    enable-auto-commit: false
    listener:
    # 手动 ack,默认值 batch
    ack-mode: manual
  2. 消息处理完成之后,调用 Acknowledgment 的 acknowledge 方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Slf4j
    @Component
    public class MessageReceiver {

    @KafkaListener(id = "group-2-1", topics = {"topic-1"})
    public void receive(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
    log.info("receive record: {}", record);
    Optional<Object> messageOptional = Optional.ofNullable(record.value());
    if (messageOptional.isPresent()) {
    Object message = messageOptional.get();
    log.info("receive message: {}", message);
    }
    acknowledgment.acknowledge();
    }
    }

异常处理

对于消费者在处理消息过程中抛出的异常,我们可以设置 errorHandler,然后在 errorHandler 中统一处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 接收消息,处理消息时抛出异常,异常由 errorHandler 进行处理
*/
@KafkaListener(id = "group-2-8", topics = {TOPIC_EXCEPTION}, errorHandler = "customErrorHandler")
public void receiveWithException(Message message, Acknowledgment acknowledgment) {
log.info("Receive message: {}", message);
throw new RuntimeException("error");
}

@Slf4j
@Service("customErrorHandler")
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
log.error("Handle message with exception, message: {}", message.getPayload().toString());
return null;
}

@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
log.error("Handle message with exception, message: {}", message.getPayload().toString());
return null;
}
}

并发接收消息

1
2
3
4
5
6
7
8
 /**
* 并发接收消息
*/
@KafkaListener(id = "group-2-9", topics = {TOPIC_CONCURRENT}, concurrency = "3")
public void receiveConcurrent(Message message, Acknowledgment acknowledgment) {
log.info("Receive message: {}", message);
acknowledgment.acknowledge();
}

暂停与恢复消费

通过使用 KafkaListenerEndpointRegistry,我们可以动态的暂停与恢复消费者消费消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* bean 的名称需是类名,不然会找不到 bean
*/
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public MessageSender(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}

/**
* 指定消费者开始消费
*/
public void startListener(String listenerId) {
kafkaListenerEndpointRegistry.getListenerContainer(listenerId).start();
}

/**
* 指定消费者暂停消费
*/
public void stopListener(String listenerId) {
kafkaListenerEndpointRegistry.getListenerContainer(listenerId).pause();
}

/**
* 指定消费者恢复消费
*/
public void resumeListener(String listenerId) {
kafkaListenerEndpointRegistry.getListenerContainer(listenerId).resume();
}

消息重试与死信队列

当消费者在处理消息过程中发生异常时,我们可以进行多次重试,如果最终还是存在异常,我们可以将消息发送到预定的 Topic,即死信队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 在消费方配置 errorHandler
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> consumerFactory,
KafkaTemplate<Object, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, consumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(0L, 3L)));
return factory;
}

/**
* 当消费者处理消息时发生异常,且多次重试后依然异常,那么消息会被发送到死信队列
* 这里的消息类型参数不能是 Message 类型,不然不会进入到该方法中
*/
@KafkaListener(id = "group-2-10-2", topics = {TOPIC_DEAD_LETTER + ".DLT"})
public void receiveByDeadLetter(ConsumerRecord<Object, Object> record, Acknowledgment acknowledgment) {
log.info("Receive message from DLT, message: {}", record.value());
acknowledgment.acknowledge();
}

拦截器

Apache Kafka 提供了一种向生产者和消费者添加拦截器的机制。下面我们将演示如何在 Spring Boot 中配置拦截器。

  • 生产方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    @Configuration
    public class KafkaConfiguration {
    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties properties, CustomComponent customComponent) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());
    producerProperties.put("custom.component", customComponent);
    DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerProperties);
    String transactionIdPrefix = properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
    factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    return factory;
    }
    }

    @Slf4j
    @Component
    public class CustomComponent {

    public void doSomething() {
    log.info("Do something");
    }
    }

    @Slf4j
    public class CustomProducerInterceptor implements ProducerInterceptor<Object, Object> {
    private CustomComponent customComponent;

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
    log.info("Before send message, do your own business");
    customComponent.doSomething();
    return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    this.customComponent = (CustomComponent) configs.get("custom.component");
    }
    }
  • 消费方

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    @Configuration
    public class KafkaConfiguration {
    @Bean
    public ProducerFactory<Object, Object> kafkaProducerFactory(KafkaProperties properties, CustomComponent customComponent) {
    Map<String, Object> producerProperties = properties.buildProducerProperties();
    producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());
    producerProperties.put("custom.component", customComponent);
    DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerProperties);
    String transactionIdPrefix = properties.getProducer().getTransactionIdPrefix();
    if (transactionIdPrefix != null) {
    factory.setTransactionIdPrefix(transactionIdPrefix);
    }
    return factory;
    }
    }

    @Slf4j
    @Component
    public class CustomComponent {

    public void doSomething() {
    log.info("Do something");
    }
    }

    @Slf4j
    public class CustomProducerInterceptor implements ProducerInterceptor<Object, Object> {
    private CustomComponent customComponent;

    @Override
    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
    log.info("Before send message, do your own business");
    customComponent.doSomething();
    return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {
    this.customComponent = (CustomComponent) configs.get("custom.component");
    }
    }