人的知识就好比一个圆圈,圆圈里面是已知的,圆圈外面是未知的。你知道得越多,圆圈也就越大,你不知道的也就越多。

0%

Kafka 消费者位移主题

位移主题

位移主题,即 __consumer_offsets,是 Kafka中 针对 Consumer Group,用于记录消费者的消费位置信息。

老版本 Consumer 的位移管理是依托于 Apache ZooKeeper 的,它会自动或手动地将位移数据提交到 ZooKeeper 中保存。当 Consumer 重启后,它能自动从 ZooKeeper 中读取位移数据,从而在上次消费截止的地方继续消费。这种设计使得 Kafka Broker 不需要保存位移数据,减少了 Broker 端需要持有的状态空间,因而有利于实现高伸缩性。
但是,ZooKeeper 其实并不适用于这种高频的写操作,因此,Kafka 最终在新版本 Consumer 中正式推出了全新的位移管理机制。

新版本 Consumer 的位移管理机制其实很简单,就是将 Consumer 的位移数据作为一条条普通的 Kafka 消息,提交到 _ _consumer_offsets 中。可以这么说,__consumer_offsets 的主要作用是保存 Kafka 消费者的位移信息。它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。显然,Kafka的主题设计天然就满足这两个条件。

需要强调的是,和我们创建的其他主题一样,位移主题就是普通的 Kafka 主题。我们可以手动地创建它、修改它,甚至是删除它。只不过,它同时也是一个内部主题,大部分情况下,我们其实并不需要“搭理”它,也不用花心思去管理它,把它丢给Kafka就完事了。

虽说位移主题是一个普通的 Kafka 主题,但它的消息格式却是 Kafka 自己定义的,用户不能修改,也就是说我们不能随意地向这个主题写消息,因为一旦我们写入的消息不满足 Kafka 规定的格式,那么 Kafka 内部无法成功解析,就会造成 Broker 的崩溃。事实上,Kafka Consumer 有 API 帮我们提交位移,也就是向位移主题写消息。我们千万不要自己写个 Producer 随意向该主题发送消息。

消息格式

对于 Consumer Group而言,它是一组 KV 对,key 包含3部分内容:<Group ID,主题名,分区号>,value 对应 Consumer 消费该分区的最新位移(其实也可以适用于保存 Consumer Group 信息的消息,或用于删除 Group 过期位移甚至是删除 Group 的消息)。

创建位移主题

位移主题既可以由 Kafka 自动创建,也可以由我们手动创建。
默认位移主题是由 Kafka 自动创建的,此时该主题的默认分区数是 50,分区的默认副本数是 3。
我们也可以通过设置 Broker 端的参数 offsets.topic.num.partitions 和 offsets.topic.replication.factor 来改变分区数或副本数。

通常还是让 Kafka 自动创建比较好。目前 Kafka 源码中有一些地方硬编码了 50 分区数,因此如果我们自行创建了一个不同于默认分区数的位移主题,可能会碰到各种各种奇怪的问题。

提交位移

目前 Kafka Consumer 提交位移的方式有两种:自动提交位移(默认)和手动提交位移。

自动提交

Consumer 端有个参数叫 enable.auto.commit,如果值是 true,则 Consumer 在后台默默地为我们定期提交位移,提交间隔由一个专属的参数 auto.commit.interval.ms 来控制。
自动提交位移有一个显著的优点,就是省事,我们不用操心位移提交的事情,就能保证消息消费不会丢失。
但自动提交位移的一个问题在于,它可能会出现重复消费。

在默认情况下,Consumer 每 5 秒自动提交一次位移。
现在,我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后,所有 Consumer 从上一次提交的位移处继续消费,但该位移已经是 3 秒前的位移数据了,故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。
虽然我们能够通过减少 auto.commit.interval.ms 的值来提高提交频率,但这么做只能缩小重复消费的时间窗口,不可能完全消除它。这是自动提交机制的一个缺陷。

手动提交

反观手动提交位移,它的好处就在于更加灵活,我们完全能够把控位移提交的时机和频率。但是,它也有一个缺陷,就是在调用 commitSync() 时,Consumer 程序会处于阻塞状态,直到远端的 Broker 返回提交结果,这个状态才会结束。
在任何系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。当然,我们可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。

鉴于这个问题,Kafka 社区为手动提交位移提供了另一个 API 方法:KafkaConsumer#commitAsync()。从名字上来看它就不是同步的,而是一个异步操作。调用 commitAsync() 之后,它会立即返回,不会阻塞,因此不会影响 Consumer 应用的 TPS。由于它是异步的,Kafka 提供了回调函数(callback),供我们实现提交之后的逻辑,比如记录日志或处理异常等。

那么,commitAsync 是否能够替代 commitSync 呢?答案是不能。commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

事实上,很多与 Kafka 集成的大数据框架都是禁用自动提交位移的,如 Spark、Flink 等。

混合提交

显然,如果是手动提交,我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果,原因有两个:

  1. 我们可以利用 commitSync 的自动重试来规避那些瞬时错误,比如网络的瞬时抖动,Broker 端 GC 等。因为这些问题都是短暂的,自动重试通常都会成功,因此,我们不想自己重试,而是希望 Kafka Consumer 帮我们做这件事。
  2. 我们不希望程序总处于阻塞状态,影响 TPS。
小礼物走一走,来 Github 关注我