首页  

kafka auto.offset.reset 和 StartingOffsets     所属分类 kafka 浏览量 142
Kafka中
auto.offset.reset 和 startingOffsets 
消费者(Consumer)读取数据时的起始偏移量(offset) 

auto.offset.reset:
消费者配置参数,没有找到初始偏移量或当前偏移量不存在时 ,指定 消费者 如何设置 offset
earliest(从最早的偏移量开始读取)、latest(从最新的偏移量开始读取)和 none(没有找到则抛出异常)
当消费者首次连接到Kafka集群或重新连接并且没有为特定分区 设置偏移量时,此参数将决定消费者的起始位置。

startingOffsets:
某些Kafka客户端库(特别是与结构化流相关的库,如Spark的Kafka连接器)中,
startingOffsets提供了一种更灵活的方式来指定消费者应从哪里开始读取数据。
通过startingOffsets,可以为每个分区指定一个明确的起始偏移量,而不是依赖于auto.offset.reset的默认行为。
在复杂的场景中,如需要恢复到特定时间点或特定消息时的场景,更加灵活和可控。

注意:不是所有的Kafka客户端库都支持startingOffsets这样的配置选项。

一旦消费者开始读取数据并提交偏移量,这些偏移量就会用于后续的读取操作,不再受auto.offset.reset或startingOffsets的影响。



KafkaSource< String> kafkaSource = KafkaSource.< String>builder()
.setBootstrapServers(kafka_server)
.setTopics("business-log-cmcf-cp").setGroupId("flinkGroup")
// latest  earliest
.setStartingOffsets(OffsetsInitializer.latest())

.setProperty("enable.auto.commit","true")
.setProperty("auto.offset.reset","latest")
.setProperty("auto.commit.interval.ms","1000")

.setValueOnlyDeserializer(new SimpleStringSchema())
.build();

上一篇     下一篇
M0 M1 M2 简介

activiti 与 信贷风控决策流

Activiti Flowable LiteFlow 对比

AI 工具

ChatGPT Midjourney与Stable Diffusion

mac 安装 mongodb