Spark大数据分析实战 第3章 RDD弹性分布式数据集
所属分类 spark
浏览量 767
什么是RDD
RDD常用算子
RDD的分区
RDD的依赖
RDD的持久化
共享变量
什么是RDD
对数据的核心抽象,弹性分布式数据集 RDD Resilient Distributed Dataset
数据集的全部或部分可以缓存在内存中,并且可以在多次计算时重用
分布在多个节点上的数据集合
RDD弹性
当内存不够时,数据可以持久化到磁盘,并且具有高效的容错能力
分布式数据集
数据集存储在不同的节点上,每个节点存储数据集的一部分
例如将数据集(hello,world,scala,spark,love,spark,happy)存储在三个节点上
节点一存储(hello,world)
节点二存储(scala,spark,love)
节点三存储(spark,happy)
三个节点的数据可以并行计算
分布式数据集类似于HDFS中的文件分块,不同的块存储在不同的节点上
并行计算类似使用MapReduce读取HDFS中的数据并进行Map和Reduce操作
Spark包含了这两种功能,并且计算更加灵活
可以把RDD看作是一个数据操作的基本单位,而不必关心数据的分布式特性
Spark会自动将RDD的数据分发到集群的各个节点
Spark中对数据的操作主要是对RDD的操作(创建 转化 求值)
创建RDD
从集合创建RDD
通过 parallelize() 或 makeRDD()方法将一个对象集合转化为RDD
将一个List集合转化为RDD
scala> val rdd=sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0]
scala> val rdd=sc.makeRDD(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1]
从外部存储创建RDD
textFile() 读取本地文件系统或外部其它系统中的数据,并创建RDD
//读取本地数据
scala> val rdd=sc.textFile("/home/words.txt")
rdd: org.apache.spark.rdd.RDD[String] = /home/words.txt MapPartitionsRDD[1]
scala> rdd.collect
res1: Array[String] = Array("hello hadoop ", "hello java ", "scala ")
//读取HDFS数据
scala> val rdd=sc.textFile("hdfs://centos01:9000/input/words.txt")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://centos01:9000/input/words.txt MapPartitionsRDD[2]
scala> rdd.collect
res2: Array[String] = Array("hello hadoop ", "hello java ", "scala ")
RDD常用算子
RDD创建后只读
RDD操作 两类算子
转化 Transformation
行动 Action
转化算子
(1)map(func)
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6))
scala> val rdd2=rdd1.map(x => x+1)
(2)filter(func)
scala> val rdd1=sc.parallelize(List(1,2,3,4,5,6))
scala> val rdd2=rdd1.filter(_>3)
scala> rdd2.collect
res1: Array[Int] = Array(4, 5, 6)
下划线 代表rdd1中的每个元素
flatMap(func)
scala> val rdd1=sc.parallelize(List("hadoop hello scala","spark hello"))
scala> val rdd2=rdd1.flatMap(_.split(" "))
scala> rdd2.collect
res3: Array[String] = Array(hadoop, hello, scala, spark, hello)
(4)reduceByKey()
用于 (key,value)形式(Scala元组)的RDD
将相同key的元素聚集到一起,把所有相同key的元素合并成为一个元素
该元素的key不变,value可以聚合成一个列表或者进行求和等操作
最终返回的RDD的元素类型和原有类型保持一致
zhangsan的语文和数学成绩分别为98 78
lisi的语文和数学成绩分别为88 79
分别求zhangsan和lisi的总成绩
scala> val list=List(("zhangsan",98),("zhangsan",78),("lisi",88),("lisi",79))
scala> val rdd1=sc.parallelize(list)
scala> val rdd2=rdd1.reduceByKey((x,y)=>x+y)
// 可以简化为rdd1.reduceByKey(_+_)
scala> rdd2.collect
res5: Array[(String, Int)] = Array((zhangsan,176), (lisi,167))
行动算子
(1)reduce()
scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.reduce(_+_)
res2: Int = 5050
(2)count()
scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.count
res3: Long = 100
(3)countByKey()
scala> val rdd1 = sc.parallelize(List(("zhang",87),("zhang",79),("li",90)))
scala> rdd1.countByKey
res1: scala.collection.Map[String,Long] = Map(zhang -> 2, li -> 1)
(4)take(n)
返回集合中前5个元素
scala> val rdd1 = sc.parallelize(1 to 100)
scala> rdd1.take(5)
res4: Array[Int] = Array(1, 2, 3, 4, 5)
RDD的分区
RDD是一个大的数据集合
该集合被划分成多个子集合分布到了不同的节点上 ,每一个子集合就称为分区(Partition)
RDD各个分区中的数据可以并行计算 分区的数量决定了并行计算的粒度
Spark会给每一个分区分配一个单独的Task任务对其进行计算
因此并行Task的数量是由分区的数量决定的
RDD分区的一个原则是使得分区的数量尽量等于集群中CPU核心数量
RDD的依赖
对RDD的每一次转化操作都会生成一个新的RDD
RDD 延迟计算,新的RDD会依赖原有RDD,RDD之间存在类似流水线的前后依赖关系
这种依赖关系分为两种 窄依赖和宽依赖
窄依赖 指父RDD的一个分区最多被子RDD的一个分区所用
父RDD的分区与子RDD的分区的对应关系为一对一或多对一
例如map() filter() union()等操作会产生窄依赖
宽依赖
父RDD的一个分区被子RDD的多个分区所用
父RDD的分区与子RDD的分区的对应关系为多对多
例如groupByKey() reduceByKey() sortByKey()等操作会产生宽依赖
Stage划分
根据DAG将整个计算划分为多个阶段,每个阶段称为一个Stage
每个Stage由多个Task任务并行进行计算,每个Task任务作用在一个分区上,
一个Stage的总Task任务数量由Stage中最后一个RDD的分区个数决定
Stage的划分依据为是否有宽依赖,即是否有Shuffle
Spark调度器从DAG图的末端向前进行递归划分,遇到Shuffle则进行划分
RDD的持久化
RDD 延迟计算 遇到行动算子时才会从头计算所有RDD
当同一个RDD被多次使用时,每次都需要重新计算一遍
为避免重复计算同一个RDD,可以将RDD进行持久化
将某个RDD中的数据保存到内存或者磁盘中,
每次需要对这个RDD进行算子操作时
可以直接从内存或磁盘中取出该RDD的持久化数据,而不需要重新计算
例如有多个RDD,它们的依赖关系如图。若RDD3没有持久化保存,则每次对RDD3进行操作时都需要从textFile()开始计算,将文件数据转化为RDD1,再转化为RDD2,最终才得到RDD3。
可以在RDD上使用
persist() 或 cache()
检查点机制(Checkpoint)对RDD数据进行快照
将经常使用的RDD快照到指定的文件系统中,最好是共享文件系统,例如HDFS
当机器发生故障导致内存或磁盘中的RDD数据丢失时可以快速从快照中对指定的RDD进行恢复
cache() 或者 persist() 将数据存储于本地的内存或磁盘,当机器故障时无法进行数据恢复
检查点是将RDD数据存储于外部的共享文件系统(例如HDFS),共享文件系统的副本机制保证数据的可靠性
在Spark应用程序执行结束后,cache()或者persist()存储的数据将被清空
而检查点存储的数据不会受影响,将永久存在,除非手动将其移除
检查点数据可以被下一个Spark应用程序使用,而cache()或者persist()数据只能被当前Spark应用程序使用
val sc = new SparkContext(conf);
//设置检查点数据存储路径
sc.setCheckpointDir("hdfs://centos01:9000/spark-ck")
共享变量
Spark应用程序运行时,Spark算子中的函数会被发送到远程的多个Worker节点上执行
如果一个算子中使用了某个外部变量,则该变量会拷贝到Worker节点的每一个Task任务中,各个Task任务对变量的操作相互独立
当变量所存储的数据量非常大时(例如一个大型集合)将增加网络传输及内存的开销
Spark提供了两种共享变量 广播变量和累加器
广播变量是将一个变量通过广播的形式发送到每个Worker节点的缓存中
而不是发送到每个Task任务中,各个Task任务可以共享该变量的数据
广播变量是只读的
1、默认情况下变量的传递
例如,map()算子中使用了外部变量arr
val arr=Array(1,2,3,4,5);
val lines:RDD[String] = sc.textFile("D:\\test\\data.txt")
val result = lines.map(line =>
(line, arr)
)
使用广播变量时变量的传递
例如,使用广播变量将数组arr传递给了map()算子
val arr=Array(1,2,3,4,5);
val broadcastVar = sc.broadcast(arr)
val result = lines.map(line =>
(line, broadcastVar)
)
通过Broadcast对象的value方法访问广播变量的值
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
累加器
累加器提供了将Worker节点的值聚合到Driver的功能
例如,对一个整型数组进行求和,若不使用累加器,结果不正确
var sum=0 //在Driver中声明
val rdd=sc.makeRDD(Array(1,2,3,4,5))
rdd.foreach(x=>
//在Executor中执行
sum+=x
)
println(sum)//输出0
使用累加器对数组进行求和
//声明一个累加器,默认初始值为0(只能在Driver端定义)
val myacc=sc.longAccumulator("My Accumulator")
val rdd=sc.makeRDD(Array(1,2,3,4,5))
rdd.foreach(x=>
myacc.add(x)//向累加器中添加值
)
println(myacc.value)//输出15(只能在Driver端读取)
注意,累加器只能在Driver端定义,在Executor端更新
Executor端不能读取累加器的值,需要在Driver端使用value属性读取
上一篇
下一篇
编程式绘图工具mermaid
Spark大数据分析实战 第2章 初识Spark
Spark Standalone 的两种提交方式
Spark大数据分析实战 第4章 Spark内核源码分析
Spark大数据分析实战 第5章 Spark SQL结构化数据处理引擎
Spark大数据分析实战 第6章 Kafka分布式消息系统