首页  

Flink DataSet 数据转换示例     所属分类 flink 浏览量 740
DataSet转换 转换 transformations 



Map

DataSource<String> source = env.fromElements("I", "like", "flink");
source.map(new MapFunction<String, String>() {
            @Override
            // 将数据转为大写
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        }).print();


FlatMap
输入一个元素,产生0个、1个或多个元素

stringDataSource
               .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] split = value.split(" ");
                        for (String word : split) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0)
                .sum(1);


MapPartition


source.mapPartition(new MapPartitionFunction<String, Long>() {
            @Override
            public void mapPartition(Iterable<String> values, Collector<Long> out) throws Exception {
                long c = 0;
                for (String value : values) {
                    c++;
                }
                //输出每个分区元素个数
                out.collect(c);
            }
        }).print();

Filter

DataSource<Long> source = env.fromElements(1L, 2L, 3L,4L,5L);
source.filter(new FilterFunction<Long>() {
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        }).print();


Project

DataSource<Tuple3<Long, Integer, String>> source = env.fromElements(
                Tuple3.of(1L, 20, "tom"), 
                Tuple3.of(2L, 25, "jack"), 
                Tuple3.of(3L, 22, "bob"));
        // 去第一个和第三个元素
        source.project(0, 2).print();


Reduce
DataSource<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("Flink", 1),
                Tuple2.of("Flink", 1),
                Tuple2.of("Hadoop", 1),
                Tuple2.of("Spark", 1),
                Tuple2.of("Flink", 1));
        source
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        }).print();


ReduceGroup
DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
source
                .groupBy(0)
                .reduceGroup(new GroupReduceFunction<Tuple2<String,Long>, Tuple2<String,Long>>() {
                    @Override
                    public void reduce(Iterable<Tuple2<String, Long>> values, Collector<Tuple2<String, Long>> out) throws Exception {
                        Long sum = 0L;
                        String word = "";
                        for(Tuple2<String, Long> value:values){
                            sum += value.f1;
                            word = value.f0;

                        }
                        out.collect(Tuple2.of(word,sum));
                    }
                }).print();


Aggregate
DataSource<Tuple2<String, Long>> source = env.fromElements(
                Tuple2.of("Flink", 1L),
                Tuple2.of("Flink", 1L),
                Tuple2.of("Hadoop", 1L),
                Tuple2.of("Spark", 1L),
                Tuple2.of("Flink", 1L));
                
source.groupBy(0)
     .aggregate(SUM,1)// 按第2个值求和
     .print();



Distinct

DataSource<Tuple> source = env.fromElements(Tuple1.of("Flink"),Tuple1.of("Flink"),Tuple1.of("hadoop"));
// 按照tuple的第一个字段去重
source.distinct(0).print();


Join
DataSource<Tuple2<Integer,String>> source1 = env.fromElements(
                Tuple2.of(1,"jack"),
                Tuple2.of(2,"tom"),
                Tuple2.of(3,"Bob"));
                
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("order1", 1),
                Tuple2.of("order2", 2),
                Tuple2.of("order3", 3));

source1.join(source2).where(0).equalTo(1).print();

自定义Join Funciton

// 用户id,购买商品名称,购买商品数量
DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1,"item1",2),
                Tuple3.of(2,"item2",3),
                Tuple3.of(3,"item3",4));
//商品名称与商品单价
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));

source1.join(source2)
                .where(1)
                .equalTo(0)
                .with(new JoinFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple3<Integer,String,Double>>() {
                    // 用户每种商品购物总金额
                    @Override
                    public Tuple3<Integer, String, Double> join(Tuple3<Integer, String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                        return Tuple3.of(first.f0,first.f1,first.f2 * second.f1.doubleValue());
                    }
                }).print();


通过Size Hint标记数据集的大小,根据用户给定的hint(提示)调整计算策略
例如可以使用joinWithTiny或joinWithHuge提示第二个数据集的大小

DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result1 =
            // 提示第二个数据集为小数据集
            input1.joinWithTiny(input2)
                  .where(0)
                  .equalTo(0);

DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
            result2 =
            // h提示第二个数据集为大数据集
            input1.joinWithHuge(input2)
                  .where(0)
                  .equalTo(0);

