Kafka consumer partition rebalance mechanism
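Rebalance triggers
Group membership changes: a consumer joins or leaves (restart, crash, etc.)
The set of Topics the consumer group subscribes to changes
The number of partitions of a subscribed Topic changes
A consumer fails to finish processing messages within the allowed time (livelock): the interval between poll() calls exceeds max.poll.interval.ms
Note: watch the message processing time relative to the max.poll.interval.ms setting!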
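To make the note above concrete, here is a minimal sketch that compares the worst-case time to process one batch against max.poll.interval.ms. The class PollBudget and its methods are hypothetical helpers for illustration and are not part of the Kafka client; the values 500 and 300000 are the defaults for max.poll.records and max.poll.interval.ms as I understand them for this client version, so check them against your own configuration.

```java
import java.util.Properties;

final class PollBudget {
    // Worst-case time to process one full batch, in milliseconds.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True when a full batch can be processed before max.poll.interval.ms runs out.
    static boolean fitsInPollInterval(Properties props, long perRecordMillis) {
        int maxPollRecords = Integer.parseInt(props.getProperty("max.poll.records"));
        long maxPollIntervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    // Builds only the two settings this check needs (hypothetical helper).
    static Properties consumerProps(int maxPollRecords, long maxPollIntervalMs) {
        Properties p = new Properties();
        p.setProperty("max.poll.records", String.valueOf(maxPollRecords));
        p.setProperty("max.poll.interval.ms", String.valueOf(maxPollIntervalMs));
        return p;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(500, 300_000L);
        System.out.println(fitsInPollInterval(props, 100));   // 500 * 100 ms = 50 s    -> true
        System.out.println(fitsInPollInterval(props, 1_000)); // 500 * 1000 ms = 500 s  -> false
    }
}
```

If the check fails, the two usual levers are a larger max.poll.interval.ms or a smaller max.poll.records, which matches the advice in the client's own warning message.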
Simulating a message processing timeout
https://gitee.com/dyyx/kafkademo/blob/master/src/main/java/dyyx/ConsumerRebalanceTest.java
Kafka client version: 1.1.0
Rebalance-related log output
2020-12-03 08:02:37 [main] WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Synchronous auto-commit of offsets {kafka_demo_test_topic-2=OffsetAndMetadata{offset=1445, metadata=''}, kafka_demo_test_topic-1=OffsetAndMetadata{offset=1488, metadata=''}, kafka_demo_test_topic-0=OffsetAndMetadata{offset=1510, metadata=''}, kafka_demo_test_topic-5=OffsetAndMetadata{offset=1511, metadata=''}, kafka_demo_test_topic-4=OffsetAndMetadata{offset=1441, metadata=''}, kafka_demo_test_topic-3=OffsetAndMetadata{offset=1456, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2020-12-03 08:02:37 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Revoking previously assigned partitions [kafka_demo_test_topic-2, kafka_demo_test_topic-1, kafka_demo_test_topic-0, kafka_demo_test_topic-5, kafka_demo_test_topic-4, kafka_demo_test_topic-3]
2020-12-03 08:02:37 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] (Re-)joining group
2020-12-03 08:02:37 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Successfully joined group with generation 51
2020-12-03 08:02:37 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Setting newly assigned partitions [kafka_demo_test_topic-2, kafka_demo_test_topic-1, kafka_demo_test_topic-0, kafka_demo_test_topic-5, kafka_demo_test_topic-4, kafka_demo_test_topic-3]
Key rebalance-related log messages
Offset commit attempted after the timeout
failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms,
which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Revoking previously assigned partitions
(Re-)joining group
Successfully joined group with generation 51
Setting newly assigned partitions
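There is a dedicated heartbeat thread, so why is max.poll.interval.ms still needed?
To avoid livelock
Livelock means the application has not failed (its heartbeats keep arriving) but for some reason it makes no further progress consuming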
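The idea can be sketched as a small watchdog: the processing thread records every poll(), and the heartbeat thread checks how long ago the last poll() happened. This is a simplified model under my own naming (PollWatchdog is hypothetical), not the client's actual implementation; timestamps are passed in explicitly so the behaviour is easy to follow.

```java
final class PollWatchdog {
    private final long maxPollIntervalMs;
    // Written by the processing thread, read by the heartbeat thread.
    private volatile long lastPollMs;

    PollWatchdog(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.lastPollMs = nowMs;
    }

    // Called by the processing thread each time it calls poll().
    void recordPoll(long nowMs) {
        lastPollMs = nowMs;
    }

    // Called by the heartbeat thread: heartbeats alone do not prove progress,
    // so a stale poll timestamp is treated as a stuck consumer.
    boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        PollWatchdog watchdog = new PollWatchdog(300_000L, 0L);
        System.out.println(watchdog.pollTimeoutExpired(300_000L)); // false: exactly at the limit
        System.out.println(watchdog.pollTimeoutExpired(300_001L)); // true: processing took too long
        watchdog.recordPoll(300_001L);
        System.out.println(watchdog.pollTimeoutExpired(400_000L)); // false: poll() was called again
    }
}
```

When the timeout expires the consumer is expected to leave the group, which is what triggers the rebalance seen in the log above.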
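Purpose of Rebalance
Lets all Consumers in a ConsumerGroup agree on how the Partitions of the subscribed Topics are divided among them
For example, if a group has 3 consumers and the subscribed topic has 6 partitions, each consumer is assigned 2 partitions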
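The 3-consumer, 6-partition example can be reproduced with a small even-split routine. This is an illustrative sketch in the spirit of a range-style split, not Kafka's actual assignor code; EvenSplit and the consumer names c1..c3 are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class EvenSplit {
    // Gives each consumer a contiguous block of partitions; the first
    // (partitionCount % consumers) consumers receive one extra partition.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // deterministic order so every run agrees
        int base = partitionCount / sorted.size();
        int extra = partitionCount % sorted.size();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(sorted.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = new ArrayList<>();
        Collections.addAll(consumers, "c1", "c2", "c3");
        System.out.println(assign(consumers, 6)); // {c1=[0, 1], c2=[2, 3], c3=[4, 5]}
    }
}
```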
Coordinator
GroupCoordinator
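Each broker starts one GroupCoordinator
Each GroupCoordinator manages a subset of the consumer groups
__consumer_offsets has 50 partitions by default
The partition that a group's offset commits go to
Math.abs(hash(groupID)) % numPartitions
The topic is compacted periodically: the message key is (groupID + topic + partition ID), records with the same key are merged, and only the latest one is kept
The Broker hosting the leader of that partition is the GroupCoordinator for the Group
The GroupCoordinator stores all Meta information related to the group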
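The partition formula above can be written out as follows. This sketch assumes hash(groupID) is Java's String.hashCode() and guards the Integer.MIN_VALUE case, where plain Math.abs would return a negative number; both points reflect my understanding of the broker code rather than something stated in this article. OffsetsPartition is a hypothetical class name.

```java
final class OffsetsPartition {
    // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Index of the __consumer_offsets partition that holds this group's commits.
    static int partitionFor(String groupId, int numPartitions) {
        return safeAbs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // 50 is the default partition count of __consumer_offsets.
        int partition = partitionFor("kafkademo_test_group", 50);
        System.out.println("offsets partition for the group: " + partition);
    }
}
```

The broker that leads the resulting partition is the one the consumers of kafkademo_test_group talk to as their coordinator.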
session.timeout.ms
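How long the Coordinator waits before declaring a consumer failed; the consumer only needs to get a heartbeat through within this window
A smaller value detects a crashed consumer sooner and starts the rebalance earlier, which limits consumer lag, but a value that is too small leads to frequent rebalances
max.poll.interval.ms (consumer side): the maximum allowed interval between two poll() calls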
heartbeat.interval.ms
Interval between heartbeats; must be lower than session.timeout.ms
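The relationship between the two settings can be checked before the consumer starts. A minimal sketch: TimeoutCheck is a hypothetical helper, the fallback values 10000 and 3000 are the defaults as I recall them for client 1.1.0, and the one-third guideline comes from the Kafka configuration documentation rather than from this article.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

final class TimeoutCheck {
    // Returns human-readable findings; an empty list means the settings look consistent.
    static List<String> problems(Properties props) {
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms", "10000"));
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms", "3000"));
        List<String> findings = new ArrayList<>();
        if (heartbeatMs >= sessionMs) {
            findings.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeatMs * 3 > sessionMs) {
            findings.add("heartbeat.interval.ms is usually at most 1/3 of session.timeout.ms");
        }
        return findings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(problems(props)); // [] with the assumed defaults
        props.setProperty("heartbeat.interval.ms", "10000");
        System.out.println(problems(props)); // one finding: heartbeat is not below the session timeout
    }
}
```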
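The 5 states of a consumer group
Empty: the group has no active consumers
PreparingRebalance: the group is preparing to rebalance
AwaitingSync: all consumers have joined the group and the rebalance is in progress; each consumer is waiting for the partition assignment
Stable: the assignment has been sent to every consumer and the consumers are consuming normally
Dead: the group has been discarded for good by the Coordinator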
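The five states can be modelled as an enum with a table of allowed transitions. The transition table below is my reading of the broker's state machine and should be treated as an assumption; the article itself only lists the states. GroupState is a hypothetical type for illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum GroupState {
    EMPTY, PREPARING_REBALANCE, AWAITING_SYNC, STABLE, DEAD;

    // For each target state, the states it may be entered from (assumed table).
    private static final Map<GroupState, Set<GroupState>> VALID_PREVIOUS =
            new EnumMap<>(GroupState.class);

    static {
        VALID_PREVIOUS.put(EMPTY, EnumSet.of(PREPARING_REBALANCE));
        VALID_PREVIOUS.put(PREPARING_REBALANCE, EnumSet.of(STABLE, AWAITING_SYNC, EMPTY));
        VALID_PREVIOUS.put(AWAITING_SYNC, EnumSet.of(PREPARING_REBALANCE));
        VALID_PREVIOUS.put(STABLE, EnumSet.of(AWAITING_SYNC));
        VALID_PREVIOUS.put(DEAD, EnumSet.of(EMPTY, PREPARING_REBALANCE, AWAITING_SYNC, STABLE, DEAD));
    }

    boolean canTransitionTo(GroupState target) {
        return VALID_PREVIOUS.get(target).contains(this);
    }

    public static void main(String[] args) {
        // A member joining or leaving moves a stable group back into a rebalance.
        System.out.println(STABLE.canTransitionTo(PREPARING_REBALANCE)); // true
        // An empty group cannot become stable without going through a rebalance.
        System.out.println(EMPTY.canTransitionTo(STABLE));               // false
    }
}
```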
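The two main steps of a rebalance
Join the group (JoinGroup)
When a consumer's heartbeat response carries REBALANCE_IN_PROGRESS, the group is rebalancing; the consumer stops consuming and sends a request to join the group
Sync the assignment (SyncGroup)
Once the Coordinator has received join requests from all members of the group, it picks one consumer as the Leader and has that consumer Leader compute the assignment
When the Leader has finished, it puts the assignment into a SyncGroup request and sends it to the Coordinator
The Coordinator then sends each consumer its own part of the assignment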
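The two steps can be walked through with a toy in-memory model. It is only a sketch of the flow described above: ToyRebalance, Result and roundRobin are hypothetical names, picking the first member to join as the Leader is an assumption about how the Coordinator chooses, and no real network requests are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

final class ToyRebalance {
    // Outcome of one rebalance round: who led it and what each member was told.
    static final class Result {
        final String leader;
        final Map<String, List<Integer>> syncResponses;

        Result(String leader, Map<String, List<Integer>> syncResponses) {
            this.leader = leader;
            this.syncResponses = syncResponses;
        }
    }

    static Result rebalance(List<String> joinOrder, int partitionCount,
            BiFunction<List<String>, Integer, Map<String, List<Integer>>> leaderAssignor) {
        if (joinOrder.isEmpty()) {
            throw new IllegalArgumentException("no members joined");
        }
        // Step 1 (JoinGroup): all members have joined; the coordinator picks a leader.
        String leader = joinOrder.get(0);
        // The leader, not the coordinator, computes the assignment.
        Map<String, List<Integer>> plan = leaderAssignor.apply(joinOrder, partitionCount);
        // Step 2 (SyncGroup): the coordinator answers each member with its own slice.
        Map<String, List<Integer>> responses = new LinkedHashMap<>();
        for (String member : joinOrder) {
            responses.put(member, plan.getOrDefault(member, new ArrayList<>()));
        }
        return new Result(leader, responses);
    }

    // Example leader-side strategy: deal partitions out one at a time.
    static Map<String, List<Integer>> roundRobin(List<String> members, int partitionCount) {
        Map<String, List<Integer>> plan = new LinkedHashMap<>();
        for (String member : members) {
            plan.put(member, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            plan.get(members.get(p % members.size())).add(p);
        }
        return plan;
    }

    public static void main(String[] args) {
        Result r = rebalance(Arrays.asList("c1", "c2", "c3"), 6, ToyRebalance::roundRobin);
        System.out.println(r.leader);        // c1
        System.out.println(r.syncResponses); // {c1=[0, 3], c2=[1, 4], c3=[2, 5]}
    }
}
```

Keeping the assignment logic in the Leader means the strategy can be changed on the client side without touching the broker.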