Kafka消费配置

offset

在Kafka中无论是producer往topic中写数据,还是consumer从topic中读数据,都避免不了和offset打交道,关于offset主要有以下几个概念。

Last Committed Offset

  • consumer group最新一次 commit 的 offset,表示这个 group 已经把 Last Committed Offset 之前的数据都消费成功了。

Current Position

  • consumer group 当前消费数据的 offset,也就是说,Last Committed Offset 到 Current Position 之间的数据已经拉取成功,可能正在处理,但是还未 commit。

Log End Offset(LEO)

  • 记录底层日志(log)中的下一条消息的 offset。,对producer来说,就是即将插入下一条消息的offset。

High Watermark(HW)

  • 已经成功备份到其他 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,但是还未完全备份到其他的 replicas 中,consumer是无法消费这部分消息(未提交消息)。

消费

  • 每个consumer都要维护一个独立的TCP连接,如果分区数和创建consumer线程的数量过多,会造成不小系统开销。但是如果处理消息足够快速,消费性能也会提升,如果慢的话就会导致消费性能降低。

  • 采用一个consumer,多个消息处理线程来处理消息,其实在生产中,瓶颈一般是集中在消息处理上的(可能会插入数据到数据库,或者请求第三方API),所以我们采用多个线程来处理这些消息。

订阅/取消主题

订阅:

  • 使用subscribe()方法订阅主题
  • 使用assign()方法订阅确定主题和分区
    区别:
    通过subscribe()方法订阅主题具有消费者自动再均衡(reblance)的功能,存在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当组内的消费者增加或者减少时,分区关系会自动调整。实现消费负载均衡以及故障自动转移。使用assign()方法订阅则不具有该功能。

消费配置

消费者必须的属性有4个。

  • group.id。Consummer中有一个Consumer group(消费组),由它来决定同一个Consumer group中的消费者具体拉取哪个partition的数据,所以这里必须指定group.id属性。同一个group只能消费1次。

  • bootstrap.servers。连接Kafka集群的地址,多个地址以逗号分隔

  • key.deserializer。消息中key反序列化类,需要和Producer中key序列化类相对应。

  • value.deserializer。消息中value的反序列化类,需要和Producer中Value序列化类相对应。

消费涉及到了2个offset,一个是current position,一个是处理完毕向服务器确认的committed offset。

提交策略

Kafka提供了3种提交offset的方式。

  • 自动提交

    1
    2
    3
    4
    // 自动提交,默认true
    props.put("enable.auto.commit", "true");
    // 设置自动每1s提交一次
    props.put("auto.commit.interval.ms", "1000");
  • 手动同步提交offset

    1
    consumer.commitSync();
  • 手动异步提交offset

    1
    consumer.commitAsync();

重复消费

异步提交offset和同步提交都存在重复消费问题。

  • 异步模式下committed offset是落后于current position的。如果consumer挂掉了,那么下一次消费数据又只会从committed offset的位置拉取数据,就会导致数据被重复消费。

  • 同步操作存在原子操作问题。例如insertIntoDB和commitSync()做不到原子操作;如果insertIntoDB()成功了,但是提交offset的时候consumer挂掉了,然后服务器重启,仍然会导致重复消费问题。

是否需要做到不重复消费?

只要保证处理消息和提交offset得操作是原子操作,就可以做到不重复消费。我们可以自己管理committed offset,而不让kafka来进行管理。

  • 如果消费的数据刚好需要存储在数据库,那么可以把offset也存在数据库,就可以就可以在一个事务中提交这两个结果,保证原子操作。

  • 借助搜索引擎,把offset和数据一起放到索引里面,比如Elasticsearch。

每条记录都有自己的offset,所以如果要管理自己的offset还得要做下面事情:

  • 设置enable.auto.commit=false

  • 使用每个ConsumerRecord提供的offset来保存消费的位置

  • 在重新启动时使用seek(TopicPartition, long)恢复上次消费的位置

通过上面的方式就可以在消费端实现”Exactly Once”的语义,即保证只消费一次。

但是是否真的需要保证不重复消费呢?这个得看具体业务,重复消费数据对整体有什么影响在来决定是否需要做到不重复消费。

再均衡(reblance)

Kafka数据使用多线程阻塞的方式进行消费,即每个线程通过poll()的形式消费一个或者多个partition, 每次得到的消息集处理完成之后才会继续进行下一次poll()操作,同时使用了自动提交offset的模式。

Rebalance发生的原因有可能是集群的问题,但大部分都在客户端,一旦服务端在设定的超时时间内没有收到消费者发起的心跳,则认为这个消费者已经死掉,就会执行Rebalance动作。

Rebalance对数据的影响主要有以下几点:

  • 数据重复消费: 消费过的数据由于提交offset任务也会失败,在partition被分配给其他消费者的时候,会造成重复消费,数据重复且增加集群压力
  • Rebalance扩散到整个ConsumerGroup的所有消费者,因为一个消费者的退出,导致整个Group进行了Rebalance,并在一个比较慢的时间内达到稳定状态,影响面较大
  • 频繁的Rebalance反而降低了消息的消费速度,大部分时间都在重复消费和Rebalance
  • 数据不能及时消费,会消息堆积即消费滞后(Lag),在Kafka的TTL之后会丢弃数据

为了更精确的控制消息的消费,我们可以在订阅主题的时候,通过指定监听器的方式来设定发生再均衡动作前后的一些准备或者收尾的动作。

1
2
3
4
5
6
7
8
9
10
11
consumer.subscribe(Collections.singletonList("test3"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//再均衡之前和消费者停止读取消息之后被调用
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//重新分配分区之后和消费者开始消费之前被调用
}
});

其中采用了 N consumer thread + N Event Handler的方式来消费数据,并采用自动提交offset。对于无法消费的数据往往只是简单处理下,打印下日志以及消息体(无法消费的情况非常非常少)。

配置

  • fetch.min.bytes
    配置poll()拉取请求过程种能从Kafka拉取的最小数据量,如果可用数据量小于它指定的大小会等到有足够可用数据时才会返回给消费者,其默认值时1B

  • fetch.max.wait.ms
    和fetch.min.bytes有关,用于指定Kafka的等待时间,默认时间500ms。

如果fetch.min.bytes设置为1MB,fetch.max.wait.ms设置为100ms,Kafka收到消费者请求后,要么返回1MB数据,要么在100ms后返回所有可用数据,就看哪个提交得到满足。

  • max.poll.records
    用于控制单次调用poll()能返回的最大记录数量,默认为500条数据

  • partition.assignment.stragety
    分区会被分配给群组的消费者,这个参数用于指定分区分配策略。默认是RangeAssignore,可选的还有RoundRobinAssignor。同样它还支持自定义。

注意:KafkaConsumer是非线程安全的类,当使用多个线程操作同一个KafkaConsumer对象时就会引起KafkaConsumer is not safe for multi-threaded access错误。

------ 本文结束------

本文标题:Kafka消费配置

文章作者:Perkins

发布时间:2019年11月18日

原始链接:https://perkins4j2.github.io/posts/39128/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。