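Spark RDD persistence and caching
Call persist or cache on an RDD to keep its computed result for reuse.
Caching is lazy: it takes effect only when an action is triggered.
cache simply calls the no-argument persist method.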

def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
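
As a quick check of the two definitions above, the following sketch compares the storage level an RDD reports before cache(), after cache(), and after a no-argument persist(). The object name CacheVsPersist, the helper levels, the local[2] master, and the app name are illustrative assumptions, not part of the original post.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; only the RDD and StorageLevel calls come from Spark.
object CacheVsPersist {
  // Returns (level before cache(), level after cache(), level after persist()).
  def levels(sc: SparkContext): (StorageLevel, StorageLevel, StorageLevel) = {
    val a = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    val before = a.getStorageLevel // no level assigned yet
    a.cache()                      // delegates to persist()
    val b = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    b.persist()                    // delegates to persist(StorageLevel.MEMORY_ONLY)
    (before, a.getStorageLevel, b.getStorageLevel)
  }

  def main(args: Array[String]): Unit = {
    // Assumed local setup for the sketch.
    val conf = new SparkConf().setMaster("local[2]").setAppName("cache-vs-persist")
    val sc = new SparkContext(conf)
    try {
      val (before, cached, persisted) = levels(sc)
      println(s"before=$before cached=$cached persisted=$persisted")
    } finally sc.stop()
  }
}
```

Both calls should report StorageLevel.MEMORY_ONLY, while the untouched RDD reports StorageLevel.NONE.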

The default storage level is MEMORY_ONLY.
Calling persist or cache only marks the RDD; the data is actually stored when an action executes.
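
The lazy behavior can be observed with an accumulator that counts how often the map function runs. This is a minimal sketch under stated assumptions: the object name LazyCacheDemo, the helper evaluations, and the accumulator name are hypothetical, and the counts assume a local run where every partition fits in memory and no task is retried.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object illustrating that cache() does no work by itself.
object LazyCacheDemo {
  // Returns the number of map-function calls observed
  // after cache(), after the first action, and after the second action.
  def evaluations(sc: SparkContext): (Long, Long, Long) = {
    val calls = sc.longAccumulator("map calls")
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2).map { x =>
      calls.add(1) // side effect used only to count evaluations
      x * 2
    }
    rdd.cache()
    val afterCache = calls.sum  // nothing has been computed yet
    rdd.count()                 // first action: computes and stores the partitions
    val afterFirst = calls.sum
    rdd.count()                 // second action: served from the cache
    val afterSecond = calls.sum
    (afterCache, afterFirst, afterSecond)
  }

  def main(args: Array[String]): Unit = {
    // Assumed local setup for the sketch.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("lazy-cache"))
    try println(evaluations(sc)) finally sc.stop()
  }
}
```

If the counter does not grow between the two actions, the second count was answered from the cached partitions rather than by recomputation.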

val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
The 2 is the replication factor: two copies of the data are kept, in the memory of two nodes in the cluster.
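
To make the role of the last constructor argument concrete, this sketch compares MEMORY_ONLY with MEMORY_ONLY_2 and builds an equivalent level through the public StorageLevel factory. The object name ReplicationDemo and its members are hypothetical; the factory is marked as a developer API in Spark, so treat its use here as an illustration rather than a recommendation.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; needs only spark-core on the classpath, no SparkContext.
object ReplicationDemo {
  val single: StorageLevel = StorageLevel.MEMORY_ONLY
  val replicated: StorageLevel = StorageLevel.MEMORY_ONLY_2

  // True when the two levels agree on every flag except the replication factor.
  def differOnlyInReplication: Boolean =
    single.useDisk == replicated.useDisk &&
      single.useMemory == replicated.useMemory &&
      single.useOffHeap == replicated.useOffHeap &&
      single.deserialized == replicated.deserialized &&
      single.replication != replicated.replication

  // Same arguments as the MEMORY_ONLY_2 definition:
  // (useDisk, useMemory, useOffHeap, deserialized, replication)
  val custom: StorageLevel = StorageLevel(false, true, false, true, 2)

  def main(args: Array[String]): Unit = {
    println(s"replication: ${single.replication} vs ${replicated.replication}")
    println(s"differ only in replication: $differOnlyInReplication")
    println(s"custom equals MEMORY_ONLY_2: ${custom == replicated}")
  }
}
```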

Storage levels

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
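
The constructor flags listed above can be read back from each predefined level, together with the human-readable description that Spark derives from them. The sketch below is only an illustration: the object name StorageLevelTable, the names list, and the row layout are assumptions, while fromString, the flag accessors, and description are existing StorageLevel members.

```scala
import org.apache.spark.storage.StorageLevel

// Hypothetical helper that prints one row per storage level.
object StorageLevelTable {
  // A subset of the level names from the listing above.
  val names: Seq[String] = Seq(
    "DISK_ONLY", "MEMORY_ONLY", "MEMORY_ONLY_SER",
    "MEMORY_AND_DISK", "MEMORY_AND_DISK_SER_2", "OFF_HEAP")

  // Resolves each name to its StorageLevel and formats its flags.
  def rows: Seq[String] = names.map { name =>
    val l = StorageLevel.fromString(name)
    s"$name: disk=${l.useDisk} memory=${l.useMemory} offHeap=${l.useOffHeap} " +
      s"deserialized=${l.deserialized} replication=${l.replication} -> ${l.description}"
  }

  def main(args: Array[String]): Unit = rows.foreach(println)
}
```

The description strings are the same labels that appear in the Storage Level column of the web UI.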



val rdd = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd.cache
rdd.count

The data is cached only after the action has run. The result can be checked on the Storage tab of the web UI:
http://127.0.0.1:4040/storage/

ID  RDD Name               Storage Level                      Cached Partitions  Fraction Cached  Size in Memory  Size on Disk
30  ParallelCollectionRDD  Memory Deserialized 1x Replicated  2                  100%             128.0 B         0.0 B

cache keeps the data in memory. When the data is too large, only part of it can be cached.
Fraction Cached is the proportion of the RDD that is cached. Partitions that are not cached are recomputed when needed.

val rdd2 = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

ID  RDD Name               Storage Level                      Cached Partitions  Fraction Cached  Size in Memory  Size on Disk
30  ParallelCollectionRDD  Memory Deserialized 1x Replicated  2                  100%             3.7 KiB         0.0 B
31  ParallelCollectionRDD  Disk Serialized 1x Replicated      2                  100%             0.0 B           12.0 B

The two storage levels are displayed as:
Memory Deserialized 1x Replicated
Disk Serialized 1x Replicated

val rdd3 = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY_2)

The storage level is displayed as:
Disk Serialized 2x Replicated

WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN BlockManager: Block rdd_33_0 replicated to only 0 peer(s) instead of 1 peers
WARN BlockManager: Block rdd_33_1 replicated to only 0 peer(s) instead of 1 peers

A local run has only one node, so DISK_ONLY_2 can store only one copy.
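
Part of what the Storage tab shows (ID, RDD type, storage level, partition count) can also be listed from code through SparkContext.getPersistentRDDs. This is a rough sketch, not a replacement for the web UI: the object name PersistentRddReport, the helper report, and the row format are hypothetical, and cached sizes are not available through this call. On a single local node the DISK_ONLY_2 RDD is expected to log the same replication warnings as above while still reporting its requested level.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical helper that lists every RDD currently marked as persistent.
object PersistentRddReport {
  def report(sc: SparkContext): Seq[String] = {
    val mem = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2).cache()
    val disk = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2).persist(StorageLevel.DISK_ONLY)
    val disk2 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2).persist(StorageLevel.DISK_ONLY_2)
    // Actions are what actually materialize the persisted data.
    Seq(mem, disk, disk2).foreach(_.count())

    // getPersistentRDDs maps RDD id -> RDD for everything marked via persist/cache.
    sc.getPersistentRDDs.toSeq.sortBy(_._1).map { case (id, rdd) =>
      s"$id ${rdd.getClass.getSimpleName} ${rdd.getStorageLevel.description} " +
        s"partitions=${rdd.getNumPartitions}"
    }
  }

  def main(args: Array[String]): Unit = {
    // Assumed local setup for the sketch.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("persistent-rdds"))
    try report(sc).foreach(println) finally sc.stop()
  }
}
```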
