RDD持久化
RDD的数据是过程数据
- RDD之间进行相互迭代计算(Transformation的转换),当执行开启后,新RDD的生成,代表老RDD的消失。
- 这个特性可以最大化的利用资源,老旧RDD没用了 就从内存中清理,给后续的计算腾出内存空间.
RDD的数据是过程数据,只在处理的过程中存在,一旦处理完成,就不见了。RDD的持久化,就是将RDD的数据保存到内存中,以便后续使用。
RDD 的缓存(Persistence)是一种将 RDD 的计算结果保存在内存或者磁盘上的机制,以便在后续的计算中重复使用这些数据,从而提高程序的执行效率。
RDD 的缓存分为两种类型:内存缓存和磁盘缓存。
- 内存缓存:将 RDD 的数据保存在内存中,以便在后续的计算中重复使用这些数据。内存缓存可以提高程序的执行效率,因为内存的读写速度比磁盘快得多。但是,内存的容量有限,如果数据量过大,可能会导致内存不足,从而引发 OutOfMemoryError 异常。
- 磁盘缓存:将 RDD 的数据保存在磁盘上,以便在后续的计算中重复使用这些数据。磁盘缓存的读写速度比内存慢,但是磁盘的容量比内存大得多。如果数据量过大,内存缓存无法容纳,可以使用磁盘缓存。
RDD 的缓存可以通过 persist() 或 cache() 方法来启用。persist() 方法可以指定缓存的存储级别,包括 MEMORY_ONLY、MEMORY_AND_DISK、DISK_ONLY 等。cache() 方法是 persist() 方法的简化版,默认使用 MEMORY_ONLY 存储级别。
from pyspark import SparkContext,SparkConf,StorageLevel
conf = SparkConf().setAppName("coalesceRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
# 例如 RDD 如果要被多次使用,可以加入缓存进行优化
rdd.cache() #缓存到内存中
rdd.persist(StorageLevel.MEMORY_ONLY) #仅内存缓存
rdd.persist(StorageLevel.MEMORY_ONLY_2) # 仅内存缓存,2个副本
rdd.persist(StorageLevel.DISK_ONLY) # 仅缓存硬盘上
rdd.persist(StorageLevel.DISK_ONLY_2) # 仅缓存硬盘上,2个副本
rdd.persist(StorageLevel.DISK_ONLY_3) # 仅缓存硬盘上,3个副本
rdd.persist(StorageLevel.MEMORY_AND_DISK) # 先放内存,不够放硬盘
rdd.persist(StorageLevel.MEMORY_AND_DISK_2) # 先放内存,不够放硬盘,2个副本
rdd.persist(StorageLevel.OFF_HEAP) # 堆外内存(系统内存)
# 如上API,自行选择使用即可
# 一般建议使用rdd.persist(StorageLevel.MEMORY_AND_DISK)
# 如果内存比较小的集群,建议使用rdd3.persist(StorageLevel.DISK_ONLY)或者就别用缓存了 用CheckPoint
# 主动消理缓存的API
rdd.unpersist()
sc.stop()
RDD 的CheckPoint
缓存的特点:
- 缓存技术可以将过程RDD数据,持久化保存到内存或者硬盘上
- 但是,这个保存在设定上是认为不安全的,其保留RDD之间的血缘(依赖)关系
- 如果发生故障,可以重新计算,但是,如果数据量过大,重新计算时间过长,效率低下
- RDD是将自己分区的数据,每个分区自行将其数据保存在其所在的Executor内存和硬盘上
CheckPoint的特点:
- CheckPoint技术可以将过程RDD数据,持久化保存到磁盘上
- 并且切断血缘关系,这样,即使发生故障,也可以从CheckPoint恢复数据,效率更高
- CheckPoint的使用,需要配置检查点目录,目录必须是HDFS上的目录
- CheckPoint的使用,需要使用checkpoint()方法,在需要CheckPoint的RDD上调用
- CheckPoint存储RDD数据,是集中收集各个分区 数据进行存储,而缓存是 分散存储
注意:
- CheckPoint是一种重量级的使用,也就是RDD的重新计算成本很高的时候,我们采用checkPoint比较合适,或者数据量很大,用CheckPoint比较合适
- 如果数据量小,或者RDD重新计算是非常快的,用CheckPoint没啥必要,直接缓存即可
- Cache和CheckPoint两个API都不是Action类型。所以,想要它俩工作,必须在后面接上Action
- 接上Action的目的,是让RDD有数据,而不是为了让CheckPoint和Cache工作,
from pyspark import SparkContext,SparkConf,StorageLevel
conf = SparkConf().setAppName("coalesceRDD").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 设置CheckPoint第一件事情,选择CheckPoint的保存路径
#如果是Local模式,可以支持本地文件系统,如果在集群运行,千万要用HDFS
sc.setCheckpointDir("hdfs://ns1/user/admin/files/cache")
rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3), ('b', 4), ('a', 5), ('c', 2), ('c', 3)],3)
# 用的时候,直接调用checkpoint算子即可
rdd.checkpoint()
print(rdd.collect())
sc.stop()