7.4 Sorted Based Shuffle
在历史的发展中,为什么Spark最终还是放弃了HashShuffle,使用了Sorted-Based Shuffle,而且作为后起之秀的Tungsten-based Shuffle到底是在什么样的背景下产生的。Tungsten-Sort Shuffle已经并入了Sorted-Based Shuffle,Spark的引擎会自动识别程序需要的是Sorted-Based Shuffle,还是Tungsten-Sort Shuffle,Spark会检查相对的应用程序有没有Aggregrate的操作。Sorted-Based Shuffle也有缺点,其缺点反而是它排序的特性,它强制要求数据在Mapper端必须先进行排序(注意,这里没有说对计算结果进行排序),所以导致它排序的速度有点慢。而Tungsten-Sort Shuffle对它的排序算法进行了改进,优化了排序的速度。
Spark会根据宽依赖把它一系列的算子划分成不同的Stage,Stage的内部会进行Pipeline、Stage与Stage之间进行Shuffle。Shuffle的过程包含三部分,如图7-6所示。
图7-6 Shuffle的过程示意图
第一部分是Shuffle的Writer;第二部分是网络传输;第三部分是Shuffle的Read,这三大部分设置了内存操作、磁盘I/O、网络I/O以及JVM的管理。而这些东西是影响了Spark应用程序95%以上效率的唯一原因。假设程序代码本身非常好,性能的95%都消耗在Shuffle阶段的本地写磁盘文件、网络传输数据以及抓取数据这样的生命周期中,如图7-7所示。
图7-7 Shuffle示意图
在Shuffle写数据的时候,内存中有一个缓存区叫Buffer,可以将其想像成一个Map,同时在本地磁盘有对应的本地文件。如果本地磁盘有文件,在内存中肯定也需要有对应的管理句柄。也就是说,单从ShuffleWriter内存占用的角度讲,已经有一部分内存空间用在存储Buffer数据,另一部分内存空间是用来管理文件句柄的,回顾HashShuffle所产生小文件的个数是Mapper分片数量×Reducer分片数量(M×R)。例如,Mapper端有1000个数据分片,Reducer端也有1000个数据分片,在HashShuffle的机制下,它在本地内存空间中会产生1000 ×1000=1000000个小文件,结果可想而知,这么多的I/O,这么多的内存消耗、这么容易产生OOM,以及这么沉重的CG负担。再说,如果Reducer端去读取Mapper端的数据时,Mapper端有这么多的小文件,要打开很多网络通道去读数据,打开1000000端口不是一件很轻松的事。这会导致一个非常经典的错误:Reducer端下一个Stage通过Driver去抓取上一个Stage属于它自己的数据的时候,说文件找不到。其实,这个时候不是真的在磁盘上找不到文件,而是程序不响应,因为它在进行垃圾回收(GC)操作。
Spark最根本要优化和迫切要解决的问题是:减少Mapper端ShuffleWriter产生的文件数量,这样便可以让Spark从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模(一个Task背后可能是一个Core去运行,也可能是多个Core去运行,但默认情况下是用一个Core去运行一个Task)。
减少Mapper端的小文件带来的好处是:
(1)Mapper端的内存占用变少了。
(2)Spark不仅仅可以处理小规模的数据,即使处理大规模的数据,也不会很容易达到性能瓶颈。
(3)Reducer端抓取数据的次数变少了。
(4)网络通道的句柄变少了。
(5)不仅仅减少了数据级别内存的消耗,更极大减少了Spark框架运行时必须消耗Reducer的内容。
7.4.1 概述
Sorted-Based Shuffle的出现,最显著的优势是把Spark从只能处理中小规模数据的平台,变成可以处理无限大规模数据的平台。集群规模意味着Spark处理数据的规模,也意味着Spark的运算能力。
Sorted-Based Shuffle不会为每个Reducer中的Task生产一个单独的文件,相反,Sorted-Based Shuffle会把Mapper中每个ShuffleMapTask所有的输出数据Data只写到一个文件中,因为每个ShuffleMapTask中的数据会被分类,所以Sort-based Shuffle使用了index文件,存储具体ShuffleMapTask输出数据在同一个Data文件中是如何分类的信息。基于Sort-based Shuffle会在Mapper中的每个ShuffleMapTask中产生两个文件(并发度的个数×2),如图7-8所示。
图7-8 Sorted-Based Shuffle示意图
图7-8会产生一个Data文件和一个Index文件。其中,Data文件是存储当前Task的Shuffle输出的,而Index文件则存储了Data文件中的数据通过Partitioner的分类信息,此时下一个阶段的Stage中的Task就是根据这个Index文件获取自己所需要抓取的上一个Stage中ShuffleMapTask所产生的数据。
假设现在Mapper端有1000个数据分片,Reducer端也有1000个数据分片,它的并发度是100,使用Sorted-Based Shuffle会产生多少个Mapper端的小文件,答案是100×2 = 200个。它的MapTask会独自运行,每个MapTask在运行时写两个文件,运行成功后就不需要这个MapTask的文件句柄,无论是文件本身的句柄,还是索引的句柄,都不需要,所以如果它的并发度是100个Core,每次运行100个任务,它最终只会占用200个文件句柄,这与HashShuffle的机制不一样,HashShuffle最差的情况是Hashed句柄存储在内存中。
图7-9中,Sorted-Based Shuffle主要在Mapper阶段,这个跟Reducer端没有任何关系,在Mapper阶段,Sorted-Based Shuffle要进行排序,可以认为是二次排序,它的原理是有两个Key进行排序,第一个是PartitionId进行排序,第二个是本身数据的Key进行排序。它会把PartitionId分成3个,索引分别为0、1、2,这个在Mapper端进行排序的过程其实是让Reducer去抓取数据的时候变得更高效。例如,第一个Reducer,它会到Mapper端的索引为0的数据分片中抓取数据。具体而言,Reducer首先找Driver去获取父Stage中每个ShuffleMapTask输出的位置信息,根据位置信息获取Index文件,解析Index文件,从解析的Index文件中获取Data文件中属于自己的那部分内容。
图7-9 Sorted-Based Shuffle流程图
一个Mapper任务除了有一个数据文件外,它也会有一个索引文件,Map Task把数据写到文件磁盘的顺序是根据自身的Key写进去的,同时也是按照Partition写进去的,因为它是顺序写数据,记录每个Partition的大小。
Sort-Based Shuffle的弱点如下。
(1)如果Mapper中Task的数量过大,依旧会产生很多小文件,此时在Shuffle传数据的过程中到Reducer端,Reducer会需要同时大量地记录进行反序列化,导致大量内存消耗和GC负担巨大,造成系统缓慢,甚至崩溃!
(2)强制了在Mapper端必须要排序,这里的前提是数据本身不需要排序。
(3)如果在分片内也需要进行排序,此时需要进行Mapper端和Reducer端的两次排序。
(4)它要基于记录本身进行排序,这就是Sort-Based Shuffle最致命的性能消耗。
7.4.2 Sorted Based Shuffle内核
Sorted-Based Shuffle的核心是借助于ExternalSorter把每个ShuffleMapTask的输出排序到一个文件中(FileSegmentGroup),为了区分下一个阶段Reducer Task不同的内容,它还需要有一个索引文件(Index)来告诉下游Stage的并行任务,那一部分是属于下游Stage的,如图7-10所示。
图7-10 Sorted-Based Shuffle的核心示意图
图7-10中,在Reducer端有4个Reducer Task,它会产生一组File Group和一个索引文件,File Group里的FileSegement会进行排序,下游的Task很容易根据索引(index)定位到这个File中的那一部分。FileSegement是属于下游的,相当于一个指针,下游的Task要向Driver去确定文件在哪里,然后到这个File文件所在的地方,实际上会与BlockManager进行沟通,BlockManager首先会读一个Index文件,根据它的命名规则进行解析。例如,下一个阶段的第一个Task,一般就是抓取第一个Segment,这是一个指针定位的过程。
Sort-Based Shuffle最大的意义是减少临时文件的输出数量,且只会产生两个文件:一个是包含不同内容,划分成不同FileSegment构成的单一文件File;另外一个是索引文件Index。图7-10中,Sort-Based Shuffle展示了一个Sort and Spill的过程(它是Spill到磁盘的时候再进行排序的)。
7.4.3 Sorted Based Shuffle数据读写的源码解析
Sorted Based Shuffle,即基于Sorted的Shuffle实现机制,在该Shuffle过程中,Sorted体现在输出的数据会根据目标的分区Id(即带Shuffle过程的目标RDD中各个分区的Id值)进行排序,然后写入一个单独的Map端输出文件中。相应地,各个分区内部的数据并不会再根据Key值进行排序,除非调用带排序目的的方法,在方法中指定Key值的Ordering实例,才会在分区内部根据该Ordering实例对数据进行排序。当Map端的输出数据超过内存容纳大小时,会将各个排序结果Spill到磁盘上,最终再将这些Spill的文件合并到一个最终的文件中。在Spark的各种计算算子中到处体现了一种惰性的理念,在此也类似,在需要提升性能时,引入根据分区Id排序的设计,同时仅在指定分区内部排序的情况下,才会全局去排序。而Hadoop的MapReduce相比之下带有一定的学术气息,中规中矩,严格设计Shuffle阶段中的各个步骤。
基于Hash的Shuffle实现,ShuffleManager的具体实现子类为HashShuffleManager,对应的具体实现机制如7-11所示。
图7-11 基于Sorted的Shuffle实现机制的框架类图
在图7-11中,各个不同的ShuffleHandle与不同的具体Shuffle写入器实现子类是一一对应的,可以认为是通过注册时生成的不同ShuffleHandle设置不同的Shuffle写入器实现子类。
从ShuffleManager注册的配置属性与具体实现子类的映射关系,即前面提及的在SparkEnv中实例化的代码,可以看出sort与tungsten-sort对应的具体实现子类都是org.apache.spark.shuffle.sort.SortShuffleManager。也就是当前基于Sort的Shuffle实现机制与使用Tungsten项目的Shuffle实现机制都是通过SortShuffleManager类来提供接口,两种实现机制的区别在于,该类中使用了不同的Shuffle数据写入器。
SortShuffleManager根据内部采用的不同实现细节,对应有两种不同的构建Map端文件输出的写方式,分别为序列化排序模式与反序列化排序模式。
(1)序列化排序(Serialized sorting)模式:这种方式对应了新引入的基于Tungsten项目的方式。
(2)反序列化排序(Deserialized sorting)模式:这种方式对应除了前面这种方式之外的其他方式。
基于Sort的Shuffle实现机制采用的是反序列化排序模式。下面分析该实现机制下的数据写入器的实现细节。
基于Sort的Shuffle实现机制,具体的写入器的选择与注册得到的ShuffleHandle类型有关,参考SortShuffleManager类的registerShuffle方法,相关代码如下所示。
Spark 2.1.1版本的SortShuffleManager.scala的源码如下。
1. override def registerShuffle[K, V, C]( 2. shuffleId: Int, 3. numMaps: Int, 4. dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { 5. //通过shouldBypassMergeSort方法判断是否满足回退到Hash风格的Shuffle条件 6. if (SortShuffleWriter.shouldBypassMergeSort(SparkEnv.get.conf, dependency)) { 7. //如果当前的分区个数小于设置的配置属性: //spark.shuffle.sort.bypassMergeThreshold,同时不需要在Map对数据进行聚合, //此时可以直接写文件,并在最后将文件合并 8. 9. new BypassMergeSortShuffleHandle[K, V]( 10. shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 11. } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) { 12. //否则,试图Map输出缓冲区的序列化形式,因为这杨更高效 13. new SerializedShuffleHandle[K, V]( 14. shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]]) 15. } else { 16. //否则,缓冲在反序列化形式Map输出 17. new BaseShuffleHandle(shuffleId, numMaps, dependency) 18. } 19. }
Spark 2.2.0版本的SortShuffleManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第6行shouldBypassMergeSort方法的第一个传入参数SparkEnv.get.conf微调为conf。
1. ...... 2. if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) { 3. ......
Sorted Based Shuffle写数据的源码解析如下。
基于Sort的Shuffle实现机制中相关的ShuffleHandle包含BypassMergeSortShuffleHandle与BaseShuffleHandle。对应这两种ShuffleHandle及其相关的Shuffle数据写入器类型的相关代码可以参考SortShuffleManager类的getWriter方法,关键代码如下所示。
SortShuffleManager的getWriter的源码如下。
1. override def getWriter[K, V]( 2. handle: ShuffleHandle, 3. mapId: Int, 4. context: TaskContext): ShuffleWriter[K, V] = { 5. numMapsForShuffle.putIfAbsent( 6. handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps) 7. val env = SparkEnv.get 8. //通过对ShuffleHandle类型的模式匹配,构建具体的数据写入器 9. handle match { 10. case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] => 11. new UnsafeShuffleWriter( 12. env.blockManager, 13. shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 14. context.taskMemoryManager(), 15. unsafeShuffleHandle, 16. mapId, 17. context, 18. env.conf) 19. case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] => 20. new BypassMergeSortShuffleWriter( 21. env.blockManager, 22. shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver], 23. bypassMergeSortHandle, 24. mapId, 25. context, 26. env.conf) 27. case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => 28. new SortShuffleWriter(shuffleBlockResolver, other, mapId, context) 29. } 30. }
在对应构建的两种数据写入器类BypassMergeSortShuffleWriter与SortShuffleWriter中,都是通过变量shuffleBlockResolver对逻辑数据块与物理数据块的映射进行解析,而该变量使用的是与基于Hash的Shuffle实现机制不同的解析类,即当前使用的IndexShuffleBlockResolver。
下面开始解析这两种写数据块方式的源码实现。
1.BypassMergeSortShuffleWriter写数据的源码解析
该类实现了带Hash风格的基于Sort的Shuffle机制,为每个Reduce端的任务构建一个输出文件,将输入的每条记录分别写入各自对应的文件中,并在最后将这些基于各个分区的文件合并成一个输出文件。
在Reducer端任务数比较少的情况下,基于Hash的Shuffle实现机制明显比基于Sort的Shuffle实现机制要快,因此基于Sort的Shuffle实现机制提供了一个fallback方案,对于Reducer端任务数少于配置属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,使用带Hash风格的fallback计划,由BypassMergeSortShuffleWriter具体实现。
使用该写入器的条件如下所示:
(1)不能指定Ordering,从前面数据读取器的解析可以知道,当指定Ordering时,会对分区内部的数据进行排序。因此,对应的BypassMergeSortShuffleWriter写入器避免了排序开销。
(2)不能指定Aggregator。
(3)分区个数小于spark.shuffle.sort.bypassMergeThreshold配置属性指定的个数。
和其他ShuffleWriter的具体子类一样,BypassMergeSortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下所示。
BypassMergeSortShuffleWriter.scala的write的源码如下。
1. public void write(Iterator<Product2<K, V>> records) throws IOException { 2. //为每个Reduce端的分区打开的DiskBlockObjectWriter存放于partitionWriters, //需要根据具体Reduce端的分区个数进行构建 3. 4. 5. assert (partitionWriters == null); 6. if (!records.hasNext()) { 7. partitionLengths = new long[numPartitions]; 8. //初始化索引文件的内容,此时对应各个分区的数据量或偏移量需要在后续获取分区的真实 //数据量时重写 9. 10. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null); 11. //下面代码的调用形式是对应在Java类中调用Scala提供的object中的apply方法 //的形式,是由编译器编译Scala中的object得到的结果来决定的 12 13. 14. mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); 15. return; 16. } 17. final SerializerInstance serInstance = serializer.newInstance(); 18. final long openStartTime = System.nanoTime(); 19. //对应每个分区各配置一个磁盘写入器DiskBlockObjectWriter 20. partitionWriters = new DiskBlockObjectWriter[numPartitions]; 21. partitionWriterSegments = new FileSegment[numPartitions]; 22. //注意,在该写入方式下,会同时打开numPartitions个DiskBlockObjectWriter, //因此对应的分区数不应设置过大,避免带来过大的内存开销目前对应 DiskBlock- //ObjectWriter的缓存大小默认配置为32KB,比早先的100KB降低了很多,但也说明 //不适合同时打开太多的DiskBlockObjectWriter实例 23. for (int i = 0; i < numPartitions; i++) { 24. final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile = 25. blockManager.diskBlockManager().createTempShuffleBlock(); 26. final File file = tempShuffleBlockIdPlusFile._2(); 27. final BlockId blockId = tempShuffleBlockIdPlusFile._1(); 28. partitionWriters[i] = 29. blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); 30. } 31. 32. //创建文件写入和创建磁盘写入器都涉及与磁盘的交互,当打开许多文件时,磁盘写会花费 //很长时间,所以磁盘写入时间应包含在Shuffle写入时间内 33. 34. writeMetrics.incWriteTime(System.nanoTime() - openStartTime); 35. //读取每条记录,并根据分区器将该记录交由分区对应的DiskBlockObjectWriter, //写入各自对应的临时文件中 36. 37. while (records.hasNext()) { 38. final Product2<K, V> record = records.next(); 39. final K key = record._1(); 40. partitionWriters[partitioner.getPartition(key)].write(key, record._2()); 41. } 42. 43. for (int i = 0; i < numPartitions; i++) { 44. final DiskBlockObjectWriter writer = partitionWriters[i]; 45. partitionWriterSegments[i] = writer.commitAndGet(); 46. writer.close(); 47. } 48. //获取最终合并后的文件名,对应格式为:"shuffle_" + shuffleId + "_" + mapId // + "_" + reduceId + ".index", 并且其中的 reduceId 为0,对应的含义就是 //该文件包含所有为Reduce端输出的数据 49. File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); 50. File tmp = Utils.tempFileWith(output); 51. try { 52. //在此合并前面生成的各个中间临时文件,并获取各个分区对应的数据量,由数据量可以得 //到对应的偏移量 53. 54. partitionLengths = writePartitionedFile(tmp); 55. //主要是根据前面获取的数据量,重写Index文件中的偏移量信息 56. shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); 57. } finally { 58. if (tmp.exists() && !tmp.delete()) { 59. logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); 60. } 61. } 62. //封装并返回任务结果 63. mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); 64. }
其中调用的createTempShuffleBlock方法描述了各个分区生成的中间临时文件的格式与对应的BlockId,具体代码如下所示。
DiskBlockManager的createTempShuffleBlock的源码如下。
1. /**中间临时文件名的格式由前缀temp_shuffle_与randomUUID组成,可以唯一标识 BlockId*/ 2. def createTempShuffleBlock(): (TempShuffleBlockId, File) = { 3. var blockId = new TempShuffleBlockId(UUID.randomUUID()) 4. while (getFile(blockId).exists()) { 5. blockId = new TempShuffleBlockId(UUID.randomUUID()) 6. } 7. (blockId, getFile(blockId)) 8. }
从上面的分析中可以知道,每个Map端的任务最终会生成两个文件,即数据(Data)文件和索引(Index)文件。
另外,使用DiskBlockObjectWriter写记录时,是以32条记录批次写入的,不会占用太大的内存。但由于对应不能指定聚合器(Aggregator),写数据时也是直接写入记录,因此对应后续的网络I/O的开销也会很大。
2.SortShuffleWriter写数据的源码解析
前面BypassMergeSortShuffleWriter的写数据是在Reducer端的分区个数较少的情况下提供的一种优化方式,但当数据集规模非常大时,使用该写数据方式不合适时,就需要使用SortShuffleWriter来写数据块。
和其他ShuffleWriter的具体子类一样,SortShuffleWriter写数据的具体实现位于实现的write方法中,关键代码如下所示。
SortShuffleWriter的write的源码如下。
1. override def write(records: Iterator[Product2[K, V]]): Unit = { 2. //当需要在Map端进行聚合操作时,此时将会指定聚合器(Aggregator) 3. //将Key值的Ordering传入到外部排序器ExternalSorter中 4. sorter = if (dep.mapSideCombine) { 5. require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") 6. new ExternalSorter[K, V, C]( 7. context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) 8. } else { 9. //没有指定Map端使用聚合时,传入ExternalSorter的聚合器(Aggregator) //与Key值的Ordering都设为None,即不需要传入,对应在Reduce端读取数据 //时才根据聚合器分区数据进行聚合,并根据是否设置Ordering而选择是否对分区 //数据进行排序 10. new ExternalSorter[K, V, V]( 11. context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer) 12. } 13. //将写入的记录集全部放入外部排序器 14. sorter.insertAll(records) 15. 16. //不要费心在Shuffle写时间中,包括打开合并输出文件的时间,因为它只打开一个文件, //所以通常太快,无法精确测量(见Spark-3570) 17. //和BypassMergeSortShuffleWriter一样,获取输出文件名和BlockId 18. val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) 19. val tmp = Utils.tempFileWith(output) 20. try { 21. val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) 22. //将分区数据写入文件,返回各个分区对应的数据量 23. val partitionLengths = sorter.writePartitionedFile(blockId, tmp) 24. //和BypassMergeSortShuffleWriter一样,更新索引文件的偏移量信息 25. shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) 26. mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) 27. } finally { 28. if (tmp.exists() && !tmp.delete()) { 29. logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") 30. } 31. } 32. }
在这种基于Sort的Shuffle实现机制中引入了外部排序器(ExternalSorter)。ExternalSorter继承了Spillable,因此内存使用达到一定阈值时,会Spill到磁盘,可以减少内存带来的开销。
外部排序器的insertAll方法内部在处理完(包含聚合和非聚合两种方式)每条记录时,都会检查是否需要Spill。内部各种细节比较多,这里以Spill条件判断为主线,简单描述一下条件相关的代码。具体判断是否需要Spill的相关代码可以参考Spillable类中的maybeSpill方法(该方法的简单调用流程为:ExternalSorter #insterAll–>ExternalSorter #maybeSpillCollection ->Spillable#maybeSpill),关键代码如下所示。
Spillable的maybeSpill的源码如下。
1. protected def maybeSpill(collection: C, currentMemory: Long): Boolean = { 2. //判断是否需要Spill 3. var shouldSpill = false 4. //1. 检查当前记录数是否是32的倍数——即对小批量的记录集进行Spill 5. //2. 同时,当前需要的内存大小是否达到或超过了当前分配的内存阈值 6. if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { 7. //从Shuffle内存池中获取当前内存的两倍 8. val amountToRequest = 2 * currentMemory - myMemoryThreshold 9. //实际上会先申请内存,然后再次判断,最后决定是否Spill 10. val granted = acquireMemory(amountToRequest) 11. myMemoryThreshold += granted 12. 13. //内存很少时,如果准许内存进一步增长(tryToAcquire返回0,或者比 //myMemoryThreshold更多的内存),当前的collection将会溢出 14. shouldSpill = currentMemory >= myMemoryThreshold 15. } 16. //当满足下列条件之一时,需要Spill,条件如下所示: 17. //1. 当前判断结果为true 18. //2. 从上次Spill之后所读取的记录数超过配置的阈值时 19. //配置属性为:spark.shuffle.spill.numElementsForceSpillThreshold 20. shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold 21. //Actually spill 22. if (shouldSpill) { 23. _spillCount += 1 24. logSpillage(currentMemory) 25. spill(collection) 26. _elementsRead = 0 27. _memoryBytesSpilled += currentMemory 28. releaseMemory() 29. } 30. shouldSpill 31. }
对于外部排序器,除了insertAll方法外,它的writePartitionedFile方法也非常重要。
ExternalSorter.scala的writePartitionedFile的源码如下。
1. def writePartitionedFile( 2. blockId: BlockId, 3. outputFile: File): Array[Long] = {
其中,BlockId是数据块的逻辑位置,File参数是对应逻辑位置的物理存储位置。这两个参数值的获取方法和使用BypassMergeSortShuffleHandle及其对应的ShuffleWriter是一样的。
在该方法中,有一个容易混淆的地方,与Shuffle的度量(Metric)信息有关,对应代码如下所示。
1. context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled) 2. context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
其中,第1行对应修改了Spilled的数据在内存中的字节大小,第2行则对应修改了Spilled的数据在磁盘中的字节大小。在内存中时,数据是以反序列化形式存放的,而存储到磁盘(默认会序列化)时,会对数据进行序列化。反序列化后的数据会远远大于序列化后的数据(也可以通过UI界面查看这两个度量信息的大小差异来确认,具体差异的大小和数据以及选择的序列化器有关,有兴趣的读者可以参考各序列器间的性能等比较文档)。
从这一点也可以看出,如果在内存中使用反序列化的数据,会大大增加内存的开销(也意味着增加GC负载),并且反序列化也会增加CPU的开销,因此引入了利用Tungsten项目的基于Tungsten Sort的Shuffle实现机制。Tungsten项目的优化主要有三个方面,这里从避免反序列化的数据量会极大消耗内存这方面考虑,主要是借助Tungsten项目的内存管理模型,可以直接处理序列化的数据;同时,CPU开销方面,直接处理序列化数据,可以避免数据反序列化的这部分处理开销。