Flink DataSet 数据输出 示例
所属分类 flink
浏览量 949
基本的编程模型
获取批处理的执行环境ExecutionEnvironment
加载数据源
转换操作
数据输出
数据输出分为三种类型
基于文件实现 write()方法 输出到文件系统中
基于通用存储介质实现, output()方法,例如使用JDBCOutputFormat将数据输出到关系型数据库中
客户端输出,将DataSet数据从不同的节点收集到Client,并在客户端中输出,例如DataSet的print()方法
Sink
// 文本数据
DataSet textData = // [...]
// 写入本地文件
textData.writeAsText("file:///my/result/on/localFS");
// 写入HDFS文件
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");
// 写数据到本地文件,如果存在则覆盖
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);
// 将数据输出到本地的CSV文件,指定分隔符为"|"
DataSet> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");
// 使用自定义的TextFormatter对象
values.writeAsFormattedText("file:///path/to/the/result/file",
new TextFormatter>() {
public String format (Tuple2 value) {
return value.f1 + " - " + value.f0;
}
});
DataSet> myResult = [...]
// 将tuple类型的数据写入关系型数据库
myResult.output(
// 创建并配置OutputFormat
JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/mydb")
.setQuery("insert into persons (name, age, height) values (?,?,?)")
.finish()
);
上一篇
下一篇
git工作区域及常用命令
Flink DataSet 简介
Flink DataSet 数据源示例
Flink DataSet 数据转换示例
flink广播变量简介
spring事务隔离级别设置