flink批处理wordcount实例
所属分类 flink
浏览量 869
依赖
org.apache.flink:flink-java:1.7.0
package dyyx.batch;
import java.time.LocalDateTime;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
public class WordCount {
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String input = parameterTool.get("input");
String output = parameterTool.get("output");
if (input == null) {
input = "/tmp/flinkdemo/batch/input.txt";
}
if (output == null) {
output = "/tmp/flinkdemo/batch/output";
}
System.out.println("input=" + input);
System.out.println("output=" + output);
// ExecutionEnvironment vs StreamExecutionEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet text = env.readTextFile(input);
DataSet> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
counts.writeAsCsv(output, "\n", " ");
String job = "flink-demo-batch-wordcount";
// env.execute(job);
System.out.println(job+" run done,"+LocalDateTime.now());
}
}
完整代码
https://gitee.com/dyyx/demos/blob/master/flinkdemo/src/main/java/dyyx/batch/WordCount.java
测试数据
/tmp/flinkdemo/batch/input.txt
hello java
hello tiger
hello tiger
cat cat cat
dog
注意 output 是目录
/tmp/flinkdemo/batch/output
TD-gang:output dugang$ ls -l
total 32
-rw-r--r-- 1 dugang wheel 8 4 19 18:06 1
-rw-r--r-- 1 dugang wheel 7 4 19 18:06 2
-rw-r--r-- 1 dugang wheel 6 4 19 18:06 3
-rw-r--r-- 1 dugang wheel 14 4 19 18:06 4
文件1
tiger 2
文件2
java 1
文件3
dog 1
文件4
cat 3
hello 3
flink流处理WordCount 实例
上一篇
下一篇
DevOps简介
Flink理论基础
flink流处理WordCount 实例
flink批处理wordcount实例2
flink介绍
Flink编程模型