可以使用多种方式执行join 可指定策略
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
// 广播第一个输入并从中构建一个哈希表,第二个输入将对其进行探测,适用于第一个数据集非常小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");
// 广播第二个输入并从中构建一个哈希表,第一个输入将对其进行探测,适用于第二个数据集非常小的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.BROADCAST_HASH_SECOND)
            .where("id").equalTo("key");
// 将两个数据集重新分区,并将第一个数据集转换成哈希表,适用于第一个数据集比第二个数据集小,但两个数据集都比较大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_HASH_FIRST)
            .where("id").equalTo("key");
// 将两个数据集重新分区,并将第二个数据集转换成哈希表,适用于第二个数据集比第一个数据集小,但两个数据集都比较大的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_HASH_SECOND)
            .where("id").equalTo("key");
// 将两个数据集重新分区,并将每个分区排序,适用于两个数据集都已经排好序的场景
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");
// 相当于不指定,有系统自行处理
DataSet<Tuple2<SomeType, AnotherType> result =
      input1.join(input2, JoinHint.OPTIMIZER_CHOOSES)
            .where("id").equalTo("key");





OuterJoin
//左外连接
source1.leftOuterJoin(source2).where(1).equalTo(0);
//右外链接
source1.rightOuterJoin(source2).where(1).equalTo(0);


外连接也支持算法提示,可以跟据左右数据集的分布情况选择合适的优化策略,提升数据处理效率
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]
DataSet<Tuple2<SomeType, AnotherType> result1 =
      input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
            .where("id").equalTo("key");

DataSet<Tuple2<SomeType, AnotherType> result2 =
      input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
            .where("id").equalTo("key");


每种外连接只支持部分算法

LeftOuterJoin支持

OPTIMIZER_CHOOSES
BROADCAST_HASH_SECOND
REPARTITION_HASH_SECOND
REPARTITION_SORT_MERGE


RightOuterJoin支持

OPTIMIZER_CHOOSES
BROADCAST_HASH_FIRST
REPARTITION_HASH_FIRST
REPARTITION_SORT_MERGE


FullOuterJoin支持

OPTIMIZER_CHOOSES
REPARTITION_SORT_MERGE




CoGroup
CoGroup 对分组之后的DataSet进行join操作,将两个DataSet数据集合并在一起,
先对每个DataSet按照key进行分组,然后将分组之后的DataSet传输到用户定义的CoGroupFunction,
将两个数据集根据相同的Key记录组合在一起,相同Key的记录会存放在一个Group中




// 用户id,购买商品名称,购买商品数量
DataSource<Tuple3<Integer,String,Integer>> source1 = env.fromElements(
                Tuple3.of(1,"item1",2),
                Tuple3.of(2,"item2",3),
                Tuple3.of(3,"item2",4));
//商品名称与商品单价
DataSource<Tuple2<String, Integer>> source2 = env.fromElements(
                Tuple2.of("item1", 10),
                Tuple2.of("item2", 20),
                Tuple2.of("item3", 15));

source1.coGroup(source2)
                .where(1)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple3<Integer,String,Integer>, Tuple2<String,Integer>, Tuple2<String,Double>>() {
                    // 每个Iterable存储的是分好组的数据,即相同key的数据组织在一起
                    @Override
                    public void coGroup(Iterable<Tuple3<Integer, String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Double>> out) throws Exception {
                        //存储每种商品购买数量
                        int sum = 0;
                        for(Tuple3<Integer, String, Integer> val1:first){
                        sum += val1.f2;

                    }
                    // 每种商品数量 * 商品单价
                    for(Tuple2<String, Integer> val2:second){
                        out.collect(Tuple2.of(val2.f0,sum * val2.f1.doubleValue()));

                        }
                    }
                }).print();


Cross
笛卡儿积
//[id,x,y],坐标值
DataSet<Tuple3<Integer, Integer, Integer>> coords1 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
DataSet<Tuple3<Integer, Integer, Integer>> coords2 = env.fromElements(
                Tuple3.of(1, 20, 18),
                Tuple3.of(2, 15, 20),
                Tuple3.of(3, 25, 10));
// 求任意两点之间的欧氏距离

