flink dataset groupBy sortGroup example
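groupBy + sortGroup: group first, then sort the records inside each group. This works with any parallelism, because each group is sorted on its own.
sortPartition: sorts each partition locally, so a globally sorted result requires the parallelism to be set to 1.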
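To see why per-group sorting parallelizes while a single sorted output needs one partition, here is a plain-Java illustration with no Flink involved (Java 16+ for records). The `Rec` type, the hash partitioning and the sample data are assumptions made for the demo; Flink's own partitioner differs in detail. Records are spread over `n` partitions by key hash and each partition is sorted independently, as separate parallel instances would do.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only (no Flink). Rec and all names are hypothetical.
class PartitionSortSketch {

    record Rec(String key, int value) {}

    // Sort order: by key, then by value.
    static final Comparator<Rec> ORDER =
            Comparator.comparing(Rec::key).thenComparingInt(Rec::value);

    // Hash-partition by key: all records with the same key land in the same partition.
    static List<List<Rec>> hashPartition(List<Rec> input, int n) {
        List<List<Rec>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (Rec r : input) {
            parts.get(Math.floorMod(r.key().hashCode(), n)).add(r);
        }
        return parts;
    }

    // Sort every partition on its own, then concatenate partitions in index order.
    static List<Rec> sortPartitionsAndConcat(List<Rec> input, int n) {
        List<Rec> out = new ArrayList<>();
        for (List<Rec> part : hashPartition(input, n)) {
            part.sort(ORDER);
            out.addAll(part);
        }
        return out;
    }

    // True if the whole list follows ORDER (a global sort).
    static boolean isSorted(List<Rec> list) {
        for (int i = 1; i < list.size(); i++) {
            if (ORDER.compare(list.get(i - 1), list.get(i)) > 0) {
                return false;
            }
        }
        return true;
    }

    // True if the records of every key appear in ascending value order (a per-group sort).
    static boolean eachGroupSorted(List<Rec> list) {
        Map<String, Integer> last = new HashMap<>();
        for (Rec r : list) {
            Integer prev = last.put(r.key(), r.value());
            if (prev != null && prev > r.value()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Rec> data = List.of(new Rec("c", 2), new Rec("a", 3), new Rec("b", 1),
                new Rec("a", 1), new Rec("c", 1), new Rec("b", 2));
        List<Rec> twoParts = sortPartitionsAndConcat(data, 2);
        System.out.println(eachGroupSorted(twoParts) + " " + isSorted(twoParts));
        List<Rec> onePart = sortPartitionsAndConcat(data, 1);
        System.out.println(eachGroupSorted(onePart) + " " + isSorted(onePart));
    }
}
```

With this sample data, two partitions print `true false` (every group is sorted, the concatenated output is not), while one partition prints `true true`.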

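MyGroupCombineFunction does no extra processing; it simply emits every record unchanged,
so you can check whether the grouping and sorting came out as expected.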

groupBy returns an UnsortedGrouping
sortGroup returns a SortedGrouping




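import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.util.Collector;

public class EventsGroupAndSort {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // env.setParallelism(1);

        // Each CSV line has five columns: four strings followed by a double
        String file = "file:/path/events.txt";
        DataSet<Tuple5<String, String, String, String, Double>> csvInput = env.readCsvFile(file)
                .types(String.class, String.class, String.class, String.class, Double.class);

        // Field positions are 0-based: group by field 2, sort each group by
        // field 3 ascending and, when field 3 is equal, by field 1 descending
        csvInput.groupBy(2)                        // returns UnsortedGrouping
                .sortGroup(3, Order.ASCENDING)     // returns SortedGrouping
                .sortGroup(1, Order.DESCENDING)

                // .first(9).print();
                // first(n) returns the first n records of each group

                // combineGroup runs on each partition without a data exchange and may
                // see a group in several partial chunks; use reduceGroup when the
                // function must receive all records of a group in one call
                .combineGroup(new MyGroupCombineFunction())
                .print();   // print() triggers execution, so env.execute() is not needed
    }

    // Pass-through function: emits every record unchanged so the output order can be inspected
    private static class MyGroupCombineFunction implements GroupCombineFunction<
            Tuple5<String, String, String, String, Double>,
            Tuple5<String, String, String, String, Double>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void combine(Iterable<Tuple5<String, String, String, String, Double>> values,
                            Collector<Tuple5<String, String, String, String, Double>> out) throws Exception {
            for (Tuple5<String, String, String, String, Double> item : values) {
                out.collect(item);
            }
        }
    }
}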
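For readers without a Flink setup, the per-group result of groupBy(2).sortGroup(3, ASCENDING).sortGroup(1, DESCENDING) can be mimicked in plain Java (Java 16+). This is a sketch, not Flink code: the `Event` record stands in for Tuple5<String,String,String,String,Double>, the sample rows are invented, and the `TreeMap` only makes group order deterministic for the demo (Flink promises no order between groups). Because fields 1 and 3 are Strings, the sketch compares them as Java strings. The hypothetical `firstN` helper is a rough analogue of the commented-out first(9).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink); Event and all helper names are hypothetical.
class GroupSortSketch {

    // Mirrors Tuple5<String, String, String, String, Double>; f0..f4 follow Tuple5 naming.
    record Event(String f0, String f1, String f2, String f3, double f4) {}

    // Within a group: field 3 ascending, ties broken by field 1 descending.
    static final Comparator<Event> IN_GROUP_ORDER =
            Comparator.comparing(Event::f3)
                    .thenComparing(Comparator.comparing(Event::f1).reversed());

    // Group by field 2, then sort each group.
    static Map<String, List<Event>> groupAndSort(List<Event> events) {
        Map<String, List<Event>> groups = new TreeMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(e.f2(), k -> new ArrayList<>()).add(e);
        }
        for (List<Event> g : groups.values()) {
            g.sort(IN_GROUP_ORDER);
        }
        return groups;
    }

    // Keep at most the first n records of every (already sorted) group.
    static Map<String, List<Event>> firstN(Map<String, List<Event>> sortedGroups, int n) {
        Map<String, List<Event>> out = new TreeMap<>();
        sortedGroups.forEach((k, g) -> out.put(k, new ArrayList<>(g.subList(0, Math.min(n, g.size())))));
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("e1", "u1", "click", "2", 1.0),
                new Event("e2", "u3", "click", "1", 2.0),
                new Event("e3", "u2", "view", "1", 3.0),
                new Event("e4", "u2", "click", "2", 4.0));
        groupAndSort(events).forEach((key, group) -> {
            System.out.print(key + ":");
            group.forEach(e -> System.out.print(" " + e.f0()));
            System.out.println();
        });
    }
}
```

Running `main` prints `click: e2 e4 e1` and `view: e3`: e2 comes first because its field 3 is "1", and e4 precedes e1 because their field 3 values tie and "u2" sorts after "u1".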





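Another way to write it, grouping by fields 2 and 3 together and sorting each group by field 1 descending:
csvInput.groupBy(2, 3)
// .sortGroup(3, Order.ASCENDING)
.sortGroup(1, Order.DESCENDING)
.combineGroup(new MyGroupCombineFunction())
.print();
This is not strictly equivalent to the version above: there is now one group per (field 2, field 3) pair, so per-group operations such as first(n) return different results.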
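The difference between the two groupings can be sketched in plain Java as well (Java 16+, no Flink). The composite `Key` record, the sample rows and the key ordering used for deterministic output are all assumptions of this demo; Flink does not order groups. Each (field 2, field 3) pair forms its own group, sorted by field 1 descending as a Java string.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Flink) of groupBy(2, 3).sortGroup(1, DESCENDING); names are hypothetical.
class CompositeGroupSortSketch {

    // Mirrors Tuple5<String, String, String, String, Double>.
    record Event(String f0, String f1, String f2, String f3, double f4) {}

    // Composite grouping key made of fields 2 and 3.
    record Key(String f2, String f3) {}

    // Only used to make the demo output deterministic.
    static final Comparator<Key> KEY_ORDER =
            Comparator.comparing(Key::f2).thenComparing(Key::f3);

    static Map<Key, List<Event>> groupAndSort(List<Event> events) {
        Map<Key, List<Event>> groups = new TreeMap<>(KEY_ORDER);
        for (Event e : events) {
            groups.computeIfAbsent(new Key(e.f2(), e.f3()), k -> new ArrayList<>()).add(e);
        }
        // Each group is sorted by field 1 descending.
        for (List<Event> g : groups.values()) {
            g.sort(Comparator.comparing(Event::f1).reversed());
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("e1", "u1", "click", "2", 1.0),
                new Event("e2", "u3", "click", "1", 2.0),
                new Event("e3", "u2", "view", "1", 3.0),
                new Event("e4", "u2", "click", "2", 4.0));
        groupAndSort(events).forEach((key, group) -> {
            System.out.print(key.f2() + "/" + key.f3() + ":");
            group.forEach(e -> System.out.print(" " + e.f0()));
            System.out.println();
        });
    }
}
```

This prints `click/1: e2`, `click/2: e4 e1` and `view/1: e3`: three groups instead of the two produced by grouping on field 2 alone, so taking one record per group would yield e2, e4 and e3 here rather than e2 and e3.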




Full code:
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/zb/EventsGroupAndSort.java

