Flink DataSet 数据源示例
所属分类 flink
浏览量 919
Source
InputFormat/RichInputFormat接口
CsvInputFormat TextInputFormat
ExecutionEnvironment
// 读取本地文件
DataSet localLines = env.readTextFile("file:///path/file");
// 读取HDSF文件
DataSet hdfsLines = env.readTextFile("hdfs://host:port/path/file");
DataSet localLines = env.readTextFileWithValue("file:///some/local/file");
DataSet hdfsLines = env.readTextFileWithValue("hdfs://host:port/file/path");
StringValue 是一种可变的String类型
// read a CSV file with five fields, taking only two of them
// 读取一个具有5个字段的CSV文件,只取第一个和第四个字段
DataSet> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.includeFields("10010")
.types(String.class, Double.class);
// 读取一个有三个字段的CSV文件,将其转为POJO类型
DataSet> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
.pojoType(Person.class, "name", "age", "zipcode");
DataSet Data = env.readFileOfPrimitives("file:///some/local/file", String.class);
DataSet data = env.fromCollection(arrayList);
DataSet stringDataSource = env.fromElements("hello Flink What is Apache Flink");
DataSet longDataSource = env.generateSequence(1, 20);
env.readFile(new MyInputFormat(), "file:///some/local/file");
DataSet dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("jdbc:mysql://localhost/mydb")
.setQuery("select name, age from stu")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);
上一篇
下一篇
git reset 和 diff 使用说明
git工作区域及常用命令
Flink DataSet 简介
Flink DataSet 数据输出 示例
Flink DataSet 数据转换示例
flink广播变量简介