Spark RDD persistence and caching
Call persist or cache on an RDD to cache its computed results.
The data is cached only when an action is triggered.
cache simply delegates to the no-argument persist:
def cache(): this.type = persist()
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
So the default storage level is MEMORY_ONLY.
The RDD is actually persisted/cached only when an action is executed.
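A minimal sketch of this lazy behaviour in the spark-shell (getPersistentRDDs and getRDDStorageInfo are existing SparkContext methods; the latter is a developer API): cache only marks the RDD, and nothing is stored until the action runs.

val nums = sc.makeRDD(1 to 10, 2)
nums.cache()                           // only marks the RDD with MEMORY_ONLY
println(sc.getPersistentRDDs.size)     // 1 -- marked for persistence
println(sc.getRDDStorageInfo.length)   // 0 -- nothing materialized yet
nums.count()                           // the action computes and stores the blocks
println(sc.getRDDStorageInfo.length)   // 1 -- now visible under Storage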
MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
The trailing 2 means two copies of the data, cached in memory on two different nodes of the cluster (the constructor flags are listed below).
Storage levels (from the Spark source):
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
  // ...
}

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val DISK_ONLY_3 = new StorageLevel(true, false, false, false, 3)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
  // ...
}
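The constructor arguments map directly onto the named levels: disk, memory, off-heap, deserialized, and the replication count. A quick sketch in the spark-shell to confirm what each flag means (useDisk, useMemory, useOffHeap, deserialized and replication are the public accessors on StorageLevel):

import org.apache.spark.storage.StorageLevel

val lvl = StorageLevel.MEMORY_ONLY_2
println(lvl.useDisk)        // false -- never written to disk
println(lvl.useMemory)      // true
println(lvl.deserialized)   // true  -- stored as deserialized Java objects
println(lvl.replication)    // 2     -- two copies on two executors
println(StorageLevel.MEMORY_AND_DISK_SER.deserialized)  // false -- stored as serialized bytes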
val rdd = sc.makeRDD(List(1,2,3,4,5,6), 2)
rdd.cache()   // mark for caching with MEMORY_ONLY
rdd.count()   // the action triggers the actual caching
Only after the action has run is the data cached.
You can check it on the Web UI Storage tab:
http://127.0.0.1:4040/storage/
ID | RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size on Disk
30 | ParallelCollectionRDD | Memory Deserialized 1x Replicated | 2 | 100% | 128.0 B | 0.0 B
cache keeps the data in memory.
When the dataset is too large to fit, only part of it can be cached;
Fraction Cached shows the proportion that was actually cached.
Partitions that were not cached are recomputed on the next action.
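A sketch of that recomputation, assuming a spark-shell session (the accumulator counts how many times elements are actually computed; accumulator updates from recomputed tasks are not deduplicated, which is exactly what makes the recomputation visible):

val computeCount = sc.longAccumulator("computeCount")
val big = sc.makeRDD(1 to 1000000, 4).map { x => computeCount.add(1); x * 2 }
big.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY)

big.count()                      // first action: every element is computed
println(computeCount.value)      // 1000000
big.count()                      // second action: cached partitions are read from memory
println(computeCount.value)      // stays at 1000000 if fully cached; grows by the size
                                 // of any partitions that did not fit and were recomputed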
val rdd2 = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
ID | RDD Name | Storage Level | Cached Partitions | Fraction Cached | Size in Memory | Size on Disk
30 | ParallelCollectionRDD | Memory Deserialized 1x Replicated | 2 | 100% | 3.7 KiB | 0.0 B
31 | ParallelCollectionRDD | Disk Serialized 1x Replicated | 2 | 100% | 0.0 B | 12.0 B
The Storage Level column reads Memory Deserialized 1x Replicated for the cached RDD and Disk Serialized 1x Replicated for the DISK_ONLY one.
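Those strings come from the storage level of each RDD; a quick sketch to print them without the UI (getStorageLevel and StorageLevel.description are existing API, though the exact wording may vary between Spark versions):

println(rdd.getStorageLevel.description)    // Memory Deserialized 1x Replicated
println(rdd2.getStorageLevel.description)   // Disk Serialized 1x Replicated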
val rdd3 = sc.makeRDD(List(1,2,3,4,5,6),2)
rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY_2)
The requested level shows up as Disk Serialized 2x Replicated, but the shell logs warnings:
WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
WARN BlockManager: Block rdd_33_0 replicated to only 0 peer(s) instead of 1 peers
WARN BlockManager: Block rdd_33_1 replicated to only 0 peer(s) instead of 1 peers
There is only one node locally, so DISK_ONLY_2 can keep just a single copy.
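A rough sanity check before relying on a replicated level, as a sketch (getExecutorMemoryStatus is an existing SparkContext method; in local mode it usually reports only the driver, which is why the "replicated to only 0 peer(s)" warnings above appear):

val blockManagers = sc.getExecutorMemoryStatus.size
println(s"block managers: $blockManagers")
if (blockManagers < 2)
  println("DISK_ONLY_2 / MEMORY_ONLY_2 will keep only one copy here")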