flink广播变量简介
所属分类 flink
浏览量 928
广播变量 分布式计算框架 数据共享
小数据集采用网络传输的方式,在每个实例上维护一个只读的缓存变量,
计算节点实例可以在本地内存中直接读取被广播的数据集
公共的共享变量 把DataSet广播出去
不同的task都可以读取该数据
广播的数据只会在每个节点上存一份
如果不使用广播变量,则会在每个节点上的task中都要复制一份dataset数据集
广播变量使用基本步骤
//第一步创建需要广播的数据集
DataSet toBroadcast = env.fromElements(1, 2, 3);
DataSet data = env.fromElements("a", "b");
data.map(new RichMapFunction() {
@Override
public void open(Configuration parameters) throws Exception {
// 第三步访问集合形式的广播变量数据集
Collection broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
}
@Override
public String map(String value) throws Exception {
...
}
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 第二步广播数据集
DataSet API 在RichFunction接口中通过RuntimeContext读取广播变量
RichFunction open()方法
getRuntimeContext() RuntimeContext
getBroadcastVariable
注意点
广播变量保存在每个节点的内存中,因此广播变量数据集不易过大
广播变量初始化之后,不支持修改,保证每个节点的数据是一致的
如果多个算子都要使用一份数据集,那么需要在多个算子分别注册广播变量
只能在批处理中使用广播变量
完整例子代码
public class BroadcastExample {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ArrayList> RawBroadCastData = new ArrayList<>();
RawBroadCastData.add(new Tuple2<>(1,"jack"));
RawBroadCastData.add(new Tuple2<>(2,"tom"));
RawBroadCastData.add(new Tuple2<>(3,"Bob"));
// 模拟数据源,[userId,userName]
DataSource> userInfoBroadCastData = env.fromCollection(RawBroadCastData);
ArrayList> rawUserAount = new ArrayList<>();
rawUserAount.add(new Tuple2<>(1,1000.00));
rawUserAount.add(new Tuple2<>(2,500.20));
rawUserAount.add(new Tuple2<>(3,800.50));
// 处理数据:用户id,用户购买金额 ,[UserId,amount]
DataSet> userAmount = env.fromCollection(rawUserAount);
// 转换为map集合类型的DataSet
DataSet> userInfoBroadCast = userInfoBroadCastData.map(new MapFunction, HashMap>() {
@Override
public HashMap map(Tuple2 value) throws Exception {
HashMap userInfo = new HashMap<>();
userInfo.put(value.f0, value.f1);
return userInfo;
}
});
DataSet result = userAmount.map(new RichMapFunction, String>() {
// 存放广播变量返回的list集合数据
List> broadCastList = new ArrayList<>();
// 存放广播变量的值
HashMap allMap = new HashMap<>();
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
//获取广播数据,返回的是一个list集合
this.broadCastList = getRuntimeContext().getBroadcastVariable("userInfo");
for (HashMap value : broadCastList) {
allMap.putAll(value);
}
}
@Override
public String map(Tuple2 value) throws Exception {
String userName = allMap.get(value.f0);
return "用户id: " + value.f0 + " | "+ "用户名: " + userName + " | " + "购买金额: " + value.f1;
}
}).withBroadcastSet(userInfoBroadCast, "userInfo");
result.print();
}
}
上一篇
下一篇
Flink DataSet 数据源示例
Flink DataSet 数据输出 示例
Flink DataSet 数据转换示例
spring事务隔离级别设置
mysql show processlist State 含义
MySQL查看事务和锁状态