RDD和它依赖的父RDD的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。 注意:join操作比较特殊,可能同时存在宽、窄依赖
阅读源码:
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用。 总结:窄依赖我们形象的比喻为独生子女
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition。 总结:宽依赖我们形象的比喻为超生
如何区分宽窄依赖
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)子RDD的一个分区依赖多个父RDD是宽依赖还是窄依赖?
不能确定,也就是宽窄依赖的划分依据是父RDD的一个分区是否被子RDD的多个分区所依赖, 如果是,就是宽依赖,或者从shuffle的角度去判断,有shuffle就是宽依赖
RDD只支持粗粒度转换,即只记录单个块上执行的单个操作。将创建RDD的一系列 Lineage(即血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
引入缓存
一般的算子如果计算代价不大的化重新的计算也不会影响的计算速度但是如果一些比较昂贵的算子如果能够将其计算结果缓存起来,后面再次使用直接从内存或者是磁盘中获取已经保存的数据,可以节省计算时间为什么需要有缓存?--容错机制
1-缓存可以帮助我们对一些较为昂贵的算子保存在内存或磁盘2-避免重复的计算提出问题:
问题1:当在计算 RDD3 的时候如果出错了, 会怎么进行容错?问题2:会再次计算 RDD1 和 RDD2 的整个链条, 假设 RDD1 和 RDD2 是通过比较 昂贵的操作得来的, 有没有什么办法减少这种开销?上述两个问题的解决方案其实都是缓存, 除此之外, 使用缓存的理由还有很多, 但是总结一句, 就是缓存能够帮助开发者在进行一些昂贵操作后, 将其结果保存下来, 以便下次使用无需再次执行, 缓存能够显著的提升性能. 所以, 缓存适合在一个 RDD 需要重复多次利用, 并且还不是特别大的情况下使用, 例如迭代计算等场景. 因此, Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或者缓存数据集。当持久化某个RDD后,每一个节点都将把计算分区结果保存在内存中,对此RDD或衍生出 的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓 存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的 关键。
RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。 其实如何缓存是一个技术活, 有很多细节需要思考, 如下
是否使用磁盘缓存?是否使用内存缓存?是否使用堆外内存?缓存前是否先序列化?是否需要有副本?如果要回答这些信息的话, 可以先查看一下 RDD 的缓存级别对象 看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码: 查看官网:rdd-persistence介绍 查看其构造函数: 可以看到StorageLevel类的主构造器包含了5个参数:
useDisk:使用硬盘(外存)useMemory:使用内存useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象。序列化方式存储对象可以节省磁盘或内存的空间,一般序列化:反序列化=1:3replication:备份数(在多个节点上备份)理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。 valMEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,false,2) 就表示使用 这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节 点上备份2份(正常的RDD只有一份) 另外还注意到有一种特殊的缓存级别:
val OFF_HEAP = new StorageLevel(false, false, true, false) 使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。
如何选择存储级别: 选择哪个存储级别?- 官网解释 Spark的存储级别旨在在内存使用量和CPU效率之间提供不同的权衡。我们建议通过以下过程选择一个:
如果您的RDD与默认的存储级别(MEMORY_ONLY)相称,请保持这种状态。这是CPU效率最高的选项,允许RDD上的操作尽可能快地运行。如果不是,请尝试使用MEMORY_ONLY_SER并选择一个快速的序列化库,以使对象的空间效率更高,但访问速度仍然相当快。(Java和Scala)除非用于计算数据集的函数非常昂贵,否则它们会泄漏到磁盘上,否则它们会过滤大量数据。否则,重新计算分区可能与从磁盘读取分区一样快。如果要快速恢复故障,请使用复制的存储级别(例如,如果使用Spark来处理来自Web应用程序的请求)。所有存储级别都通过重新计算丢失的数据来提供完全的容错能力,但是复制的存储级别使您可以继续在RDD上运行任务,而不必等待重新计算丢失的分区。运行结果时间比对:这里程序比较小,不太明显
运行不加缓存 加cache缓存分析:
缓存的作用?
是RDD的一个重要的容错方法缓存会将我们的RDD的计算的结果缓存在磁盘或内存中如果一个RDD分区失效了Spark程序会直接拿到当前的内存或磁盘中的当前RDD的缓存进行恢复问题:
(Question:)RDD的persist或cache是将RDD的整个chain(lineage)都进行保存还是说保存上一个RDD的数据信息?如果仅保存当前的rdd的结果,试问大家如何进行恢复? 答案是否定的,因为在spark中重要机制就是由依赖关系构成血统lineage,如果不保存所有的依赖关系链chain的每个rdd的转换操作,无法达到容错特性,所以这里就是将rdd3之前的所有的操作都保存起来了。上面也能体现出来了,他们的区别cache其实也是调用了persist方法; 说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:
在Spark Core中对RDD做checkpoint,Checkpoint主要作用就是切断做checkpoint RDD的依赖关系,将RDD 数据保存到可靠存储(如支持分布式存储和副本机制的HDFS)以便数据恢复;
RDD 数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是 也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据 放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
通过上述,为什么需要有CheckPoint?
在上述的RDD的Cache或Persist过程中会发现无论将RDD的chain放在内存或磁盘中都是会丢失当前的计算结果,这时候引入希望能够在分布式的环境中引入非易失的介质中,这里最熟悉也是就是HDFS中,接下来的Checkpoint就是将rdd的chain放在了HDFS中当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain(链)如果后面需要反复使用某些中间结果RDD,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,使用checkpoint首先需要调用sparkContext的setCheckpointDir方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后在RDD所处的job运行结束后,会启动一个单独的job来将checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。 persist或者cache与checkpoint的区别在于,前者持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是后者checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。persist或者cache持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。
案例2:
import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * DESC: */ object rddPersist { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setAppName("rddPersist").setMaster("local[*]") val sc = new SparkContext(conf) sc.setCheckpointDir("./datasets/checkpoint1/") val rdd: RDD[Int] = sc.parallelize(1 to 10) //增加使用cache或persist进行缓存 rdd.cache() rdd.persist(StorageLevel.MEMORY_AND_DISK_SER_2) rdd.checkpoint() //可以直接对当前的rdd执行转换 val rdd2: RDD[Int] = rdd.map(_ * 10) rdd2.collect().foreach(println(_)) } }分析: Spark如何实现容错机制(面试常态题目)
Spark首先会在cache或persist中读取数据,如果没有读取到数据再次从Checpoint中读取数据,如果在HDFS中没有读取到Checkponit重新执行RDD的chain的计算本地执行结果: