Spark(13) -- DAG的生成以及Shuffle的过程

    科技2022-07-17  143

    1. 什么是DAG

     DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了 DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖, partition的转换处理在一个Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在 parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据

    DAG的边界 开始:通过SparkContext创建的RDD结束:触发Action,一旦触发Action就形成了一个完整的DAG 一个job和一个DAG有什么样的关系? 触发一个job形成一个DAG

    注意:

    一个Spark应用中可以有一到多个DAG,取决于触发了多少次Action一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖一个阶段/stage中可以有多个Task,一个分区对应一个Task

    2. DAG划分Stage

    为什么要划分Stage? --并行计算  一个复杂的业务逻辑如果有shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照shuffle进行划分(也就是按照宽依赖就行划分),就可以将一个DAG划分成多个Stage/阶段,在同一个Stage中,会有多个算子操作,可以形成一个pipeline流水线,流水线内的多个平行的分区可以并行执行。  Pipeline:HDFS----textRDD----splitRDD-----tupleRDD

    如何划分DAG的stage  对于窄依赖,partition的转换处理在stage中完成计算,不划分(将窄依赖尽量放在在同一个stage中,可以实现流水线计算)  对于宽依赖,由于有shuffle的存在,只能在父RDD处理完成后,才能开始接下来的计算, 也就是说需要要划分stage

    总结 Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中 具体的划分算法请参见AMP实验室发表的论文 《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》

    3. Shuffle的过程

    spark的所有配置项 Spark3.0.1-官网configuration说明

     在spark的stage划分当中,宽依赖之间会划分stage,而Stage之间就是Shuffle,如图中的stage0,stage1和stage3之间就会产生Shuffle  在Spark的中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。  在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可

    3.1 HashShuffle图解

    Shuffle阶段划分: shuffle write:mapper阶段,上一个stage得到最后的结果写出 shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并 1)未经优化的hashShuffleManager:  首先1个cpu里面建议设置2到3倍的task,根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,在将block文件进行合并。对相同的key执行hash算法,从而将相同的key都写入到一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。每个文件中只存储key取hash之后相同的数据,导致了当下游的task任务过多的时候,上游会堆积大量的小文件。  HashShuffle是根据task的计算结果的key值的hashcode%ReduceTask来决定放入哪一个区分,这样保证相同的数据一定放入一个分区,Hash Shuffle过程如下:  根据下游的task决定生成几个文件,先生成缓冲区文件在写入磁盘文件,再将block文件进行合并。  未经优化的shuffle write操作所产生的磁盘文件的数量是极其惊人的。提出如下解决方案 2)经过优化的hashShuffleManager:  在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,每一个Group磁盘文件的数量与下游stage的task数量是相同的。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。  设置配置consolidateFiles调整为true。(当然,默认是开启的)  开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件。 总结: 未经优化: 上游的task数量:m 下游的task数量:n 上游的executor数量:k (m>=k) 总共的磁盘文件:m*n

    优化之后的: 上游的task数量:m 下游的task数量:n 上游的executor数量:k (m>=k) 总共的磁盘文件:k*n

    4. SortShuffleManager基本介绍

     SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行 机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200M),就会启用bypass机制。

    4.1 SortShuffle的普通机制

     (1)该模式下,数据会先写入一个内存数据结构中(默认5M),此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。 接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 注意:  (2)shuffle中的定时器:定时器会检查内存数据结构的大小,如果内存数据结构空间不够,那么会申请额外的内存,申请的大小满足如下公式:  applyMemory=nowMenory * 2 - oldMemory  申请的内存=当前的数据内存情况 * 2 - 上一次的内存情况  意思就是说内存数据结构的大小的动态变化,如果存储的数据超出内存数据结构的大小,将申请内存数据结构存储的数据 * 2 - 内存数据结构的设定值的内存大小空间。申请到了,内存数据结构的大小变大,内存不够,申请不到,则发生溢写。

    (3)排序  在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序

    (4)溢写  排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

    (5)merge  一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为Reduce端的stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。  SortShuffleManager由于有一个磁盘文件merge的过程,因此大大减少了文件数量。比如第一个stage有50个task,总共有10个Executor,每个Executor执行5个task,而第二个stage有100个task。由于每个task最终只有一个磁盘文件,因此此时每个Executor上只有5个磁盘文件,所有Executor只有50个磁盘文件。

    4.2 Sort shuffle的bypass机制

    bypass运行机制的触发条件如下:

    1)shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。2)不是聚合类的shuffle算子(比如reduceByKey)。  此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。  该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。  而该机制与普通SortShuffleManager运行机制的不同在于:  第一,磁盘写机制不同;  第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

    总结: SortShuffle也分为普通机制和bypass机制普通机制在内存数据结构(默认为5M)完成排序,会产生2M个磁盘小文件。而当shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。或者算子不是聚合类的shuffle算子(比如reduceByKey)的时候会触发SortShuffle的bypass机制,SortShuffle的bypass机制不会进行排序,极大的提高了其性能。

    5. Shuffle的配置选项

    spark的shuffle配置选项 shuffle-behavior

    下面是不全截图:用到的时候查查吧,解释很详细 Shuffle阶段划分: shuffle write:mapper阶段,上一个stage得到最后的结果写出 shuffle read :reduce阶段,下一个stage拉取上一个stage进行合并

    (1)SortShuffleManager-普通机制: 1,在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。 2,接着,每写一条数据进入内存数据结构之后,就会判断是否达到了某个临界值,如果达到了临界值的话,就会尝试的将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。 3,在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批次1万条数据的形式分批写入磁盘文件,写入磁盘文件是通过Java的BufferedOutputStream实现的。首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。 4,此时task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写,会产生多个临时文件,最后会将之前所有的临时文件都进行合并,最后会合并成为一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件(标识了下游各个task的数据在文件中的start offset与end offset)。最终再由下游的task根据索引文件读取相应的数据文件。

    (2)SortShuffleManager-bypass机制: 此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。    bypass机制与普通SortShuffleManager运行机制的不同在于: 第一,磁盘写机制不同; 第二,不会进行排序。

    也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。    触发bypass机制的条件: shuffle map task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)或者不是聚合类的shuffle算子(比如groupByKey)

    下面列出Shuffle类不同分类算子 去重 def distinct() def distinct(numPartitions: Int) 聚合 def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])] def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)] def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 排序 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 重分区 def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null) 集合或者表操作 def intersection(other: RDD[T]): RDD[T] def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def intersection(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], numPartitions: Int): RDD[T] def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

    (4)spark 的shuffle调优: 主要是调整缓冲的大小,拉取次数重试重试次数与等待时间,内存比例分配,是否进行排序操作等等 spark.shuffle.file.buffer 参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.reducer.maxSizeInFlight : 参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。 调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。 spark.shuffle.io.maxRetries and spark.shuffle.io.retryWait: spark.shuffle.io.maxRetries :shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次) spark.shuffle.io.retryWait:该参数代表了每次重试拉取数据的等待间隔。(默认为5s) 调优建议:一般的调优都是将重试次数调高,不调整时间间隔。 spark.shuffle.memoryFraction: 参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例。 spark.shuffle.manager 参数说明:该参数用于设置shufflemanager的类型(默认为sort)。Spark1.5x以后有三个可选项: Hash:spark1.x版本的默认值,HashShuffleManager Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制

    spark.shuffle.sort.bypassMergeThreshold 参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。 调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些 spark.shuffle.consolidateFiles: 参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,也就是开启优化后的HashShuffleManager。 调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

    Processed: 0.011, SQL: 8