Flink DataStream batch mode WordCount example
Category: flink
Enabling batch mode in the DataStream API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // run the DataStream program with batch semantics on bounded input
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        System.out.println(env);
        System.out.println("parallelism=" + env.getParallelism());

        DataStream<String> text = env.fromElements(
                "hello java", "hello tiger", "hello tiger", "cat cat cat", "dog", "dog"
        );

        text.flatMap(new Splitter())
                // .keyBy(0)   // position-based key: simplest, but deprecated
                .keyBy(new MyKeySelector())
                .sum(1)        // sum field 1 (the count) per word
                .print();

        env.execute();
    }

    // splits each sentence into words and emits (word, 1) for each one
    private static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] words = sentence.split("\\W+");
            for (String word : words) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }

    // uses the word (field f0) as the grouping key
    private static class MyKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Tuple2<String, Integer> value) {
            return value.f0;
        }
    }
}
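In BATCH mode, sum(1) above prints one final count per word, whereas in STREAMING mode a rolling sum() emits an updated total for every incoming record (the behavior described in Flink's execution-mode documentation). The following is a plain-Java simulation of that difference, not Flink code; the class KeyedSumModes and its methods are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink involved) of how a keyed sum() emits results
// in STREAMING vs BATCH runtime mode. Class and method names are hypothetical.
public class KeyedSumModes {

    // STREAMING-style: emit the running total for a key every time a record arrives.
    static List<String> streamingSum(List<String> words) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            int total = totals.merge(w, 1, Integer::sum);
            out.add("(" + w + "," + total + ")");
        }
        return out;
    }

    // BATCH-style: consume all input first, then emit exactly one final result per key.
    static List<String> batchSum(List<String> words) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String w : words) {
            totals.merge(w, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : totals.entrySet()) {
            out.add("(" + e.getKey() + "," + e.getValue() + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> words = List.of("hello", "tiger", "hello", "dog");
        System.out.println("streaming: " + streamingSum(words));
        System.out.println("batch:     " + batchSum(words));
    }
}
```

With this input the streaming variant prints [(hello,1), (tiger,1), (hello,2), (dog,1)] and the batch variant prints [(hello,2), (tiger,1), (dog,1)]. Real Flink output is additionally prefixed with the subtask index (e.g. 3>) when parallelism is above 1, and the order across keys is not guaranteed.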
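Using .keyBy(0) is the simplest option, but it is deprecated. Pass a KeySelector instead, such as MyKeySelector above or the lambda value -> value.f0.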
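There is a problem with this example:
when the input ends with only one "dog", dog is not counted at all; after adding a second "dog", the result is correct.
A bug in Flink 1.12.0?
With 1.12.2 it works fine!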
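To check whether a run is affected by the issue above, the expected final counts for this input can be computed without Flink. This is a minimal plain-Java sketch using the same split pattern as Splitter; the class name ExpectedCounts is hypothetical. Every key, including words that occur only once, should show up in Flink's BATCH output with these counts.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java reference word count (no Flink) for the input used in the example.
// Useful for checking that Flink's BATCH output contains every key, including
// words that occur only once. The class name ExpectedCounts is hypothetical.
public class ExpectedCounts {

    static Map<String, Integer> count(String... sentences) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String sentence : sentences) {
            // same tokenization as Splitter: split on runs of non-word characters
            for (String word : sentence.split("\\W+")) {
                // skip empty tokens (Splitter does not, but none occur for this input)
                if (!word.isEmpty()) {
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> expected = count(
                "hello java", "hello tiger", "hello tiger", "cat cat cat", "dog", "dog");
        System.out.println(expected);
    }
}
```

For this input it prints {cat=3, dog=2, hello=3, java=1, tiger=2}.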
Full code:
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/stream/WordCount.java
The DataStream API groups with keyBy.
The DataSet API groups with groupBy.