kafka Coordinator 简介
所属分类 kafka
浏览量 1029
Broker Consumer
Broker
kafka_2.12-1.1.0.jar
kafka.coordinator.group.GroupCoordinator
GroupCoordinator handles general group membership and offset management.
Each Kafka server instantiates a coordinator which is responsible for a set of groups.
Groups are assigned to coordinators based on their group names.
KafkaApis broker 所有请求的入口,和组相关的请求都由 GroupCoordinator 处理
GroupCoordinator broker端 Coordinator 实现,主要处理组成员变更请求,执行 rebalance 操作
GroupMetadata 组元数据信息,包含组的状态,内部成员信息
MemberMetadata 组成员元数据信息, 是否 leader 等
GroupMetadataManager
Rebalance
给消费者分配 partition
该过程由被选为 leader 的consumer执行
joinGroup syncGroup
简单过程
consumer 向 broker 发送 joinGroup 请求, (joinGroup send)
broker 收到 joinGroup 请求, 判断该 consumer 是否为 leader, 若为 leader 则返回当前所有成员的信息,若为 follower,则返回空。(joinGroup respond), group 进入 PreparingRebalance 状态
consumer 收到请求后,若为 leader,则根据返回的信息,执行 rebalance 的计算,计算完成后将 rebalance 结果通过 syncGroup 请求发送给 broker。若为 follower,则直接发送 syncGroup 请求。(joinGroup recv, syncGroup send)
broker 收到 syncGroup 请求,判断请求方是否为 leader,如果不是 leader,则等待直到 leader 将 rebalance 结果送达,如果请求方是 leader,则给所有的 syncGroup 请求返回 rebalance 的结果。 (syncGroup respond)
consumer 收到 syncGroup 结果,则调用相应的回调方法,(onPartitionRebalance) 按照最新的 rebalance 结果进行消费。(syncGroup recv)
consumer leader 如何选举 ?
heartbeat
五种状态
Empty PreparingRebalance CompletingRebalance Stable Dead
Empty 组内没有任何成员
PreparingRebalance 正在准备 rebalance,可以接收 joinGroup 请求
CompletingRebalance 正在等待 leader 分配结果
Stable coordinator 已收到 leader 发送的 rebalance 结果, 处于稳定状态
Dead 组里没有任何成员并且 metadata 被删除了
GroupMetadataManager
组元数据持久化,保证磁盘中有最新的元数据状态
PreparingRebalance->CompletingRebalance 和 CompletingRebalance->Stable 时将元数据持久化
broker 宕机恢复后,coordinator 通过 leader_and_isr 请求,在 partition leader 选举的过程中将组的元数据取出来
组协调器(GroupCoordinato)
offset提交分区
hash(consumerGroupId)%__consumer_offsets分区数 (默认50)
offset提交分区leader副本当作组协调器
Consumer
kafka-clients-1.1.0.jar
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
AbstractCoordinator
AbstractCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator).
Kafka's group management protocol consists of the following sequence of actions
Group Registration:
Group members register with the coordinator providing their own metadata(such as the set of topics they are interested in).
Group/Leader Selection:
The coordinator select the members of the group and chooses one member as the leader.
State Assignment:
The leader collects the metadata from all the members of the group and assigns state.
Group Stabilization:
Each member receives the state assigned by the leader and begins processing.
Note on locking: this class shares state between the caller and a background thread which is used for sending heartbeats after the client has joined the group.
All mutable state as well as state transitions are protected with the class's monitor.
Generally this means acquiring the lock before reading or writing the state of the group (e.g. generation, memberId) and holding the lock when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
与心跳线程共享状态
可变状态变化 加锁保护
上一篇
下一篇
kafka消费分区重平衡机制
Kafka网络模型
java nio 编程模型简介
云原生时代的Java
进程io监控命令iopp
Stack ArrayDeque LinkedList