首页  

Flink DataSet 数据源示例     所属分类 flink 浏览量 919
Source

InputFormat/RichInputFormat接口
CsvInputFormat TextInputFormat
ExecutionEnvironment



// 读取本地文件
DataSet<String> localLines = env.readTextFile("file:///path/file");
// 读取HDSF文件
DataSet<String> hdfsLines = env.readTextFile("hdfs://host:port/path/file");


DataSet<StringValue> localLines = env.readTextFileWithValue("file:///some/local/file");
DataSet<StringValue> hdfsLines = env.readTextFileWithValue("hdfs://host:port/file/path");

StringValue 是一种可变的String类型


// read a CSV file with five fields, taking only two of them
// 读取一个具有5个字段的CSV文件,只取第一个和第四个字段
DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                               .includeFields("10010")  
                              .types(String.class, Double.class);

// 读取一个有三个字段的CSV文件,将其转为POJO类型
DataSet<Person>> csvInput = env.readCsvFile("hdfs:///the/CSV/file")
                         .pojoType(Person.class, "name", "age", "zipcode");


DataSet<String> Data = env.readFileOfPrimitives("file:///some/local/file", String.class);

DataSet<String> data = env.fromCollection(arrayList);

DataSet<String> stringDataSource = env.fromElements("hello Flink What is Apache Flink");

DataSet<Long> longDataSource = env.generateSequence(1, 20);


env.readFile(new MyInputFormat(), "file:///some/local/file");


DataSet<Tuple2<String, Integer> dbData =
    env.createInput(
      JDBCInputFormat.buildJDBCInputFormat()
                     .setDrivername("com.mysql.jdbc.Driver")
                     .setDBUrl("jdbc:mysql://localhost/mydb")
                     .setQuery("select name, age from stu")
                     .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
                     .finish()
    );
    
    
  



上一篇     下一篇
git reset 和 diff 使用说明

git工作区域及常用命令

Flink DataSet 简介

Flink DataSet 数据输出 示例

Flink DataSet 数据转换示例

flink广播变量简介