Flink DataSet groupBy and sortGroup: example and explanation
groupBy: grouping. Records whose grouping field values are equal go into the same group.
sortGroup: sorting within each group.
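import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.util.Collector;

public class EventsGroupAndSortDemo { // illustrative wrapper class name
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String file = "file:/path/events.txt";
        // Each CSV line has 5 columns: four strings and one double.
        DataSet<Tuple5<String, String, String, String, Double>> csvInput = env.readCsvFile(file)
                .types(String.class, String.class, String.class, String.class, Double.class);
        csvInput.groupBy(2)                      // group on field 2 (the 3rd column; indexes are 0-based)
                .sortGroup(3, Order.ASCENDING)   // inside a group: field 3 ascending first
                .sortGroup(1, Order.DESCENDING)  // then field 1 descending to break ties
                .combineGroup(new MyGroupCombineFunction())
                .print();                        // print() also triggers job execution
    }
    // Emits every record of a group followed by a "-------" separator.
    // Note: combineGroup is a combiner and may be called on partial groups; reduceGroup sees whole groups.
    private static class MyGroupCombineFunction
            implements GroupCombineFunction<Tuple5<String, String, String, String, Double>, String> {
        private static final long serialVersionUID = 1L;
        @Override
        public void combine(Iterable<Tuple5<String, String, String, String, Double>> values,
                            Collector<String> out) throws Exception {
            for (Tuple5<String, String, String, String, Double> item : values) {
                out.collect(item.toString());
            }
            out.collect("-------");
        }
    }
}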
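The two chained sortGroup calls can be checked without running a Flink job. The following plain-Java sketch has no Flink dependency, and its class and method names are made up for illustration. It takes the seven rows of group "a" that appear in the first output and sorts them with a Comparator chained the same way: field 3 ascending as the primary key, and field 1 descending only to break ties. It models the ordering seen in the printed output; it is not Flink's actual sort implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortGroupSketch {
    // The seven rows whose field 2 is "a" (copied from the first output in this post),
    // deliberately listed in id order rather than in sorted order.
    static final String[][] GROUP_A = {
        {"1", "2021-01-01 00:00:00", "a", "b", "1.0"},
        {"2", "2021-01-02 00:00:00", "a", "c", "2.0"},
        {"4", "2021-01-03 01:00:00", "a", "d", "4.0"},
        {"5", "2021-01-03 02:00:00", "a", "b", "5.0"},
        {"6", "2021-01-03 03:00:00", "a", "b", "6.0"},
        {"10", "2021-01-05 00:00:00", "a", "c", "3.0"},
        {"11", "2021-01-05 01:00:00", "a", "c", "5.0"},
    };

    // Mirrors .sortGroup(3, Order.ASCENDING).sortGroup(1, Order.DESCENDING):
    // the first key decides the order, the second one only breaks ties.
    // Field 1 is a fixed-width "yyyy-MM-dd HH:mm:ss" string, so string order equals time order.
    static final Comparator<String[]> IN_GROUP_ORDER =
            Comparator.<String[], String>comparing(r -> r[3])
                      .thenComparing(r -> r[1], Comparator.<String>reverseOrder());

    // Returns the ids (field 0) of the group after sorting it with IN_GROUP_ORDER.
    static List<String> sortedIds(String[][] group) {
        List<String[]> rows = new ArrayList<>(Arrays.asList(group));
        rows.sort(IN_GROUP_ORDER);
        List<String> ids = new ArrayList<>();
        for (String[] r : rows) {
            ids.add(r[0]);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(sortedIds(GROUP_A));
    }
}
```

Running main prints [6, 5, 1, 11, 10, 2, 4]. This is the same order as group "a" in the first output.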
The difference between the following two versions:
Version 1:
csvInput.groupBy(2)
    .sortGroup(3, Order.ASCENDING)
    .sortGroup(1, Order.DESCENDING)
Version 2:
csvInput.groupBy(2, 3)
    .sortGroup(1, Order.DESCENDING)
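groupBy keeps records with the same key together. In version 1 the key is field 2 alone, so the sort on field 3 only orders rows inside each field-2 group. In version 2 the key is the pair (field 2, field 3), so two rows share a group only when both fields match.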
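The grouping key can be sketched the same way in plain Java. The helper `groupIds` below is hypothetical and is not a Flink API. It puts rows in the same group only when the values of all listed key fields are equal. The rows are the twelve records printed in the outputs of this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CompositeKeySketch {
    // All twelve rows that appear in the outputs of this post (fields 0..4, kept as strings).
    static final String[][] ROWS = {
        {"0", "2021-01-05 01:00:00", "c", "a", "5.0"},
        {"1", "2021-01-01 00:00:00", "a", "b", "1.0"},
        {"2", "2021-01-02 00:00:00", "a", "c", "2.0"},
        {"3", "2021-01-03 00:00:00", "b", "a", "3.0"},
        {"4", "2021-01-03 01:00:00", "a", "d", "4.0"},
        {"5", "2021-01-03 02:00:00", "a", "b", "5.0"},
        {"6", "2021-01-03 03:00:00", "a", "b", "6.0"},
        {"7", "2021-01-03 04:00:00", "b", "c", "1.0"},
        {"8", "2021-01-03 05:00:00", "b", "a", "2.0"},
        {"9", "2021-01-04 00:00:00", "b", "a", "3.0"},
        {"10", "2021-01-05 00:00:00", "a", "c", "3.0"},
        {"11", "2021-01-05 01:00:00", "a", "c", "5.0"},
    };

    // Hypothetical helper mimicking groupBy(fields...): the group key is the values of
    // all listed fields joined by ",". It returns key -> ids (field 0) in input order.
    // TreeMap keeps the groups sorted by key so the printout is deterministic.
    static Map<String, List<String>> groupIds(String[][] rows, int... fields) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] r : rows) {
            String[] parts = new String[fields.length];
            for (int i = 0; i < fields.length; i++) {
                parts[i] = r[fields[i]];
            }
            groups.computeIfAbsent(String.join(",", parts), k -> new ArrayList<>()).add(r[0]);
        }
        return groups;
    }

    public static void main(String[] args) {
        System.out.println(groupIds(ROWS, 2));    // 3 groups: a, b, c
        System.out.println(groupIds(ROWS, 2, 3)); // 6 groups: a,b  a,c  a,d  b,a  b,c  c,a
    }
}
```

Keying on field 2 alone gives 3 groups. Keying on fields 2 and 3 gives 6 groups, which matches the number of "-------" separators in the two outputs. Version 1 already sorts by field 3 first inside each group, so both versions print the rows in the same order. Only the group boundaries differ.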
Output of version 1
Records with the same 3rd field (index 2) form one group:
(6,2021-01-03 03:00:00,a,b,6.0)
(5,2021-01-03 02:00:00,a,b,5.0)
(1,2021-01-01 00:00:00,a,b,1.0)
(11,2021-01-05 01:00:00,a,c,5.0)
(10,2021-01-05 00:00:00,a,c,3.0)
(2,2021-01-02 00:00:00,a,c,2.0)
(4,2021-01-03 01:00:00,a,d,4.0)
-------
(9,2021-01-04 00:00:00,b,a,3.0)
(8,2021-01-03 05:00:00,b,a,2.0)
(3,2021-01-03 00:00:00,b,a,3.0)
(7,2021-01-03 04:00:00,b,c,1.0)
-------
(0,2021-01-05 01:00:00,c,a,5.0)
-------
Output of version 2
Records whose 3rd and 4th fields (indexes 2 and 3) are both equal form one group:
(6,2021-01-03 03:00:00,a,b,6.0)
(5,2021-01-03 02:00:00,a,b,5.0)
(1,2021-01-01 00:00:00,a,b,1.0)
-------
(11,2021-01-05 01:00:00,a,c,5.0)
(10,2021-01-05 00:00:00,a,c,3.0)
(2,2021-01-02 00:00:00,a,c,2.0)
-------
(4,2021-01-03 01:00:00,a,d,4.0)
-------
(9,2021-01-04 00:00:00,b,a,3.0)
(8,2021-01-03 05:00:00,b,a,2.0)
(3,2021-01-03 00:00:00,b,a,3.0)
-------
(7,2021-01-03 04:00:00,b,c,1.0)
-------
(0,2021-01-05 01:00:00,c,a,5.0)
-------
Full code:
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventsGroupAndSort3.java
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventsGroupAndSort4.java
Test data:
https://gitee.com/dyyx/demos/blob/master/flinkdemo/docs/data/events.txt