kafka消费者offset记录位置和方式
所属分类 kafka
浏览量 1688
offset 保留消息消费进度 ,存储的位置依赖使用的api
kafka.javaapi.consumer.ConsumerConnector
通过配置参数 zookeeper.connect 来消费
offset 存储位置 consumers/{group}/offsets/{topic}/{partition}
譬如 /consumers/client_test_topic_group1/offsets/client_test_topic/0
org.apache.kafka.clients.consumer.KafkaConsumer
配置参数 bootstrap.servers 来消费
offset 更新到 kafka自带的topic __consumer_offsets 下面
0.10版本后,引入 __consumer_offsets ,写入消息key由groupid topic partition 组成,value是offset
清理策略compact ,保留最新的key
一般情况,每个key的offset缓存在内存中,查询的时候不用遍历partition,
如果没有缓存,第一次会遍历partition建立缓存,然后查询返回
consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式
__consumers_offsets partition =
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
//groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认50个分区
kafka自带工具 kafka-consumer-offset-checker
Group Topic Pid Offset logSize Lag Owner
offset更新方式 自动提交 手动提交
自动提交 enable.auto.commit=true , 更新频率 auto.commit.interval.ms 。fetch到消息后就更新offset,不管是否消费成功。
手动提交 enable.auto.commit=false, 调用方法 consumer.commitSync() 手动更新offset
上一篇
下一篇
两个程序员的故事
本次互联网寒冬的真实原因
zookeeper状态信息stat字段说明
KPI与KOR
elasticsearch5.0基本概念
elasticsearch5.0术语