flink流处理WordCount 实例
所属分类 flink
浏览量 972
滑动窗口计算
每隔一秒统计最近5秒的数据,并打印输出
Flink应用开发步骤
获得一个执行环境
加载/创建 初始化数据
指定操作数据的 transaction 算子
指定数据数据
调用execute()触发执行程序
延迟计算,调用execute()方法时才会真正触发执行
延迟计算优点
可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception{
String hostname = "localhost" ;
String delimiter = "\n" ;
int port = 9000;
// parameterTool 控制台获取参数
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
port = parameterTool.getInt("port") ;
}catch (Exception e){
port = 9000 ;
}
System.out.println("hostname="+hostname+",port="+port);
// 1 获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2 连接socket获取输入的数据
DataStream text = env.socketTextStream(hostname,port,delimiter);
// 3 transformation
DataStream windowCounts = text
.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out)
throws Exception {
String[] splits = value.split("\\s");
for (String word : splits){
out.collect(new WordWithCount(word , 1L));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.reduce(new ReduceFunction() {
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});
// timeWindow(Time size, Time slide)
windowCounts.print().setParallelism(1);
env.execute("Socket window count run");
}
public static class WordWithCount{
public String word ;
public long count ;
public WordWithCount(){}
public WordWithCount(String word, long count){
this.word = word ;
this.count = count ;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
org.apache.flink
flink-java
1.7.0
org.apache.flink
flink-streaming-java_2.11
1.7.0
使用netcat 启动 socket 服务 ,从控制台读取输入
nc -l 9000
启动SocketWindowWordCount
./bin/flink run dyyx/SocketWindowWordCount.jar --port 9000
或者直接在 eclipse 启动
在 nc 控制台输入
a
a
a
a
a
...
WordWithCount{word='a', count=2}
WordWithCount{word='a', count=4}
WordWithCount{word='a', count=6}
WordWithCount{word='a', count=6}
WordWithCount{word='a', count=6}
WordWithCount{word='a', count=4}
WordWithCount{word='a', count=2}
完整代码
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/SocketWindowWordCount.java
上一篇
下一篇
Elasticsearch Scroll 滚动查询实例
DevOps简介
Flink理论基础
flink批处理wordcount实例
flink批处理wordcount实例2
flink介绍