Spark RDD persistence and caching
   
Call persist or cache to cache an RDD's computed results.
The data is only cached when an action is triggered.
cache simply calls the no-argument persist:
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
The default storage level is MEMORY_ONLY.
The RDD is only actually persisted/cached when an action executes on it.
MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
The 2 means two copies of the data, cached in memory on two different nodes of the cluster.
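A minimal sketch of requesting the replicated level (this only makes sense on a real cluster; on a single local node only one copy can actually be stored, see the warnings at the end of this post):

import org.apache.spark.storage.StorageLevel

val replicated = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
replicated.persist(StorageLevel.MEMORY_ONLY_2)   // ask for 2 in-memory copies on 2 executors
replicated.count                                 // replication only happens when the action runs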
Storage levels (from the Spark source, StorageLevel.scala):
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  // ... (constructor and methods omitted)
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
}
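The suffixes map onto these constructor flags: _SER means deserialized = false, _2 means replication = 2. As a sketch, an equivalent level can also be built through StorageLevel.apply and inspected via its description string (the same text the webui Storage tab displays):

import org.apache.spark.storage.StorageLevel

val level = StorageLevel(true, true, false, false, 2)   // disk, memory, no off-heap, serialized, 2 replicas
level.description                                       // e.g. Disk Memory Serialized 2x Replicated
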
Basic cache example:
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd.cache    // mark the RDD for caching at MEMORY_ONLY; nothing is stored yet
rdd.count    // the action materializes the RDD and fills the cache
The RDD is cached only after the action has executed.
You can check it on the webui Storage tab:
http://127.0.0.1:4040/storage/
ID	RDD Name	Storage Level	Cached Partitions	Fraction Cached	Size in Memory	Size on Disk
30	ParallelCollectionRDD	Memory Deserialized 1x Replicated	2	100%	128.0 B	0.0 B
cache keeps the data in memory.
If the data is too large, only part of it can be cached;
Fraction Cached shows the proportion that was actually cached.
Partitions that were not cached are recomputed when they are needed again.
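If recomputing the missing partitions is expensive, a common alternative (a minimal sketch with a made-up dataset) is MEMORY_AND_DISK, which spills the partitions that do not fit in memory to disk instead of dropping them:

import org.apache.spark.storage.StorageLevel

val bigRdd = sc.makeRDD(1 to 1000000, 8)
  .map(i => (i, i.toString * 10))              // some transformation worth not repeating
bigRdd.persist(StorageLevel.MEMORY_AND_DISK)   // partitions that don't fit in memory spill to disk
bigRdd.count                                   // the action fills the cache
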
Persisting to disk only:
val rdd2 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)   // store on disk only
rdd2.count                                                      // action triggers the caching
 
ID	RDD Name	Storage Level	Cached Partitions	Fraction Cached	Size in Memory	Size on Disk
30	ParallelCollectionRDD	Memory Deserialized 1x Replicated	2	100%	3.7 KiB	0.0 B
31	ParallelCollectionRDD	Disk Serialized 1x Replicated	2	100%	0.0 B	12.0 B
rdd  (MEMORY_ONLY) shows as  Memory Deserialized 1x Replicated
rdd2 (DISK_ONLY)   shows as  Disk Serialized 1x Replicated
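The Storage Level column text corresponds to the StorageLevel's description string, so the same values can be checked directly in the shell:

import org.apache.spark.storage.StorageLevel

StorageLevel.MEMORY_ONLY.description   // Memory Deserialized 1x Replicated
StorageLevel.DISK_ONLY.description     // Disk Serialized 1x Replicated
StorageLevel.DISK_ONLY_2.description   // Disk Serialized 2x Replicated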
Persisting to disk with 2 replicas:
val rdd3 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY_2)   // ask for 2 disk copies
rdd3.count                                                        // action triggers the caching
 
rdd3 (DISK_ONLY_2) shows as  Disk Serialized 2x Replicated, but on a local single-node setup the replication produces warnings:
WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN BlockManager: Block rdd_33_0 replicated to only 0 peer(s) instead of 1 peers
WARN BlockManager: Block rdd_33_1 replicated to only 0 peer(s) instead of 1 peers
There is only one node locally, so with DISK_ONLY_2 only a single copy can actually be stored.
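Besides the webui, the same storage information can be read from the driver; a minimal sketch using sc.getRDDStorageInfo (a DeveloperApi; field names are from RDDInfo):

sc.getRDDStorageInfo.foreach { info =>
  println(s"${info.id}  ${info.name}  ${info.storageLevel.description}  " +
    s"cached ${info.numCachedPartitions}/${info.numPartitions}  " +
    s"mem=${info.memSize} B  disk=${info.diskSize} B")
}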