首页  

kafka消费分区重平衡机制     所属分类 kafka 浏览量 87
Rebalance触发机制

消费组成员变更 消费者加入或离开(重启 宕机 等)
消费组订阅的Topic发生变化
订阅的Topic的partition发生变化
消费者无法在指定的时间内完成消息的消费(活锁 livelock)poll时间间隔大于 max.poll.interval.ms

注意 消息处理耗时 和  max.poll.interval.ms 配置 !!!

模拟 消费处理超时的情况

https://gitee.com/dyyx/kafkademo/blob/master/src/main/java/dyyx/ConsumerRebalanceTest.java

kafka客户端版本  1.1.0

重平衡相关的日志信息

2020-12-03 08:02:37 [main] WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Synchronous auto-commit of offsets {kafka_demo_test_topic-2=OffsetAndMetadata{offset=1445, metadata=''}, kafka_demo_test_topic-1=OffsetAndMetadata{offset=1488, metadata=''}, kafka_demo_test_topic-0=OffsetAndMetadata{offset=1510, metadata=''}, kafka_demo_test_topic-5=OffsetAndMetadata{offset=1511, metadata=''}, kafka_demo_test_topic-4=OffsetAndMetadata{offset=1441, metadata=''}, kafka_demo_test_topic-3=OffsetAndMetadata{offset=1456, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
2020-12-03 08:02:37 [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Revoking previously assigned partitions [kafka_demo_test_topic-2, kafka_demo_test_topic-1, kafka_demo_test_topic-0, kafka_demo_test_topic-5, kafka_demo_test_topic-4, kafka_demo_test_topic-3]
2020-12-03 08:02:37 [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] (Re-)joining group
2020-12-03 08:02:37 [main] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Successfully joined group with generation 51
2020-12-03 08:02:37 [main] INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator -[Consumer clientId=consumer-1, groupId=kafkademo_test_group] Setting newly assigned partitions [kafka_demo_test_topic-2, kafka_demo_test_topic-1, kafka_demo_test_topic-0, kafka_demo_test_topic-5, kafka_demo_test_topic-4, kafka_demo_test_topic-3]


重平衡相关的关键日志信息 超时之后提交offset failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. Revoking previously assigned partitions (Re-)joining group Successfully joined group with generation 51 Setting newly assigned partitions
有专门的心跳线程,为何还要 设置 max.poll.interval.ms 避免活锁(livelock) 活锁是指应用没有故障但是由于某些原因不能进一步消费
Reblance 作用 让ConsumerGroup下的所有的Consumer 达成一致来分配订阅Topic的每个Partition 比如某个group下有3个consumer,订阅的topic 有 6个分区 ,为每个consumer分配2个分区 Coordinator GroupCoordinator 每个broker 启动一个GroupCoodinator 每一个GroupCoodinator 负责管理一部分消费者组 _consumer_offsets 默认50个分区 位移提交分区 Math.abs(hash(groupID)) % numPartitions 定期压缩,message key(groupID+topic+分区id),合并相同的key,保留最新的 partition leader 所在 Broker 即为该 Group 所对应的 GroupCoordinator GroupCoordinator 会存储与该 group 相关的所有的 Meta 信息 session.timeout.ms Coordinator 检测消费者失败的时间,在该时间内保持心跳即可 更小的值可以更快发现消费者崩溃,更快地开启重平衡,避免消费滞后,但这会导致频繁重平衡 max.poll.interval.ms 消费端 heartbeat.interval.ms 心跳时间间隔 ,应小于 session.timeout.ms 消费组 5 个状态 Empty 没有一个活跃的消费者 PreparingRebalance 准备进行重平衡 AwaitingSync 全部消费者都已经加入组并且正在进行重平衡,各个消费者等待分配分区方案 Stable 分区方案已经全部发送给消费者,消费者已经在正常消费 Dead 被 Coordinator 彻底废弃 重平衡主要的两个步骤 加入组(JoinGroup) 当消费者心跳包响应 REBALANCE_IN_PROGRESS 时,说明消费组正在重平衡,此时消费者会停止消费,并且发送请求加入消费组 同步更新分配方案 当 Coordinator 收到所有组内成员的加入组请求后,会选出一个consumer Leader,然后让consumer Leader进行分配 分配完后会将分配方案放入SyncGroup请求中发送给Coordinator Coordinator根据分配方案发送给每个消费者

上一篇     下一篇
kafka消息存储及索引机制

kafka消费机制要点

kafka消费端核心参数

Kafka网络模型

java nio 编程模型简介

kafka Coordinator 简介