首页  

flink流处理WordCount 实例     所属分类 flink 浏览量 985
滑动窗口计算
每隔一秒统计最近5秒的数据,并打印输出


Flink应用开发步骤

获得一个执行环境
加载/创建 初始化数据
指定操作数据的 transaction 算子
指定数据数据
调用execute()触发执行程序

延迟计算,调用execute()方法时才会真正触发执行
延迟计算优点
可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行



import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

     public static void main(String[] args) throws Exception{
            String hostname = "localhost" ;
            String delimiter = "\n" ;
            int port = 9000;
            // parameterTool 控制台获取参数
            try {
                ParameterTool parameterTool = ParameterTool.fromArgs(args);
                port = parameterTool.getInt("port") ;
            }catch (Exception e){     
                port = 9000 ;
            }
            
            System.out.println("hostname="+hostname+",port="+port);

            // 1 获取运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2 连接socket获取输入的数据
            DataStream<String> text = env.socketTextStream(hostname,port,delimiter);


            // 3 transformation 
            DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out)
                        throws Exception {
                    String[] splits = value.split("\\s");  
                    for (String word : splits){
                        out.collect(new WordWithCount(word , 1L));    
                    }

                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });
            // timeWindow(Time size, Time slide)

            windowCounts.print().setParallelism(1);

            env.execute("Socket window count run");
        }

        public static class WordWithCount{
            public String word ;
            public long count ;

            public  WordWithCount(){}

            public WordWithCount(String word, long count){
                this.word = word ;
                this.count = count ;
            }

            @Override
            public String toString() {
                return "WordWithCount{" +
                        "word='" + word + '\'' +
                        ", count=" + count +
                        '}';
            }
        }
}



<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.0</version>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.0</version>
</dependency>





使用netcat 启动 socket 服务 ,从控制台读取输入 nc -l 9000 启动SocketWindowWordCount ./bin/flink run dyyx/SocketWindowWordCount.jar --port 9000 或者直接在 eclipse 启动 在 nc 控制台输入 a a a a a ... WordWithCount{word='a', count=2} WordWithCount{word='a', count=4} WordWithCount{word='a', count=6} WordWithCount{word='a', count=6} WordWithCount{word='a', count=6} WordWithCount{word='a', count=4} WordWithCount{word='a', count=2}
完整代码 https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/SocketWindowWordCount.java

上一篇     下一篇
Elasticsearch Scroll 滚动查询实例

DevOps简介

Flink理论基础

flink批处理wordcount实例

flink批处理wordcount实例2

flink介绍