首页  

flink datastream batch mode wordcount实例     所属分类 flink 浏览量 879
datastream  batch mode
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        System.out.println(env);    
        System.out.println("parallelism="+env.getParallelism());    

        DataStream<String> text = env.fromElements(
            "hello java","hello tiger","hello tiger","cat cat cat","dog","dog"
        );
                
        text.flatMap(new Splitter())
        // .keyBy(0)
        .keyBy(new MyKeySelector())
        .sum(1)
        .print();
     
        env.execute();
    }
    
    private static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[]words = sentence.split("\\W+");
            for (String word: words) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
    
    private static class MyKeySelector  implements KeySelector<Tuple2<String, Integer>,String>{ 
        private static final long serialVersionUID = 1L;

        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }

}



用 .keyBy(0) 最简单 , 不过  Deprecated 了

例子里有个问题 
最后只有1个 dog 时 ,dog 无法被统计到 , 加上第二个dog 时 ,结果ok 
flink1.12.0 的 bug ? 
1.12.2  没问题 !!!




完整代码
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/stream/WordCount.java


datastream api  用 keyBy
dataset    api  用 groupBy


flink流处理WordCount 实例 flink批处理wordcount实例 flink批处理wordcount实例2

上一篇     下一篇
flink dataset 大数据集测试

flink dataset groupBy sortBy 实例与说明

flink术语

flink内存管理机制

为何kafka要去掉zookeeper依赖

flink 运行模式 批处理与流处理模式