Flink DataSet groupBy + sortGroup example
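groupBy + sortGroup first groups the records and then sorts the records inside each group; this works with any parallelism.
sortPartition only sorts each partition locally, so to get a globally sorted result with it, the parallelism must be set to 1.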
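To illustrate why sortPartition needs parallelism 1 for a global order, here is a small plain-Java simulation that does not use Flink at all. It assumes records are spread round-robin over the partitions (a hypothetical stand-in; Flink's real partitioning depends on the job), sorts each partition on its own, and concatenates the partitions. The class and method names are made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation (no Flink dependency) of per-partition sorting.
// Only with a single partition is the concatenated output globally sorted.
public class SortPartitionSketch {

    // Hypothetical partitioning: distribute values round-robin over `parallelism`
    // partitions, sort each partition locally, then concatenate them in order.
    static List<Integer> sortPerPartition(List<Integer> input, int parallelism) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            partitions.get(i % parallelism).add(input.get(i));
        }
        List<Integer> output = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partition.sort(Comparator.naturalOrder()); // local sort only
            output.addAll(partition);
        }
        return output;
    }

    // True if the list is in non-decreasing order.
    static boolean isSorted(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(5, 3, 8, 1, 9, 2);
        System.out.println(sortPerPartition(input, 1)); // [1, 2, 3, 5, 8, 9]
        System.out.println(sortPerPartition(input, 2)); // [5, 8, 9, 1, 2, 3]
    }
}
```

With two partitions each half is sorted, but the combined output is not, which mirrors the constraint stated above.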
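MyGroupCombineFunction does no extra processing: it simply emits every record unchanged,
which makes it easy to check whether the grouping and sorting produced the expected result.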
groupBy returns an UnsortedGrouping.
sortGroup returns a SortedGrouping.
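import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.util.Collector;

public class EventsGroupAndSort {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);

        String file = "file:/path/events.txt";
        // Read a CSV file with four String fields and one Double field (indices 0-4)
        DataSet<Tuple5<String, String, String, String, Double>> csvInput = env.readCsvFile(file)
                .types(String.class, String.class, String.class, String.class, Double.class);

        csvInput.groupBy(2)                       // group by field 2
                .sortGroup(3, Order.ASCENDING)    // within each group, sort by field 3 ascending
                .sortGroup(1, Order.DESCENDING)   // then by field 1 descending
                // .first(9).print();
                // first(n) returns the first n records of each group key
                .combineGroup(new MyGroupCombineFunction())
                .print();
    }

    // Pass-through combiner: emits every record unchanged so the sorted groups can be inspected
    private static class MyGroupCombineFunction implements GroupCombineFunction<
            Tuple5<String, String, String, String, Double>,
            Tuple5<String, String, String, String, Double>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void combine(Iterable<Tuple5<String, String, String, String, Double>> values,
                            Collector<Tuple5<String, String, String, String, Double>> out) throws Exception {
            for (Tuple5<String, String, String, String, Double> item : values) {
                out.collect(item);
            }
        }
    }
}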
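Running the Flink job needs a Flink environment and the events.txt file. As a dependency-free way to see what groupBy(2).sortGroup(3, Order.ASCENDING).sortGroup(1, Order.DESCENDING) and first(n) compute, here is a plain-Java sketch. It is only a simulation, not Flink's implementation: the Event class is a hypothetical stand-in for the Tuple5 fields f0..f4, and the sample records are made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of
//   groupBy(2).sortGroup(3, Order.ASCENDING).sortGroup(1, Order.DESCENDING)
// and of first(n) on the sorted groups. Event mirrors Tuple5 fields f0..f4.
public class GroupSortSketch {

    static final class Event {
        final String f0, f1, f2, f3;
        final double f4;

        Event(String f0, String f1, String f2, String f3, double f4) {
            this.f0 = f0; this.f1 = f1; this.f2 = f2; this.f3 = f3; this.f4 = f4;
        }

        @Override
        public String toString() {
            return "(" + f0 + "," + f1 + "," + f2 + "," + f3 + "," + f4 + ")";
        }
    }

    // Group by f2, then sort each group by f3 ascending and, on ties, by f1 descending.
    // Groups are returned in first-seen key order.
    static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(e.f2, k -> new ArrayList<>()).add(e);
        }
        Comparator<Event> order = Comparator.comparing((Event e) -> e.f3)
                .thenComparing((Event e) -> e.f1, Comparator.reverseOrder());
        for (List<Event> group : groups.values()) {
            group.sort(order);
        }
        return groups;
    }

    // Counterpart of first(n): keep at most n records from each sorted group.
    static Map<String, List<Event>> firstN(Map<String, List<Event>> groups, int n) {
        Map<String, List<Event>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<Event>> entry : groups.entrySet()) {
            List<Event> group = entry.getValue();
            result.put(entry.getKey(), new ArrayList<>(group.subList(0, Math.min(n, group.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("e1", "u1", "click", "2020-01-02", 1.0),
                new Event("e2", "u3", "click", "2020-01-01", 2.0),
                new Event("e3", "u2", "view", "2020-01-01", 3.0),
                new Event("e4", "u2", "click", "2020-01-01", 4.0));
        // The "click" group comes out as e2, e4, e1; the "view" group as e3
        groupAndSort(events).forEach((key, group) -> System.out.println(key + " -> " + group));
        System.out.println(firstN(groupAndSort(events), 2));
    }
}
```

Field 3 decides the order inside each group, and field 1 (descending) only breaks ties, which is why e2 (u3) comes before e4 (u2).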
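An alternative: group on fields 2 and 3 together and sort each group by field 1 descending. This is not strictly equivalent to the version above: it produces one group per (field 2, field 3) pair instead of one group per field 2 value, which matters for per-group operations such as first(n).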
csvInput.groupBy(2, 3)
        // .sortGroup(3, Order.ASCENDING)
        .sortGroup(1, Order.DESCENDING)
        .combineGroup(new MyGroupCombineFunction())
        .print();
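The difference in group keys can be checked with a plain-Java sketch that does not use Flink. Each row is a String[] standing in for fields f0..f3 of the Tuple5, the sample rows are made up, and the composite key of groupBy(2, 3) is modelled as a List of the two field values; none of this is Flink's actual key handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch (no Flink) contrasting the groups formed by
// groupBy(2) and groupBy(2, 3).
public class GroupKeySketch {

    // Group rows by the given key function; LinkedHashMap keeps first-seen key order.
    static <K> Map<K, List<String[]>> groupBy(List<String[]> rows, Function<String[], K> key) {
        Map<K, List<String[]>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            groups.computeIfAbsent(key.apply(row), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"e1", "u1", "click", "2020-01-01"},
                new String[] {"e2", "u2", "click", "2020-01-02"},
                new String[] {"e3", "u3", "view", "2020-01-01"});

        // groupBy(2): one group per value of field 2
        Map<String, List<String[]>> byField2 = groupBy(rows, r -> r[2]);
        // groupBy(2, 3): one group per (field 2, field 3) pair
        Map<List<String>, List<String[]>> byField2And3 = groupBy(rows, r -> List.of(r[2], r[3]));

        System.out.println(byField2.size());     // 2
        System.out.println(byField2And3.size()); // 3
    }
}
```

Because the two "click" rows have different field 3 values, they land in one group under groupBy(2) but in two groups under groupBy(2, 3), so first(1) would return two records in the first case and three in the second.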
Full code:
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventsGroupAndSort.java