Flink DataSet 数据转换示例
所属分类 flink
浏览量 873
DataSet转换 转换 transformations
Map
DataSource source = env.fromElements("I", "like", "flink");
source.map(new MapFunction() {
@Override
// 将数据转为大写
public String map(String value) throws Exception {
return value.toUpperCase();
}
}).print();
FlatMap
输入一个元素,产生0个、1个或多个元素
stringDataSource
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] split = value.split(" ");
for (String word : split) {
out.collect(Tuple2.of(word, 1));
}
}
})
.groupBy(0)
.sum(1);
MapPartition
source.mapPartition(new MapPartitionFunction() {
@Override
public void mapPartition(Iterable values, Collector out) throws Exception {
long c = 0;
for (String value : values) {
c++;
}
//输出每个分区元素个数
out.collect(c);
}
}).print();
Filter
DataSource source = env.fromElements(1L, 2L, 3L,4L,5L);
source.filter(new FilterFunction() {
@Override
public boolean filter(Long value) throws Exception {
return value % 2 == 0;
}
}).print();
Project
DataSource> source = env.fromElements(
Tuple3.of(1L, 20, "tom"),
Tuple3.of(2L, 25, "jack"),
Tuple3.of(3L, 22, "bob"));
// 去第一个和第三个元素
source.project(0, 2).print();
Reduce
DataSource> source = env.fromElements(
Tuple2.of("Flink", 1),
Tuple2.of("Flink", 1),
Tuple2.of("Hadoop", 1),
Tuple2.of("Spark", 1),
Tuple2.of("Flink", 1));
source
.groupBy(0)
.reduce(new ReduceFunction>() {
@Override
public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
ReduceGroup
DataSource> source = env.fromElements(
Tuple2.of("Flink", 1L),
Tuple2.of("Flink", 1L),
Tuple2.of("Hadoop", 1L),
Tuple2.of("Spark", 1L),
Tuple2.of("Flink", 1L));
source
.groupBy(0)
.reduceGroup(new GroupReduceFunction, Tuple2>() {
@Override
public void reduce(Iterable> values, Collector> out) throws Exception {
Long sum = 0L;
String word = "";
for(Tuple2 value:values){
sum += value.f1;
word = value.f0;
}
out.collect(Tuple2.of(word,sum));
}
}).print();
Aggregate
DataSource> source = env.fromElements(
Tuple2.of("Flink", 1L),
Tuple2.of("Flink", 1L),
Tuple2.of("Hadoop", 1L),
Tuple2.of("Spark", 1L),
Tuple2.of("Flink", 1L));
source.groupBy(0)
.aggregate(SUM,1)// 按第2个值求和
.print();
Distinct
DataSource source = env.fromElements(Tuple1.of("Flink"),Tuple1.of("Flink"),Tuple1.of("hadoop"));
// 按照tuple的第一个字段去重
source.distinct(0).print();
Join
DataSource> source1 = env.fromElements(
Tuple2.of(1,"jack"),
Tuple2.of(2,"tom"),
Tuple2.of(3,"Bob"));
DataSource> source2 = env.fromElements(
Tuple2.of("order1", 1),
Tuple2.of("order2", 2),
Tuple2.of("order3", 3));
source1.join(source2).where(0).equalTo(1).print();
自定义Join Funciton
// 用户id,购买商品名称,购买商品数量
DataSource> source1 = env.fromElements(
Tuple3.of(1,"item1",2),
Tuple3.of(2,"item2",3),
Tuple3.of(3,"item3",4));
//商品名称与商品单价
DataSource> source2 = env.fromElements(
Tuple2.of("item1", 10),
Tuple2.of("item2", 20),
Tuple2.of("item3", 15));
source1.join(source2)
.where(1)
.equalTo(0)
.with(new JoinFunction, Tuple2, Tuple3>() {
// 用户每种商品购物总金额
@Override
public Tuple3 join(Tuple3 first, Tuple2 second) throws Exception {
return Tuple3.of(first.f0,first.f1,first.f2 * second.f1.doubleValue());
}
}).print();
通过Size Hint标记数据集的大小,根据用户给定的hint(提示)调整计算策略
例如可以使用joinWithTiny或joinWithHuge提示第二个数据集的大小
DataSet> input1 = // [...]
DataSet> input2 = // [...]
DataSet, Tuple2>>
result1 =
// 提示第二个数据集为小数据集
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet, Tuple2>>
result2 =
// h提示第二个数据集为大数据集
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
可以使用多种方式执行join 可指定策略
DataSet input1 = // [...]
DataSet input2 = // [...]
// 广播第一个输入并从中构建一个哈希表,第二个输入将对其进行探测,适用于第一个数据集非常小的场景
DataSet result =
input1.join(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
// 广播第二个输入并从中构建一个哈希表,第一个输入将对其进行探测,适用于第二个数据集非常小的场景
DataSet result =
input1.join(input2, JoinHint.BROADCAST_HASH_SECOND)
.where("id").equalTo("key");
// 将两个数据集重新分区,并将第一个数据集转换成哈希表,适用于第一个数据集比第二个数据集小,但两个数据集都比较大的场景
DataSet result =
input1.join(input2, JoinHint.REPARTITION_HASH_FIRST)
.where("id").equalTo("key");
// 将两个数据集重新分区,并将第二个数据集转换成哈希表,适用于第二个数据集比第一个数据集小,但两个数据集都比较大的场景
DataSet result =
input1.join(input2, JoinHint.REPARTITION_HASH_SECOND)
.where("id").equalTo("key");
// 将两个数据集重新分区,并将每个分区排序,适用于两个数据集都已经排好序的场景
DataSet result =
input1.join(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
// 相当于不指定,有系统自行处理
DataSet result =
input1.join(input2, JoinHint.OPTIMIZER_CHOOSES)
.where("id").equalTo("key");
OuterJoin
//左外连接
source1.leftOuterJoin(source2).where(1).equalTo(0);
//右外链接
source1.rightOuterJoin(source2).where(1).equalTo(0);
外连接也支持算法提示,可以跟据左右数据集的分布情况选择合适的优化策略,提升数据处理效率
DataSet input1 = // [...]
DataSet input2 = // [...]
DataSet result1 =
input1.leftOuterJoin(input2, JoinHint.REPARTITION_SORT_MERGE)
.where("id").equalTo("key");
DataSet result2 =
input1.rightOuterJoin(input2, JoinHint.BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
每种外连接只支持部分算法
LeftOuterJoin支持
OPTIMIZER_CHOOSES
BROADCAST_HASH_SECOND
REPARTITION_HASH_SECOND
REPARTITION_SORT_MERGE
RightOuterJoin支持
OPTIMIZER_CHOOSES
BROADCAST_HASH_FIRST
REPARTITION_HASH_FIRST
REPARTITION_SORT_MERGE
FullOuterJoin支持
OPTIMIZER_CHOOSES
REPARTITION_SORT_MERGE
CoGroup
CoGroup 对分组之后的DataSet进行join操作,将两个DataSet数据集合并在一起,
先对每个DataSet按照key进行分组,然后将分组之后的DataSet传输到用户定义的CoGroupFunction,
将两个数据集根据相同的Key记录组合在一起,相同Key的记录会存放在一个Group中
// 用户id,购买商品名称,购买商品数量
DataSource> source1 = env.fromElements(
Tuple3.of(1,"item1",2),
Tuple3.of(2,"item2",3),
Tuple3.of(3,"item2",4));
//商品名称与商品单价
DataSource> source2 = env.fromElements(
Tuple2.of("item1", 10),
Tuple2.of("item2", 20),
Tuple2.of("item3", 15));
source1.coGroup(source2)
.where(1)
.equalTo(0)
.with(new CoGroupFunction, Tuple2, Tuple2>() {
// 每个Iterable存储的是分好组的数据,即相同key的数据组织在一起
@Override
public void coGroup(Iterable> first, Iterable> second, Collector> out) throws Exception {
//存储每种商品购买数量
int sum = 0;
for(Tuple3 val1:first){
sum += val1.f2;
}
// 每种商品数量 * 商品单价
for(Tuple2 val2:second){
out.collect(Tuple2.of(val2.f0,sum * val2.f1.doubleValue()));
}
}
}).print();
Cross
笛卡儿积
//[id,x,y],坐标值
DataSet> coords1 = env.fromElements(
Tuple3.of(1, 20, 18),
Tuple3.of(2, 15, 20),
Tuple3.of(3, 25, 10));
DataSet> coords2 = env.fromElements(
Tuple3.of(1, 20, 18),
Tuple3.of(2, 15, 20),
Tuple3.of(3, 25, 10));
// 求任意两点之间的欧氏距离
coords1.cross(coords2)
.with(new CrossFunction, Tuple3, Tuple3>() {
@Override
public Tuple3 cross(Tuple3 val1, Tuple3 val2) throws Exception {
// 计算欧式距离
double dist = sqrt(pow(val1.f1 - val2.f1, 2) + pow(val1.f2 - val2.f2, 2));
// 返回两点之间的欧式距离
return Tuple3.of(val1.f0,val2.f0,dist);
}
}).print();
Union
DataSet> vals1 = env.fromElements(
Tuple2.of("jack",20),
Tuple2.of("Tom",21));
DataSet> vals2 = env.fromElements(
Tuple2.of("Robin",25),
Tuple2.of("Bob",30));
DataSet> vals3 = env.fromElements(
Tuple2.of("Jasper",24),
Tuple2.of("jarry",21));
DataSet> unioned = vals1
.union(vals2)
.union(vals3);
unioned.print();
Rebalance
DataSet in = // [...]
// rebalance DataSet,然后使用map算子.
DataSet> out = in.rebalance().map(new Mapper());
Hash-Partition
根据给定的Key进行Hash分区,key相同的数据会被放入同一个分区内
DataSet> in = // [...]
// 根据第一个值进行hash分区,然后使用 MapPartition转换操作.
DataSet> out = in.partitionByHash(0)
.mapPartition(new PartitionMapper());
Range-Partition
根据给定的Key进行Range分区,key相同的数据会被放入同一个分区内
DataSet> in = // [...]
// 根据第一个值进行Range分区,然后使用 MapPartition转换操作.
DataSet> out = in.partitionByRange(0)
.mapPartition(new PartitionMapper());
Custom Partitioning
自定义分区函数
DataSet> in = // [...]
DataSet result = in.partitionCustom(partitioner, key)
.mapPartition(new PartitionMapper());
Sort Partition
在本地对DataSet数据集中的所有分区根据指定字段进行重排序,
排序方式通过Order.ASCENDING以及Order.DESCENDING关键字指定
支持指定多个字段进行分区排序
DataSet> in = // [...]
// 按照第一个字段升序排列,第二个字段降序排列.
DataSet> out = in.sortPartition(1, Order.ASCENDING)
.sortPartition(0, Order.DESCENDING)
.mapPartition(new PartitionMapper());
First-n
DataSet> in = // [...]
// 返回数据集中的任意5个元素
DataSet> out1 = in.first(5);
//返回每个分组内的任意两个元素
DataSet> out2 = in.groupBy(0)
.first(2);
// 返回每个分组内的前三个元素
// 分组后的数据集按照第二个字段进行升序排序
DataSet> out3 = in.groupBy(0)
.sortGroup(1, Order.ASCENDING)
.first(3);
MinBy / MaxBy
从数据集中返回指定字段或组合对应最小或最大的记录,如果选择的字段具有多个相同值,则在集合中随机选择一条记录返回。
DataSet> source = env.fromElements(
Tuple2.of("jack",20),
Tuple2.of("Tom",21),
Tuple2.of("Robin",25),
Tuple2.of("Bob",30));
// 按照第2个元素比较,找出第二个元素为最小值的那个tuple
// 在整个DataSet上使用minBy
ReduceOperator> tuple2Reduce = source.minBy(1);
tuple2Reduce.print();// 返回(jack,20)
// 也可以在分组的DataSet上使用minBy
source.groupBy(0) // 按照第一个字段进行分组
.minBy(1) // 找出每个分组内的按照第二个元素为最小值的那个tuple
.print();
上一篇
下一篇
Flink DataSet 简介
Flink DataSet 数据源示例
Flink DataSet 数据输出 示例
flink广播变量简介
spring事务隔离级别设置
mysql show processlist State 含义