首页  

flink checkpoint 检查点     所属分类 flink 浏览量 268
算子 状态 
中间结果 ValueState MapState等
对流中的某个字段进行累加  保存累计值 临时结果 
job挂了 ,内存中数据没了 
检查点  定期保存任务状态的机制

检查点用于错误恢复
数据源可重放(replay)
检查点状态持久化  主要是分布式文件系统


import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); //设置超时时间,过去500ms还没有完成,就认为这个检查点超时了,终止该检查点,不再使用这个检查点 env.getCheckpointConfig().setCheckpointTimeout(500); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); //设置检查点最大并发数. 设置为1,表示只能有一个检查点,这个检查点完成以后才有可能产生下一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //设置外部检查点 将检查点的元数据信息定期写入外部系统,job失败时,检查点不会被清除,可以从检查点恢复job env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setPreferCheckpointForRecovery(true); env.getCheckpointConfig().enableUnalignedCheckpoints();
检查点保存配置 默认情况下,状态(state)保存在 TM (task manager )内存中,而检查点(checkpoint)保存 JM(job manager ) 内存中 ExternalizedCheckpointCleanup ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION 取消作业时保留检查点 ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION 取消作业时删除检查点,如果任务失败,检查点不会删除 , 任务失败可以从检查点恢复任务 检查点保存目录在 flink-conf.yml 文件中进行全局配置 ,也可以对某个应用进行单独配置 # yml中的配置是全局的,即每个应用都用这个路径 state.checkpoints.dir: hdfs:///checkpoints/ // 代码中对某个应用进行配置 env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/")); 在配置的目录下,以jobid再分目录,每个jobid对应的目录下存放本应用的所有检查点

上一篇     下一篇
H2 web console

杭州市内游玩交通

Kube-Install 一键离线安装多K8S集群

Flink1.18.1 本地单机部署 及 flinkCDC3.0 测试

Skywalking 术语

linux安装切换多个版本jdk