8.2 Stage划分内幕
本节讲解Stage划分原理及Stage划分源码。一个Application中,每个Job由一个或多个Stage构成,Stage根据宽依赖(如reducByKey、groupByKey算子等)进行划分。
8.2.1 Stage划分原理详解
Spark Application中可以因为不同的Action触发众多的Job。也就是说,一个Application中可以有很多的Job,每个Job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage。也就是说,只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
Stage划分的依据就是宽依赖,什么时候产生宽依赖呢?例如,reducByKey、groupByKey等;Action(如collect)导致SparkContext.runJob的执行,最终导致DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。
eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEvent-ProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法。onReceive方法转过来回调doOnReceive。在doOnReceive中通过模式匹配的方式把执行路由到JobSubmitted,在handleJobSubmitted中首先创建finalStage,创建finalStage时会建立父Stage的依赖链条。
8.2.2 Stage划分源码详解
Spark的Action算子执行SparkContext.runJob,提交至DAGScheduler中的submitJob,submitJob发送JobSubmitted对象到eventProcessLoop循环消息队列,提交该任务,其中JobSubmitted的源码如下。
DAGSchedulerEvent.scala的源码如下。
1. private[scheduler] case class JobSubmitted( 2. jobId: Int, 3. finalRDD: RDD[_], 4. func: (TaskContext, Iterator[_]) => _, 5. partitions: Array[Int], 6. callSite: CallSite, 7. listener: JobListener, 8. properties: Properties = null) 9. extends DAGSchedulerEvent
eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGScheduler-EventProcessLoop是EventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive。
DAGScheduler.scala的源码如下。
1. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { 2. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => 3. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) 4. 5. case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => 6. dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) 7. ......