首页  

kafka消费者offset记录位置和方式     所属分类 kafka 浏览量 278
offset 保留消息消费进度 ,存储的位置依赖使用的api

kafka.javaapi.consumer.ConsumerConnector
通过配置参数 zookeeper.connect 来消费
offset 存储位置 consumers/{group}/offsets/{topic}/{partition}

譬如 /consumers/client_test_topic_group1/offsets/client_test_topic/0


org.apache.kafka.clients.consumer.KafkaConsumer
配置参数 bootstrap.servers 来消费

offset 更新到 kafka自带的topic  __consumer_offsets 下面

0.10版本后,引入 __consumer_offsets  ,写入消息key由groupid topic partition 组成,value是offset
清理策略compact ,保留最新的key 
一般情况,每个key的offset缓存在内存中,查询的时候不用遍历partition,
如果没有缓存,第一次会遍历partition建立缓存,然后查询返回

consumer group位移信息写入__consumers_offsets的哪个partition,具体计算公式

__consumers_offsets partition =
           Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   
//groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认50个分区



kafka自带工具 kafka-consumer-offset-checker

Group  Topic   Pid  Offset  logSize Lag  Owner

offset更新方式 自动提交 手动提交

自动提交 enable.auto.commit=true , 更新频率 auto.commit.interval.ms 。fetch到消息后就更新offset,不管是否消费成功。
手动提交 enable.auto.commit=false, 调用方法 consumer.commitSync() 手动更新offset

上一篇     下一篇
两个程序员的故事

本次互联网寒冬的真实原因

zookeeper状态信息stat字段说明

KPI与KOR

elasticsearch5.0基本概念

elasticsearch5.0术语