首页  

flink广播变量简介     所属分类 flink 浏览量 928
广播变量 分布式计算框架 数据共享
小数据集采用网络传输的方式,在每个实例上维护一个只读的缓存变量,
计算节点实例可以在本地内存中直接读取被广播的数据集

公共的共享变量  把DataSet广播出去
不同的task都可以读取该数据
广播的数据只会在每个节点上存一份
如果不使用广播变量,则会在每个节点上的task中都要复制一份dataset数据集


广播变量使用基本步骤

//第一步创建需要广播的数据集
DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);
DataSet<String> data = env.fromElements("a", "b");

data.map(new RichMapFunction<String, String>() {
    @Override
    public void open(Configuration parameters) throws Exception {
      // 第三步访问集合形式的广播变量数据集
      Collection<Integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName");
    }
    @Override
    public String map(String value) throws Exception {
        ...
    }
}).withBroadcastSet(toBroadcast, "broadcastSetName"); // 第二步广播数据集


DataSet API 在RichFunction接口中通过RuntimeContext读取广播变量

RichFunction open()方法
getRuntimeContext() RuntimeContext
getBroadcastVariable

注意点
广播变量保存在每个节点的内存中,因此广播变量数据集不易过大
广播变量初始化之后,不支持修改,保证每个节点的数据是一致的
如果多个算子都要使用一份数据集,那么需要在多个算子分别注册广播变量
只能在批处理中使用广播变量


完整例子代码 public class BroadcastExample { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ArrayList<Tuple2<Integer,String>> RawBroadCastData = new ArrayList<>(); RawBroadCastData.add(new Tuple2<>(1,"jack")); RawBroadCastData.add(new Tuple2<>(2,"tom")); RawBroadCastData.add(new Tuple2<>(3,"Bob")); // 模拟数据源,[userId,userName] DataSource<Tuple2<Integer, String>> userInfoBroadCastData = env.fromCollection(RawBroadCastData); ArrayList<Tuple2<Integer,Double>> rawUserAount = new ArrayList<>(); rawUserAount.add(new Tuple2<>(1,1000.00)); rawUserAount.add(new Tuple2<>(2,500.20)); rawUserAount.add(new Tuple2<>(3,800.50)); // 处理数据:用户id,用户购买金额 ,[UserId,amount] DataSet<Tuple2<Integer, Double>> userAmount = env.fromCollection(rawUserAount); // 转换为map集合类型的DataSet DataSet<HashMap<Integer, String>> userInfoBroadCast = userInfoBroadCastData.map(new MapFunction<Tuple2<Integer, String>, HashMap<Integer, String>>() { @Override public HashMap<Integer, String> map(Tuple2<Integer, String> value) throws Exception { HashMap<Integer, String> userInfo = new HashMap<>(); userInfo.put(value.f0, value.f1); return userInfo; } }); DataSet<String> result = userAmount.map(new RichMapFunction<Tuple2<Integer, Double>, String>() { // 存放广播变量返回的list集合数据 List<HashMap<String, String>> broadCastList = new ArrayList<>(); // 存放广播变量的值 HashMap<String, String> allMap = new HashMap<>(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //获取广播数据,返回的是一个list集合 this.broadCastList = getRuntimeContext().getBroadcastVariable("userInfo"); for (HashMap<String, String> value : broadCastList) { allMap.putAll(value); } } @Override public String map(Tuple2<Integer, Double> value) throws Exception { String userName = allMap.get(value.f0); return "用户id: " + value.f0 + " | "+ "用户名: " + userName + " | " + "购买金额: " + value.f1; } }).withBroadcastSet(userInfoBroadCast, "userInfo"); result.print(); } }

上一篇     下一篇
Flink DataSet 数据源示例

Flink DataSet 数据输出 示例

Flink DataSet 数据转换示例

spring事务隔离级别设置

mysql show processlist State 含义

MySQL查看事务和锁状态