coords1.cross(coords2)
                .with(new CrossFunction<Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Integer>, Tuple3<Integer, Integer, Double>>() {
                    @Override
                    public Tuple3<Integer, Integer, Double> cross(Tuple3<Integer, Integer, Integer> val1, Tuple3<Integer, Integer, Integer> val2) throws Exception {
                        // 计算欧式距离
                        double dist = sqrt(pow(val1.f1 - val2.f1, 2) + pow(val1.f2 - val2.f2, 2));
                        // 返回两点之间的欧式距离
                        return Tuple3.of(val1.f0,val2.f0,dist);
                    }
                }).print();


Union

DataSet<Tuple2<String, Integer>> vals1 = env.fromElements(
                Tuple2.of("jack",20),
                Tuple2.of("Tom",21));
DataSet<Tuple2<String, Integer>> vals2 = env.fromElements(
                Tuple2.of("Robin",25),
                Tuple2.of("Bob",30));
DataSet<Tuple2<String, Integer>> vals3 = env.fromElements(
                Tuple2.of("Jasper",24),
                Tuple2.of("jarry",21));
DataSet<Tuple2<String, Integer>> unioned = vals1
                .union(vals2)
                .union(vals3);
unioned.print();

Rebalance
DataSet<String> in = // [...]
// rebalance DataSet,然后使用map算子.
DataSet<Tuple2<String, String>> out = in.rebalance().map(new Mapper());



Hash-Partition
根据给定的Key进行Hash分区,key相同的数据会被放入同一个分区内

DataSet<Tuple2<String, Integer>> in = // [...]
// 根据第一个值进行hash分区,然后使用 MapPartition转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByHash(0)
                                        .mapPartition(new PartitionMapper());



Range-Partition
根据给定的Key进行Range分区,key相同的数据会被放入同一个分区内

DataSet<Tuple2<String, Integer>> in = // [...]
// 根据第一个值进行Range分区,然后使用 MapPartition转换操作.
DataSet<Tuple2<String, String>> out = in.partitionByRange(0)
                                        .mapPartition(new PartitionMapper());



Custom Partitioning
自定义分区函数
DataSet<Tuple2<String,Integer>> in = // [...]
DataSet<Integer> result = in.partitionCustom(partitioner, key)
                            .mapPartition(new PartitionMapper());


Sort Partition
在本地对DataSet数据集中的所有分区根据指定字段进行重排序,
排序方式通过Order.ASCENDING以及Order.DESCENDING关键字指定
支持指定多个字段进行分区排序

DataSet<Tuple2<String, Integer>> in = // [...]
// 按照第一个字段升序排列,第二个字段降序排列.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
                                        .sortPartition(0, Order.DESCENDING)
                                        .mapPartition(new PartitionMapper());


First-n


DataSet<Tuple2<String, Integer>> in = // [...]
// 返回数据集中的任意5个元素
DataSet<Tuple2<String, Integer>> out1 = in.first(5);
//返回每个分组内的任意两个元素
DataSet<Tuple2<String, Integer>> out2 = in.groupBy(0)
                                          .first(2);
// 返回每个分组内的前三个元素
// 分组后的数据集按照第二个字段进行升序排序
DataSet<Tuple2<String, Integer>> out3 = in.groupBy(0)
                                          .sortGroup(1, Order.ASCENDING)
                                          .first(3);


MinBy / MaxBy
从数据集中返回指定字段或组合对应最小或最大的记录,如果选择的字段具有多个相同值,则在集合中随机选择一条记录返回。

DataSet<Tuple2<String, Integer>> source = env.fromElements(
                Tuple2.of("jack",20),
                Tuple2.of("Tom",21),
                Tuple2.of("Robin",25),
                Tuple2.of("Bob",30));
// 按照第2个元素比较,找出第二个元素为最小值的那个tuple
// 在整个DataSet上使用minBy
ReduceOperator<Tuple2<String, Integer>> tuple2Reduce = source.minBy(1);
tuple2Reduce.print();// 返回(jack,20)

// 也可以在分组的DataSet上使用minBy
source.groupBy(0) // 按照第一个字段进行分组
      .minBy(1)  // 找出每个分组内的按照第二个元素为最小值的那个tuple
      .print();




上一篇     下一篇
Flink DataSet 简介

Flink DataSet 数据源示例

Flink DataSet 数据输出 示例

flink广播变量简介

spring事务隔离级别设置

mysql show processlist State 含义