首页  

Flink DataSet 数据输出 示例     所属分类 flink 浏览量 77
基本的编程模型
获取批处理的执行环境ExecutionEnvironment
加载数据源
转换操作
数据输出

数据输出分为三种类型
基于文件实现 write()方法 输出到文件系统中
基于通用存储介质实现, output()方法,例如使用JDBCOutputFormat将数据输出到关系型数据库中
客户端输出,将DataSet数据从不同的节点收集到Client,并在客户端中输出,例如DataSet的print()方法


Sink


// 文本数据
DataSet<String> textData = // [...]

// 写入本地文件
textData.writeAsText("file:///my/result/on/localFS");

// 写入HDFS文件
textData.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS");

// 写数据到本地文件,如果存在则覆盖
textData.writeAsText("file:///my/result/on/localFS", WriteMode.OVERWRITE);

// 将数据输出到本地的CSV文件,指定分隔符为"|"
DataSet<Tuple3<String, Integer, Double>> values = // [...]
values.writeAsCsv("file:///path/to/the/result/file", "\n", "|");

// 使用自定义的TextFormatter对象
values.writeAsFormattedText("file:///path/to/the/result/file",
    new TextFormatter<Tuple2<Integer, Integer>>() {
        public String format (Tuple2<Integer, Integer> value) {
            return value.f1 + " - " + value.f0;
        }
    });


DataSet<Tuple3<String, Integer, Double>> myResult = [...]

// 将tuple类型的数据写入关系型数据库
myResult.output(
    // 创建并配置OutputFormat
    JDBCOutputFormat.buildJDBCOutputFormat()
                    .setDrivername("com.mysql.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://localhost/mydb")
                    .setQuery("insert into persons (name, age, height) values (?,?,?)")
                    .finish()
    );



上一篇     下一篇
git工作区域及常用命令

Flink DataSet 简介

Flink DataSet 数据源示例

Flink DataSet 数据转换示例

flink广播变量简介

spring事务隔离级别设置