kafka auto.offset.reset 和 StartingOffsets
所属分类 kafka
浏览量 192
Kafka中
auto.offset.reset 和 startingOffsets
消费者(Consumer)读取数据时的起始偏移量(offset)
auto.offset.reset:
消费者配置参数,没有找到初始偏移量或当前偏移量不存在时 ,指定 消费者 如何设置 offset
earliest(从最早的偏移量开始读取)、latest(从最新的偏移量开始读取)和 none(没有找到则抛出异常)
当消费者首次连接到Kafka集群或重新连接并且没有为特定分区 设置偏移量时,此参数将决定消费者的起始位置。
startingOffsets:
某些Kafka客户端库(特别是与结构化流相关的库,如Spark的Kafka连接器)中,
startingOffsets提供了一种更灵活的方式来指定消费者应从哪里开始读取数据。
通过startingOffsets,可以为每个分区指定一个明确的起始偏移量,而不是依赖于auto.offset.reset的默认行为。
在复杂的场景中,如需要恢复到特定时间点或特定消息时的场景,更加灵活和可控。
注意:不是所有的Kafka客户端库都支持startingOffsets这样的配置选项。
一旦消费者开始读取数据并提交偏移量,这些偏移量就会用于后续的读取操作,不再受auto.offset.reset或startingOffsets的影响。
KafkaSource< String> kafkaSource = KafkaSource.< String>builder()
.setBootstrapServers(kafka_server)
.setTopics("business-log-cmcf-cp").setGroupId("flinkGroup")
// latest earliest
.setStartingOffsets(OffsetsInitializer.latest())
.setProperty("enable.auto.commit","true")
.setProperty("auto.offset.reset","latest")
.setProperty("auto.commit.interval.ms","1000")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
上一篇
下一篇
M0 M1 M2 简介
activiti 与 信贷风控决策流
Activiti Flowable LiteFlow 对比
AI 工具
ChatGPT Midjourney与Stable Diffusion
mac 安装 mongodb