
Spark Accumulators
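An accumulator is defined on the driver and its value is read on the driver. Tasks running on executors can only add to it: each task updates its own copy, and Spark merges those copies back into the driver-side accumulator.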
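To make the driver/executor split concrete, here is a Spark-free model of it. It assumes a simplified picture of what Spark's `AccumulatorV2` does: each task starts from a zeroed copy (`copyAndReset`), adds only to that copy, and the driver merges the copies (`merge`) before `value` is read. `SketchLongAccumulator`, `AccumulatorModel` and `runJob` are hypothetical names for illustration; no Spark API is called.

```scala
// Hypothetical, Spark-free model of the accumulator contract.
// Method names mirror org.apache.spark.util.AccumulatorV2, but none of this is Spark code.
final class SketchLongAccumulator {
  private var sum: Long = 0L

  // Task side: add into this (local) copy
  def add(v: Long): Unit = sum += v

  // A fresh, zeroed copy is what each task starts from
  def copyAndReset(): SketchLongAccumulator = new SketchLongAccumulator

  // Driver side: fold a finished task's partial sum into this accumulator
  def merge(other: SketchLongAccumulator): Unit = sum += other.value

  // Driver side: read the merged total
  def value: Long = sum
}

object AccumulatorModel {
  // One simulated job: every partition is handled by a "task" working on its own
  // zeroed copy; the driver then merges each task's copy into the original.
  def runJob(partitions: Seq[Seq[Int]], driverAcc: SketchLongAccumulator): Unit = {
    val taskCopies = partitions.map { part =>
      val local = driverAcc.copyAndReset() // what the task receives
      part.foreach(_ => local.add(1))      // updates made inside the task
      local                                // returned when the task finishes
    }
    taskCopies.foreach(driverAcc.merge)    // merging happens only on the driver
  }

  def main(args: Array[String]): Unit = {
    val acc = new SketchLongAccumulator         // created on the "driver"
    val partitions = (1 to 9).grouped(3).toList // 3 partitions, like makeRDD(list, 3)
    runJob(partitions, acc)
    println(acc.value) // 9
  }
}
```

This also suggests why reading an accumulator inside a task is not meaningful: a task only ever sees its own partial copy, never the merged total.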

Spark provides three built-in accumulator types:
LongAccumulator
DoubleAccumulator
CollectionAccumulator
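The three built-in types differ mainly in what they keep and how partial results are merged. The sketch below models only those merge rules and is not the real implementation: `LongAccumulator` and `DoubleAccumulator` keep a running sum plus the number of `add` calls (which is how the real classes can also report `count` and `avg`), while `CollectionAccumulator` keeps every added element and merges by appending, so its final order follows the order in which task results are merged. The case classes `LongAcc`, `DoubleAcc` and `CollectionAcc` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical, Spark-free stand-ins for the merge rules of the three built-in types.
// Immutable case classes are used for clarity; Spark's real accumulators are mutable.
object BuiltInAccumulatorModel {
  // Like LongAccumulator: running sum plus the number of add() calls
  final case class LongAcc(sum: Long = 0L, count: Long = 0L) {
    def add(v: Long): LongAcc = LongAcc(sum + v, count + 1)
    def merge(o: LongAcc): LongAcc = LongAcc(sum + o.sum, count + o.count)
  }

  // Like DoubleAccumulator: the same idea with a Double sum
  final case class DoubleAcc(sum: Double = 0.0, count: Long = 0L) {
    def add(v: Double): DoubleAcc = DoubleAcc(sum + v, count + 1)
    def merge(o: DoubleAcc): DoubleAcc = DoubleAcc(sum + o.sum, count + o.count)
  }

  // Like CollectionAccumulator: keeps every element; merge appends the other side's elements
  final case class CollectionAcc[T](items: Vector[T] = Vector.empty[T]) {
    def add(v: T): CollectionAcc[T] = CollectionAcc(items :+ v)
    def merge(o: CollectionAcc[T]): CollectionAcc[T] = CollectionAcc(items ++ o.items)
  }

  def main(args: Array[String]): Unit = {
    // Same data and partitioning as the demo: 1..9 in 3 partitions
    val partitions = List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9))

    // Each "task" folds its partition into a zero value, mirroring the demo's add calls
    val longParts = partitions.map(p => p.foldLeft(LongAcc())((acc, _) => acc.add(1)))
    val doubleParts = partitions.map(p => p.foldLeft(DoubleAcc())((acc, _) => acc.add(1.0)))
    val collParts = partitions.map(p => p.foldLeft(CollectionAcc[Int]())((acc, e) => acc.add(e)))

    // The "driver" merges the partial results (here strictly left to right)
    println(longParts.reduce(_ merge _))       // LongAcc(9,9)
    println(doubleParts.reduce(_ merge _))     // DoubleAcc(9.0,9)
    println(collParts.reduce(_ merge _).items) // Vector(1, 2, 3, 4, 5, 6, 7, 8, 9)
  }
}
```

In a real Spark job the collection's order is not guaranteed, because task results reach the driver in whatever order the tasks finish.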


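import org.apache.spark.{SparkConf, SparkContext}

// Spark recommends a main() method rather than extending scala.App
object AccumulatorDemo {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkAccumulatorDemo")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    // 9 elements split into 3 partitions
    val rdd = sc.makeRDD(list, 3)

    // Accumulators are created (and registered) on the driver
    val longAccumulator = sc.longAccumulator("longAccumulator")
    val doubleAccumulator = sc.doubleAccumulator("doubleAccumulator")
    val collectionAccumulator = sc.collectionAccumulator[Int]("collectionAccumulator")

    // Tasks on the executors add to the accumulators; map is lazy, so nothing runs yet
    val rdd2 = rdd.map(e => {
      longAccumulator.add(1)
      doubleAccumulator.add(1)
      collectionAccumulator.add(e)
      e
    })

    // count() is an action: it triggers the job, so the map function runs once per element
    rdd2.count()

    // Read the merged values back on the driver
    println(longAccumulator.value)       // 9
    println(doubleAccumulator.value)     // 9.0
    println(collectionAccumulator.value) // all 9 elements; order depends on task completion
    sc.stop()
  }
}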
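The demo runs exactly one action, so each accumulator sees every element once. Because the updates happen inside `map` (a transformation), a second action on the uncached `rdd2` would recompute the map and add to the accumulators again; Spark only guarantees that each task's update is applied once for updates made inside actions. The following is a Spark-free model of that effect under simplifying assumptions: `LazyDataset` and `RecomputeDemo` are hypothetical names, `counter` stands in for `longAccumulator`, and `cached()` only roughly approximates what `rdd2.cache()` would do.

```scala
// Hypothetical, Spark-free model of an uncached RDD: nothing runs until an action,
// and every action re-runs the whole chain of map functions.
final class LazyDataset[A](compute: () => Seq[A]) {
  // Transformation: only records the function, like RDD.map
  def map[B](f: A => B): LazyDataset[B] = new LazyDataset[B](() => compute().map(f))

  // Action: evaluates the chain from scratch every time it is called
  def count(): Long = compute().size.toLong

  // Rough analogue of cache(): evaluate once on first use, then reuse the result
  def cached(): LazyDataset[A] = {
    lazy val data = compute()
    new LazyDataset[A](() => data)
  }
}

object RecomputeDemo {
  // Returns how many times the map function ran across two count() actions
  def run(cacheFirst: Boolean): Long = {
    var counter = 0L // stands in for longAccumulator
    val base = new LazyDataset[Int](() => List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val mapped = base.map { e => counter += 1; e } // side effect inside a transformation
    val target = if (cacheFirst) mapped.cached() else mapped
    target.count()
    target.count()
    counter
  }

  def main(args: Array[String]): Unit = {
    println(run(cacheFirst = false)) // 18: the map ran once per element per action
    println(run(cacheFirst = true))  // 9: the second count reused the cached result
  }
}
```

In Spark itself, cached partitions can still be evicted and recomputed, and failed or speculative tasks can be re-run, so counts that must be exact are safer to accumulate inside an action such as `foreach`.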
