7.2 Shuffle的框架
本节讲解Shuffle的框架、Shuffle的框架内核、Shuffle数据读写的源码解析。Spark Shuffle从基于Hash的Shuffle,引入了Shuffle Consolidate机制(即文件合并机制),演进到基于Sort的Shuffle实现方式。随着Tungsten计划的引入与优化,引入了基于Tungsten-Sort的Shuffle实现方式。
7.2.1 Shuffle的框架演进
Spark的Shuffle框架演进历史可以从框架本身的演进、Shuffle具体实现机制的演进两部分进行解析。
框架本身的演进可以从面向接口编程的原则出发,结合Build设计模式进行理解。整个Spark的Shuffle框架从Spark 1.1版本开始,提供便于测试、扩展的可插拔式框架。
而对应Shuffle的具体实现机制的演进部分,可以跟踪Shuffle实现细节在各个版本中的变更。具体体现在Shuffle数据的写入或读取,以及读写相关的数据块解析方式。下面简单描述一下整个演进过程。
在Spark 1.1之前,Spark中只实现了一种Shuffle方式,即基于Hash的Shuffle。在基于Hash的Shuffle的实现方式中,每个Mapper阶段的Task都会为每个Reduce阶段的Task生成一个文件,通常会产生大量的文件(即对应为M×R个中间文件,其中,M表示Mapper阶段的Task个数,R表示Reduce阶段的Task个数)。伴随大量的随机磁盘I/O操作与大量的内存开销。
为了缓解上述问题,在Spark 0.8.1版本中为基于Hash的Shuffle的实现引入了Shuffle Consolidate机制(即文件合并机制),将Mapper端生成的中间文件进行合并的处理机制。通过将配置属性spark.shuffle.consolidateFiles设置为true,减少中间生成的文件数量。通过文件合并,可以将中间文件的生成方式修改为每个执行单位(类似于Hadoop的Slot)为每个Reduce阶段的Task生成一个文件。其中,执行单位对应为:每个Mapper阶段的Cores数/每个Task分配的Cores数(默认为1)。最终可以将文件个数从M×R修改为E×C/T×R,其中,E表示Executors个数,C表示可用Cores个数,T表示Task分配的Cores个数。
基于Hash的Shuffle的实现方式中,生成的中间结果文件的个数都会依赖于Reduce阶段的Task个数,即Reduce端的并行度,因此文件数仍然不可控,无法真正解决问题。为了更好地解决问题,在Spark 1.1版本引入了基于Sort的Shuffle实现方式,并且在Spark 1.2版本之后,默认的实现方式也从基于Hash的Shuffle,修改为基于Sort的Shuffle实现方式,即使用的ShuffleManager从默认的hash修改为sort。首先,每个Mapper阶段的Task不会为每个Reduce阶段的Task生成一个单独的文件;而是全部写到一个数据(Data)文件中,同时生成一个索引(Index)文件,Reduce阶段的各个Task可以通过该索引文件获取相关的数据。避免产生大量文件的直接收益就是降低随机磁盘I/O与内存的开销。最终生成的文件个数减少到2M,其中M表示Mapper阶段的Task个数,每个Mapper阶段的Task分别生成两个文件(1个数据文件、1个索引文件),最终的文件个数为M个数据文件与M个索引文件。因此,最终文件个数是2×M个。
随着Tungsten计划的引入与优化,从Spark 1.4版本开始(Tungsten计划目前在Spark 1.5与Spark 1.6两个版本中分别实现了第一与第二两个阶段),在Shuffle过程中也引入了基于Tungsten-Sort的Shuffle实现方式,通过Tungsten项目所做的优化,可以极大提高Spark在数据处理上的性能。
为了更合理、更高效地使用内存,在Spark的Shuffle实现方式演进过程中,引进了外部排序等处理机制(针对基于Sort的Shuffle机制。基于Hash的Shuffle机制从最原始的全部放入内存改为记录级写入)。同时,为了保存Shuffle结果提高性能以及支持资源动态分配等特性,也引进了外部Shuffle服务等机制。
7.2.2 Shuffle的框架内核
Shuffle框架的设计可以从两方面理解:一方面,为了Shuffle模块更加内聚并与其他模块解耦;另一方面,为了更方便替换、测试、扩展Shuffle的不同实现方式。从Spark 1.1版本开始,引进了可插拔式的Shuffle框架(通过将Shuffle相关的实现封装到一个统一的对外接口,提供一种具体实现可插拔的框架)。Spark框架中,通过ShuffleManager来管理各种不同实现机制的Shuffle过程,由ShuffleManager统一构建、管理具体实现子类来实现Shuffle框架的可插拔的Shuffle机制。
在详细描述Shuffle框架实现细节之前,先给出可插拔式Shuffle的整体架构的类图,如图7-2所示。
图7-2 可插拔式Shuffle的整体架构的类图
在DAG的调度过程中,Stage阶段的划分是根据是否有Shuffle过程,也就是当存在ShuffleDependency的宽依赖时,需要进行Shuffle,这时会将作业(Job)划分成多个Stage。对应地,在源码实现中,通过在划分Stage的关键点——构建ShuffleDependency时——进行Shuffle注册,获取后续数据读写所需的ShuffleHandle。
Stage阶段划分后,最终每个作业(Job)提交后都会对应生成一个ResultStage与若干个ShuffleMapStage,其中ResultStage表示生成作业的最终结果所在的Stage。ResultStage与ShuffleMapStage中的Task分别对应了ResultTask与ShuffleMapTask。一个作业,除了最终的ResultStage,其他若干ShuffleMapStage中的各个ShuffleMapTask都需要将最终的数据根据相应的分区器(Partitioner)对数据进行分组(即将数据重组到新的各个分区中),然后持久化分组后的数据。对应地,每个RDD本身记录了它的数据来源,在计算(compute)时会读取所需数据,对于带有宽依赖的RDD,读取时会获取在ShuffleMapTask中持久化的数据。
从图7-2中可以看到,外部宽依赖相关的RDD与ShuffleManager之间的注册交互,通过该注册,每个RDD自带的宽依赖(ShuffleDependency)内部会维护Shuffle的唯一标识信息ShuffleId以及与Shuffle过程具体读写相关的句柄ShuffleHandle,后续在ShuffleMapTask中启动任务(Task)的运行时,可以通过该句柄获取相关的Shuffle写入器实例,实现具体的数据磁盘写操作。
而在带有宽依赖(ShuffleDependency)的RDD中,执行compute时会去读取上一Stage为其输出的Shuffle数据,此时同样会通过该句柄获取相关的Shuffle读取器实例,实现具体数据的读取操作。需要注意的是,当前Shuffle的读写过程中,与BlockManager的交互,是通过MapOutputTracker来跟踪Shuffle过程中各个任务的输出数据的。在任务完成等场景中,会将对应的MapStatus信息注册到MapOutputTracker中,而在compute数据读取过程中,也会通过该跟踪器来获取上一Stage的输出数据在BlockManager中的位置,然后通过getReader得到的数据读取器,从这些位置中读取数据。
目前对Shuffle的输出进行跟踪的MapOutputTracker并没有和Shuffle数据读写类一样,也封装到Shuffle的框架中。如果从代码聚合与解耦等角度出发,也可以将MapOutputTracker合并到整个Shuffle框架中,然后在Shuffle写入器输出数据之后立即进行注册,在数据读取器读取数据前获取位置等(但对应的DAG等调度部分,也需要进行修改)。
ShuffleManager封装了各种Shuffle机制的具体实现细节,包含的接口与属性如下所示。
(1)registerShuffle:每个RDD在构建它的父依赖(这里特指ShuffleDependency)时,都会先注册到ShuffleManager,获取ShuffleHandler,用于后续数据块的读写等。
(2)getWriter:可以通过ShuffleHandler获取数据块写入器,写数据时通过Shuffle的块解析器shuffleBlockResolver,获取写入位置(通常将写入位置抽象为Bucket,位置的选择则由洗牌的规则,即Shuffle的分区器决定),然后将数据写入到相应位置(理论上,位置可以位于任何能存储数据的地方,包括磁盘、内存或其他存储框架等,目前在可插拔框架的几种实现中,Spark与Hadoop一样都采用磁盘的方式进行存储,主要目的是为了节约内存,同时提高容错性)。
(3)getReader:可以通过ShuffleHandler获取数据块读取器,然后通过Shuffle的块解析器shuffleBlockResolver,获取指定数据块。
(4)unregisterShuffle:与注册对应,用于删除元数据等后续清理操作。
(5)shuffleBlockResolver:Shuffle的块解析器,通过该解析器,为数据块的读写提供支撑层,便于抽象具体的实现细节。
7.2.3 Shuffle框架的源码解析
用户可以通过自定义ShuffleManager接口,并通过指定的配置属性进行设置,也可以通过该配置属性指定Spark已经支持的ShuffleManager具体实现子类。
在SparkEnv源码中可以看到设置的配置属性,以及当前在Spark的ShuffleManager可插拔框架中已经提供的ShuffleManager具体实现。Spark 2.0版本中支持sort、tungsten-sort两种方式。
Spark 2.1.1版本的SparkEnv.scala的源码如下。
1. //用户可以通过短格式的命名来指定所使用的ShuffleManager 2. val shortShuffleMgrNames = Map( 3. "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager] .getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort. SortShuffleManager].getName) 4. 5. //指定ShuffleManager的配置属性:"spark.shuffle.manager" 6. //默认情况下使用"sort",即SortShuffleManager的实现 7. val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") 8. val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName. toLowerCase, shuffleMgrName) 9. val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Spark 2.2.0版本的SparkEnv.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行调用toLowerCase小写转换方法,设置Locale.ROOT区域表示。root locale是一个区域设置,其语言、地区、变量都设置为空("")字符串。
1. ...... 2. val shuffleMgrClass = 3. shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase (Locale.ROOT), shuffleMgrName) 4. .......
从代码中可以看出,ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,可以通过spark.shuffle.manager配置属性来设置自定义的ShuffleManager。
在Driver和每个Executor的SparkEnv实例化过程中,都会创建一个ShuffleManager,用于管理块数据,提供集群块数据的读写,包括数据的本地读写和读取远程节点的块数据。
Shuffle系统的框架可以以ShuffleManager为入口进行解析。在ShuffleManager中指定了整个Shuffle框架使用的各个组件,包括如何注册到ShuffleManager,以获取一个用于数据读写的处理句柄ShuffleHandle,通过ShuffleHandle获取特定的数据读写接口:ShuffleWriter与ShuffleReader,以及如何获取块数据信息的解析接口ShuffleBlockResolver。下面通过源码分别对这几个比较重要的组件进行解析。
1.ShuffleManager的源码解析
由于ShuffleManager是Spark Shuffle系统提供的一个可插拔式接口,提供具体实现子类或自定义具体实现子类时,都需要重写ShuffleManager类的抽象接口。下面首先分析ShuffleManager的源码。
ShuffleManager.scala的源码如下。
1. 2. //Shuffle系统的可插拔接口。在Driver和每个Executor的SparkEnv实例中创建 3. private[spark] trait ShuffleManager { 4. 5. /** 6. *在Driver端向ShuffleManager注册一个Shuffle,获取一个Handle 7. *在具体Tasks中会通过该Handle来读写数据 8. */ 9. def registerShuffle[K, V, C]( 10. shuffleId: Int, 11. numMaps: Int, 12. dependency: ShuffleDependency[K, V, C]): ShuffleHandle 13. 14. /** *获取对应给定的分区使用的ShuffleWriter,该方法在Executors上执行各个Map *任务时调用 15. */ 16. def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V] 17. /** * 获取在Reduce阶段读取分区的ShuffleReader,对应读取的分区由[startPartition * to endPartition-1]区间指定。该方法在Executors上执行,在各个Reduce任务时调用 * 18 */ 19. def getReader[K, C]( 20 21. handle: ShuffleHandle, 22. startPartition: Int, 23. endPartition: Int, 24. context: TaskContext): ShuffleReader[K, C] 25. 26. /** 27. *该接口和registerShuffle分别负责元数据的取消注册与注册 28. *调用unregisterShuffle接口时,会移除ShuffleManager中对应的元数据信息 29. */ 30. def unregisterShuffle(shuffleId: Int): Boolean 31. 32. /** *返回一个可以基于块坐标来获取Shuffle 块数据的ShuffleBlockResolver 33. */ 34. def shuffleBlockResolver: ShuffleBlockResolver 35. 36. /**终止ShuffleManager */ 37. def stop(): Unit 38. }
2.ShuffleHandle的源码解析
1. abstract class ShuffleHandle(val shuffleId: Int) extends Serializable {}
ShuffleHandle比较简单,用于记录Task与Shuffle相关的一些元数据,同时也可以作为不同具体Shuffle实现机制的一种标志信息,控制不同具体实现子类的选择等。
3.ShuffleWriter的源码解析
ShuffleWriter.scala的源码如下。
1. private[spark] abstract class ShuffleWriter[K, V] { 2. /** Write a sequence of records to this task's output */ 3. @throws[IOException] 4. def write(records: Iterator[Product2[K, V]]): Unit 5. 6. /** Close this writer, passing along whether the map completed */ 7. def stop(success: Boolean): Option[MapStatus] 8. }
继承ShuffleWriter的每个具体子类会实现write接口,给出任务在输出时的写记录的具体方法。
4.ShuffleReader的源码解析
ShuffleReader.scala的源码如下。
1. private[spark] trait ShuffleReader[K, C] { 2. /** Read the combined key-values for this reduce task */ 3. def read(): Iterator[Product2[K, C]]
继承ShuffleReader的每个具体子类会实现read接口,计算时负责从上一阶段Stage的输出数据中读取记录。
5.ShuffleBlockResolver的源码解析
ShuffleBlockResolver的源码如下。
1. /** *该特质的具体实现子类知道如何通过一个逻辑Shuffle块标识信息来获取一个块数据。具体 *实现可以使用文件或文件段来封装Shuffle的数据。这是获取Shuffle块数据时使用的抽 *象接口,在BlockStore中使用 2. */ 3. 4. 5. trait ShuffleBlockResolver { 6. type ShuffleId = Int 7. 8. /** *获取指定块的数据。如果指定块的数据无法获取,则抛出异常 9. */ 10. def getBlockData(blockId: ShuffleBlockId): ManagedBuffer 11. 12. def stop(): Unit 13. }
继承ShuffleBlockResolver的每个具体子类会实现getBlockData接口,给出具体的获取块数据的方法。
目前在ShuffleBlockResolver的各个具体子类中,除给出获取数据的接口外,通常会提供如何解析块数据信息的接口,即提供了写数据块时的物理块与逻辑块之间映射关系的解析方法。
7.2.4 Shuffle数据读写的源码解析
1.Shuffle写数据的源码解析
从Spark Shuffle的整体框架中可以看到,ShuffleManager提供了Shuffle相关数据块的写入与读取,即对应的接口getWriter与getReader。
在解析Shuffle框架数据读取过程中,可以构建一个具有ShuffleDependency的RDD,查看执行过程中,Shuffle框架中的数据读写接口getWriter与getReader如何使用,通过这种具体案例的方式来加深对源码的理解。
Spark中Shuffle具体的执行机制可以参考本书的其他章节,在此仅分析与Shuffle直接相关的内容。通过DAG调度机制的解析,可以知道Spark中一个作业可以根据宽依赖切分Stages,而在Stages中,相应的Tasks也包含两种,即ResultTask与ShuffleMapTask。其中,一个ShuffleMapTask会基于ShuffleDependency中指定的分区器,将一个RDD的元素拆分到多个buckets中,此时通过ShuffleManager的getWriter接口获取数据与buckets的映射关系。而ResultTask对应的是一个将输出返回给应用程序Driver端的Task,在该Task执行过程中,最终都会调用RDD的compute对内部数据进行计算,而在带有ShuffleDependency的RDD中,在compute计算时,会通过ShuffleManager的getReader接口,获取上一个Stage的Shuffle输出结果作为本次Task的输入数据。
首先来看ShuffleMapTask中的写数据流程,具体代码如下所示。
ShuffleMapTask.scala的源码如下。
1. override def runTask(context: TaskContext): MapStatus = { 2. ...... 3. //首先从SparkEnv获取ShuffleManager 4. //然后从ShuffleDependency中获取注册到ShuffleManager时得到的shuffleHandle 5. //根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter 6. //最后根据获取的ShuffleWriter,调用其write接口,写入当前分区的数据 7. var writer: ShuffleWriter[Any, Any] = null 8. try { 9. val manager = SparkEnv.get.shuffleManager 10. writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 11. writer.write(rdd.iterator(partition, context).asInstanceOf [Iterator[_ <: Product2[Any, Any]]]) 12. writer.stop(success = true).get 13. } catch { 14. ...... 15. } 16. }
2.Shuffle读数据的源码解析
对应的数据读取器,从RDD的5个抽象接口可知,RDD的数据流最终会经过算子操作,即RDD中的compute方法。下面以包含宽依赖的RDD、CoGroupedRDD为例,查看如何获取Shuffle的数据。具体代码如下所示。
Spark 1.6.0版本的CoGroupedRDD.scala的源码如下。
1. //对指定分区进行计算的抽象接口,以下为CoGroupedRDD具体子类中该方法的实现 2. override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = { 3. val split = s.asInstanceOf[CoGroupPartition] 4. val numRdds = dependencies.length 5. 6. //A list of (rdditerator, dependency number) pairs 7. val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)] 8. for ((dep, depNum) <- dependencies.zipWithIndex) dep match { 9. case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked => 10. val dependencyPartition = split.narrowDeps(depNum).get.split 11. //Read them from the parent 12. val it = oneToOneDependency.rdd.iterator(dependencyPartition, context) 13. rddIterators += ((it, depNum)) 14. 15. case shuffleDependency: ShuffleDependency[_, _, _] => 16. //首先从SparkEnv获取ShuffleManager 17. //然后从ShuffleDependency中获取注册到ShuffleManager时得到的shuffleHandle 18. //根据shuffleHandle和当前Task对应的分区ID,获取ShuffleWriter 19. //最后根据获取的ShuffleReader,调用其read接口,读取Shuffle的Map输出 20. 21. val it = SparkEnv.get.shuffleManager 22. .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context) 23. .read() 24. rddIterators += ((it, depNum)) 25. } 26. 27. val map = createExternalMap(numRdds) 28. for ((it, depNum) <- rddIterators) { 29. map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum)))) 30. } 31. context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled) 32. context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled) 33. context.internalMetricsToAccumulators( 34. InternalAccumulator.PEAK_EXECUTION_MEMORY).add (map.peakMemoryUsedBytes) 35. new InterruptibleIterator(context, 36. map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]]) 37. }
Spark 2.2.0版本的CoGroupedRDD.scala的源码与Spark 1.6.0版本相比具有如下特点:上段代码中第28~29行的context.internalMetricsToAccumulators方法调整为context.taskMetrics方法,用于任务的度量监控,监控内存的峰值使用情况。
1. ...... 2. context.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes) 3. .......
从代码中可以看到,带宽依赖的RDD的compute操作中,最终是通过SparkEnv中的ShuffleManager实例的getReader方法,获取数据读取器的,然后再次调用读取器的read读取指定分区范围的Shuffle数据。注意,是带宽依赖的RDD,而非ShuffleRDD,除了ShuffleRDD外,还有其他RDD也可以带上宽依赖的,如前面给出的CoGroupedRDD。
目前支持的几种具体Shuffle实现机制在读取数据的处理上都是一样的。从源码角度可以看到,当前继承了ShuffleReader这一数据读取器的接口的具体子类只有BlockStoreShuffleReader,因此,本章内容仅在此对各种Shuffle实现机制的数据读取进行解析,后续各实现机制中不再重复描述。
源码解析的第一步仍然是查看该类的描述信息,具体如下所示。
1. /** 2. *通过从其他节点上请求读取 Shuffle 数据来接收并读取指定范围[起始分区, 结束分区) *——对应为左闭右开区间 3. *通过从其他节点上请求读取Shuffle数据来接收并读取指定范围[起始分区,结束分区] 4. *——对应为左闭右开区间 5. */
从注释上可以看出,读取器负责上一Stage为下一Stage输出数据块的读取。从前面对ShuffleReader接口的解析可知,继承的具体子类需要实现真正的数据读取操作,即实现read方法。因此,该方法便是需要重点关注的源码。一些关键的代码如下所示。
Spark 2.1.1版本的BlockStoreShuffleReader.scala的源码如下。
1. //为该Reduce任务读取并合并key-values 值 2. override def read(): Iterator[Product2[K, C]] = { 3. //真正的数据Iterator读取是通过ShuffleBlockFetcherIterator来完成的 4. val blockFetcherItr = new ShuffleBlockFetcherIterator( 5. context, 6. blockManager.shuffleClient, 7. blockManager, 8. //可以看到,当ShuffleMapTask完成后注册到mapOutputTracker的元数据信息 //同样会通过mapOutputTracker来获取,在此同时还指定了获取的分区范围 //通过该方法的返回值类型 9. 10. 11. 12. mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), 13. //默认读取时的数据大小限制为48m,对应后续并行的读取,都是一种数据读取的控制策 //略,一方面可以避免目标机器占用过多带宽,同时也可以启动并行机制,加快读取速度 14. 15. 16. 17. SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, 18. SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue)) 19. 20. //在此针对前面获取的各个数据块唯一标识ID信息及其对应的输入流进行处理 21. val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) => 22. serializerManager.wrapStream(blockId, inputStream) 23. } 24. 25. val serializerInstance = dep.serializer.newInstance() 26. 27. //为每个流stream创建一个键/值迭代器 28. val recordIter = wrappedStreams.flatMap { wrappedStream => 29. //注意:askey Value Iterator在内部迭代器Next Iterator中包裹一个键/值对, //当Input Stream中的数据已读取,Next Iterator确保Close()方法被调用 30. 31. 32. serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator 33. } 34. 35. //为每个记录更新上下文任务度量 36. val readMetrics = context.taskMetrics.createTempShuffleReadMetrics() 37. val metricIter = CompletionIterator[(Any, Any), Iterator[(Any, Any)]]( 38. recordIter.map { record => 39. readMetrics.incRecordsRead(1) 40. record 41. }, 42. context.taskMetrics().mergeShuffleReadMetrics()) 43. 44. //为了支持任务取消,这里必须使用可中断迭代器 45. val interruptibleIter = new InterruptibleIterator[(Any, Any)](context, metricIter) 46. //对读取到的数据进行聚合处理 47. val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator .isDefined) { 48. //如果在Map端已经做了聚合的优化操作,则对读取到的聚合结果进行聚合,注意此时的 //聚合操作与数据类型和Map端未做优化时是不同的 49. 50. 51. 52. if (dep.mapSideCombine) { 53. //对读取到的数据进行聚合处理 54. val combinedKeyValuesIterator = interruptibleIter.asInstanceOf [Iterator[(K, C)]] 55. 56. //Map端各分区针对Key进行合并后的结果再次聚合,Map的合并可以大大减少网络传输 //的数据量 57. 58. dep.aggregator.get.combineCombinersByKey (combinedKeyValuesIterator, context) 59. } else { 60. //我们无需关心值的类型,但应确保聚合是兼容的,其将把值的类型转化成聚合以后的 //C类型 61. 62. 63. val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator [(K, Nothing)]] 64. dep.aggregator.get.combineValuesByKey(keyValuesIterator, context) 65. } 66. } else { 67. require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!") 68. interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]] 69. } 70. //在基于Sort的Shuffle实现过程中,默认基于PartitionId进行排序,在分区的内 //部,数据是没有排序的,因此添加了keyOrdering变量,提供是否需要针对分区内的 //数据进行排序的标识信息 71. //如果定义了排序,则对输出结果进行排序 72. dep.keyOrdering match { 73. case Some(keyOrd: Ordering[K]) => 74. 75. //为了减少内存的压力,避免GC开销,引入了外部排序器对数据进行排序当内存不足 //以容纳排序的数据量时,会根据配置的spark.shuffle.spill属性来决定是否需要 //spill到磁盘中,默认情况下会打开spill开关,若不打开spill开关,数据量比 //较大时会引发内存溢出问题(Out of Memory,OOM) 76. val sorter = 77. new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer) 78. sorter.insertAll(aggregatedIter) 79. context.taskMetrics().incMemoryBytesSpilled(sorter. memoryBytesSpilled) 80. context.taskMetrics().incDiskBytesSpilled(sorter. diskBytesSpilled) 81. context.taskMetrics().incPeakExecutionMemory (sorter.peakMemoryUsedBytes) 82. CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]] (sorter.iterator, sorter.stop()) 83. case None => 84. //不需要排序分区内部数据时直接返回 85. aggregatedIter 86. } 87. } 88. }
Spark 2.2.0版本的BlockStoreShuffleReader.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第4行blockFetcherItr名称更改为wrappedStreams。
上段代码中第17行之前新增代码serializerManager.wrapStream。
上段代码中第18行之后新增配置参数REDUCER_MAX_REQ_SIZE_SHUFFLE_ TO_MEM:shuffle时可请求内存的最大大小(以字节为单位)。
上段代码中第18行之后新增配置参数spark.shuffle.detectCorrupt:检测获取块blocks中是否有任何损坏。
上段代码中第21~23行删除。
上段代码中第28行wrappedStream调整为case (blockId, wrappedStream)。
1. ....... 2. val wrappedStreams = new ShuffleBlockFetcherIterator( 3. ...... 4. serializerManager.wrapStream, 5. ...... SparkEnv.get.conf.get(config.REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM), 6. SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true)) 7. ...... 8. val recordIter = wrappedStreams.flatMap { case (blockId, wrappedStream) => 9. .......
下面进一步解析数据读取的部分细节。首先是数据块获取、读取的ShuffleBlock-FetcherIterator类,在类的构造体中调用了initialize方法(构造体中的表达式会在构造实例时执行),该方法中会根据数据块所在位置(本地节点或远程节点)分别进行读取,其中关键代码如下所示。
ShuffleBlockFetcherIterator的源码如下。
1. private[this] def initialize(): Unit = { 2. //任务完成进行回调清理(在成功案例和失败案例中调用) 3. context.addTaskCompletionListener(_ => cleanup()) 4. //本地与远程的数据读取方式不同,因此先进行拆分,注意拆分时会考虑一次获取的数据 //大小(拆分时会同时考虑并行数)封装请求,最后会将剩余不足该大小的数据获取也封装 //为一个请求 5. 6. 7. 8. val remoteRequests = splitLocalRemoteBlocks() 9. //存入需要远程读取的数据块请求信息 10. fetchRequests ++= Utils.randomize(remoteRequests) 11. assert ((0 == reqsInFlight) == (0 == bytesInFlight), 12. "expected reqsInFlight = 0 but found reqsInFlight = " + reqsInFlight + 13. ", expected bytesInFlight = 0 but found bytesInFlight = " + bytesInFlight) 14. 15. //发送数据获取请求 16. fetchUpToMaxBytes() 17. 18. val numFetches = remoteRequests.size - fetchRequests.size 19. logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime)) 20. 21. //除了远程数据获取外,下面是获取本地数据块的方法调用 22. fetchLocalBlocks() 23. logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime)) 24. }
与Hadoop一样,Spark计算框架也基于数据本地性,即移动数据而非移动计算的原则,因此在获取数据块时,也会考虑数据本地性,尽量从本地读取已有的数据块,然后再远程读取。
另外,数据块的本地性是通过ShuffleBlockFetcherIterator实例构建时所传入的位置信息来判断的,而该信息由MapOutputTracker实例的getMapSizesByExecutorId方法提供,可以参考该方法的返回值类型查看相关的位置信息,返回值类型为:Seq[(BlockManagerId, Seq[(BlockId, Long)])]。其中,BlockManagerId是BlockManager的唯一标识信息,BlockId是数据块的唯一信息,对应的Seq[(BlockId, Long)]表示一组数据块标识ID及其数据块大小的元组信息。
最后简单分析一下如何设置分区内部的排序标识,当需要对分区内的数据进行排序时,会设置RDD中的宽依赖(ShuffleDependency)实例的keyOrdering变量。下面以基于排序的OrderedRDDFunctions提供的sortByKey方法给出解析,具体代码如下所示。
OrderedRDDFunctions的源码如下。
1. def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) 2. : RDD[(K, V)] = self.withScope 3. { 4. //注意,这里设置了该方法构建的RDD使用的分区器 //根据Range而非Hash进行分区,对应的Range信息需要计算并将结果 //反馈到Driver端,因此对应调用RDD中的Action,即会触发一个Job的执行 5. val part = new RangePartitioner(numPartitions, self, ascending) 6. //在构建RDD实例后,设置Key的排序算法,即Ordering实例 7. new ShuffledRDD[K, V, V](self, part) 8. .setKeyOrdering(if (ascending) ordering else ordering.reverse) 9. }
当需要对分区内部的数据进行排序时,构建RDD的同时会设置Key值的排序算法,结合前面的read代码,当指定Key值的排序算法时,就会使用外部排序器对分区内的数据进行排序。