flink checkpoint 检查点
所属分类 flink
浏览量 268
算子 状态
中间结果 ValueState MapState等
对流中的某个字段进行累加 保存累计值 临时结果
job挂了 ,内存中数据没了
检查点 定期保存任务状态的机制
检查点用于错误恢复
数据源可重放(replay)
检查点状态持久化 主要是分布式文件系统
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
//设置超时时间,过去500ms还没有完成,就认为这个检查点超时了,终止该检查点,不再使用这个检查点
env.getCheckpointConfig().setCheckpointTimeout(500);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
//设置检查点最大并发数. 设置为1,表示只能有一个检查点,这个检查点完成以后才有可能产生下一个检查点
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置外部检查点 将检查点的元数据信息定期写入外部系统,job失败时,检查点不会被清除,可以从检查点恢复job
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
env.getCheckpointConfig().enableUnalignedCheckpoints();
检查点保存配置
默认情况下,状态(state)保存在 TM (task manager )内存中,而检查点(checkpoint)保存 JM(job manager ) 内存中
ExternalizedCheckpointCleanup
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
取消作业时保留检查点
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
取消作业时删除检查点,如果任务失败,检查点不会删除 , 任务失败可以从检查点恢复任务
检查点保存目录在 flink-conf.yml 文件中进行全局配置 ,也可以对某个应用进行单独配置
# yml中的配置是全局的,即每个应用都用这个路径
state.checkpoints.dir: hdfs:///checkpoints/
// 代码中对某个应用进行配置
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
在配置的目录下,以jobid再分目录,每个jobid对应的目录下存放本应用的所有检查点
上一篇
下一篇
H2 web console
杭州市内游玩交通
Kube-Install 一键离线安装多K8S集群
Flink1.18.1 本地单机部署 及 flinkCDC3.0 测试
Skywalking 术语
linux安装切换多个版本jdk