4.3 TaskScheduler解析
TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。
(1)为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。
(2)遇到Straggle任务时,会放到其他节点进行重试。
(3)向DAGScheduler汇报执行情况,包括在Shuffle输出丢失的时候报告fetch failed错误等信息。
4.3.1 TaskScheduler原理剖析
DAGScheduler将划分的一系列的Stage(每个Stage封装一个TaskSet),按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。接下来我们分析TaskScheduler接收到DAGScheduler的Stage任务后,是如何来管理Stage(TaskSet)的生命周期的。
首先,回顾一下DAGScheduler在SparkContext中实例化的时候,TaskScheduler以及SchedulerBackend就已经先在SparkContext的createTaskScheduler创建出实例对象了。
虽然Spark支持多种部署模式(包括Local、Standalone、YARN、Mesos等),但是底层调度器TaskScheduler接口的实现类都是TaskSchedulerImpl。并且,考虑到方便读者对TaskScheduler的理解,对于SchedulerBackend的实现,我们也只专注Standalone部署模式下的具体实现StandaloneSchedulerBackend来作分析。
TaskSchedulerImpl在createTaskScheduler方法中实例化后,就立即调用自己的initialize方法把StandaloneSchedulerBackend的实例对象传进来,从而赋值给TaskSchedulerImpl的backend。在TaskSchedulerImpl的initialize方法中,根据调度模式的配置创建实现了SchedulerBuilder接口的相应的实例对象,并且创建的对象会立即调用buildPools创建相应数量的Pool存放和管理TaskSetManager的实例对象。实现SchedulerBuilder接口的具体类都是SchedulerBuilder的内部类。
(1)FIFOSchedulableBuilder:调度模式是SchedulingMode.FIFO,使用先进先出策略调度。先进先出(FIFO)为默认模式。在该模式下只有一个TaskSetManager池。
(2)FairSchedulableBuilder:调度模式是SchedulingMode.FAIR,使用公平策略调度。
在createTaskScheduler方法返回后,TaskSchedulerImpl通过DAGScheduler的实例化过程设置DAGScheduler的实例对象。然后调用自己的start方法。在TaskSchedulerImpl调用start方法的时候,会调用StandaloneSchedulerBackend的start方法,在StandaloneSchedulerBackend的start方法中,会最终注册应用程序AppClient。TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。
TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理。TaskSchedulerImpl在submitTasks中初始化一个TaskSetManager,对其生命周期进行管理,当TaskSchedulerImpl得到Worker节点上的Executor计算资源的时候,会通过TaskSetManager发送具体的Task到Executor上执行计算。
如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。TaskSetManager会将失败的Task再次添加到待执行Task队列中。Spark Task允许失败的次数默认是4次,在TaskSchedulerImpl初始化的时候,通过spark.task.maxFailures设置该值。
如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler,DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。
通过下面的调度链,Executor把Task执行的结果返回给调度器(Scheduler)。
(1)Executor.run。
(2)CoarseGrainedExecutorBackend.statusUpdate(发送StatusUpdate消息)。
(3)CoarseGrainedSchedulerBackend.receive(处理StatusUpdate消息)。
(4)TaskSchedulerImpl.statusUpdate。
(5)TaskResultGetter.enqueueSuccessfulTask或者enqueueFailedTask。
(6)TaskSchedulerImpl.handleSuccessfulTask或者handleFailedTask。
(7)TaskSetManager.handleSuccessfulTask或者handleFailedTask。
(8)DAGScheduler.taskEnded。
(9)DAGScheduler.handleTaskCompletion。
在上面的调度链中值得关注的是:第(7)步中,TaskSetManager的handleFailedTask方法会将失败的Task再次添加到待执行Task队列中。在第(6)步中,TaskSchedulerImpl的handleFailedTask方法在TaskSetManager的handleFailedTask方法返回后,会调用CoarseGrainedSchedulerBackend的reviveOffers方法给重新执行的Task获取资源。
4.3.2 TaskScheduler源码解析
TaskScheduler是Spark的底层调度器。底层调度器负责Task本身的调度运行。
下面编写一个简单的测试代码,setMaster("local-cluster[1, 1, 1024]")设置为Spark本地伪分布式开发模式,从代码的运行日志中观察Spark框架的运行情况。
1. object SparkTest { 2. def main(args: Array[String]): Unit = { 3. Logger.getLogger("org").setLevel(Level.ALL) 4. val conf = new SparkConf() //创建SparkConf对象 5. conf.setAppName("Wow,My First Spark App!") //设置应用程序的名称,在程序 //运行的监控界面中可以看到名称 6. conf.setMaster("local-cluster[1, 1, 1024]") 7. conf.setSparkHome(System.getenv("SPARK_HOME")) 8. val sc = new SparkContext(conf) //创建SparkContext对象,通过传入SparkConf //实例来定制Spark运行的具体参数和配置信息 9. sc.parallelize(Array("100","200"),4).count() 10. sc.stop() 11. } 12. }
在IDEA中运行代码,运行结果中打印的日志如下。
1. Using Spark's default log4j profile: org/apache/spark/log4j-defaults. properties 2. 17/05/31 05:32:07 INFO SparkContext: Running Spark version 2.1.0 3. ...... 4. 17/05/31 05:46:06 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.93.1:51034 5. 17/05/31 05:46:06 INFO Worker: Connecting to master 192.168.93.1:51011... 6. 17/05/31 05:46:06 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:51011... 7. 17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 38 ms (0 ms spent in bootstraps) 8. 17/05/31 05:46:06 INFO TransportClientFactory: Successfully created connection to /192.168.93.1:51011 after 100 ms (0 ms spent in bootstraps) 9. 17/05/31 05:46:07 INFO Master: Registering worker 192.168.93.1:51033 with 1 cores, 1024.0 MB RAM 10. 17/05/31 05:46:07 INFO Worker: Successfully registered with master spark://192.168.93.1:51011 11. 17/05/31 05:46:07 INFO Master: Registering app Wow,My First Spark App! 12. 17/05/31 05:46:07 INFO Master: Registered app Wow,My First Spark App! with ID app-20170531054607-0000 13. 17/05/31 05:46:07 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170531054607-0000 14. 17/05/31 05:46:07 INFO Master: Launching executor app-20170531054607- 0000/0 on worker worker-20170531054606-192.168.93.1-51033 15. 17/05/31 05:46:07 INFO Worker: Asked to launch executor app- 20170531054607-0000/0 for Wow,My First Spark App! 16. 17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20170531054607-0000/0 on worker-20170531054606-192.168.93.1- 51033 (192.168.93.1:51033) with 1 cores 17. ...... 18. 17/05/31 05:46:07 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 19. 17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20170531054607-0000/0 is now RUNNING
日志中显示:StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93. 1:50686表明StandaloneAppClient的ClientEndpoint注册给master。日志中显示StandaloneApp-Client$ClientEndpoint: Executor added获取了Executor。具体是通过StandaloneAppClient的ClientEndpoint来管理Executor。日志中显示StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0说明Standalone-SchedulerBackend已经准备好。
这里是在IDEA本地伪分布式运行的(通过count的action算子启动了Job)。如果是通过Spark-shell运行程序来观察日志,当启动Spark-shell本身的时候,命令终端反馈回来的主要是ClientEndpoint和StandaloneSchedulerBackend,因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext,并注册当前的应用程序给Master,且从集群中获得ExecutorBackend计算资源。
IDEA本地伪分布式运行,Job启动的日志如下。
1. 17/05/31 05:46:08 INFO DAGScheduler: Got job 0 (count at SparkTest.scala: 17) with 4 output partitions 2. 17/05/31 05:46:08 INFO DAGScheduler: Final stage: ResultStage 0 (count at SparkTest.scala:17) 3. 17/05/31 05:46:08 INFO DAGScheduler: Parents of final stage: List() 4. 17/05/31 05:46:08 INFO DAGScheduler: Missing parents: List() 5. 17/05/31 05:46:08 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17), which has no missing parents 6. ...... 7. 17/05/31 05:46:08 INFO DAGScheduler: Submitting 4 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17)
count是action算子触发了Job;然后DAGScheduler获取Final stage: ResultStage,提交Submitting ResultStage。最后提交任务给TaskSetManager,启动任务。任务完成后,DAGScheduler完成Job。
DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSet。TaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(如重试、慢任务进行推测式执行等)。
TaskSet是一个数据结构,TaskSet包含了一系列高层调度器交给底层调度器的任务的集合。第一个成员是Tasks,第二个成员task属于哪个Stage,stageAttemptId是尝试的ID,priority优先级,调度的时候有一个调度池,调度归并调度的优先级。
1. private[spark] class TaskSet( 2. val tasks: Array[Task[_]], 3. val stageId: Int, 4. val stageAttemptId: Int, 5. val priority: Int, 6. val properties: Properties) { 7. val id: String = stageId + "." + stageAttemptId 8. 9. override def toString: String = "TaskSet " + id 10. }
TaskSetManager实例化的时候完成TaskSchedulerImpl的工作,接收TaskSet任务的集合,maxTaskFailures是任务失败重试的次数。
Spark 2.1.1版本的TaskSetManager.scala的源码如下。
1. private[spark] class TaskSetManager( 2. sched: TaskSchedulerImpl, 3. val taskSet: TaskSet, 4. val maxTaskFailures: Int, 5. clock: Clock = new SystemClock()) extends Schedulable with Logging {
Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行之后增加了blacklistTracker的成员变量,用于黑名单列表executors及nodes的跟踪。
1. ..... 2. blacklistTracker: Option[BlacklistTracker] = None, 3. .....
TaskScheduler与SchedulerBackend总体的底层任务调度的过程如下。
a. TaskSchedulerImpl.submitTasks:主要作用是将TaskSet加入到TaskSetManager中进行管理。
DAGScheduler.scala收到JobSubmitted消息,调用handleJobSubmitted方法。
1. private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { 2. case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => 3. dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
在handleJobSubmitted方法中提交submitStage。
1. private[scheduler] def handleJobSubmitted(jobId: Int, 2. 3. ...... 4. submitStage(finalStage) 5. }
submitStage方法调用submitMissingTasks提交task。
1. private def submitStage(stage: Stage) { 2. ...... 3. submitMissingTasks(stage, jobId.get) 4. ......
DAGScheduler.scala的submitMissingTasks里面调用了taskScheduler.submitTasks。
Spark 2.1.1版本的DAGScheduler.scala的源码如下。
1. private def submitMissingTasks(stage: Stage, jobId: Int) { 2. ...... 3. val tasks: Seq[Task[_]] = try { 4. stage match { 5. case stage: ShuffleMapStage => 6. ...... 7. new ShuffleMapTask(stage.id, stage.latestInfo.attemptId, 8. ...... 9. case stage: ResultStage => 10. ...... 11. new ResultTask(stage.id, stage.latestInfo.attemptId, 12. ...... 13. if (tasks.size > 0) { 14. logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")") 15. stage.pendingPartitions ++= tasks.map(_.partitionId) 16. logDebug("New pending partitions: " + stage.pendingPartitions) 17. taskScheduler.submitTasks(new TaskSet( 18. tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) 19. stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) 20. ......
Spark 2.2.0版本的DAGScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第14行打印日志内容做了修改。
上段代码中第15~16行代码删除,删除代码stage.pendingPartitions ++= tasks.map (_.partitionId),以及logDebug语句。
1. ..... 2. logInfo(s"Submitting ${tasks.size} missing tasks from $stage ($ {stage.rdd}) (first 15 " + s"tasks are for partitions ${tasks. take(15).map(_.partitionId)})") 3. ......
taskScheduler是一个接口trait,这里没有具体的实现。
1. //提交要运行的任务序列 2. def submitTasks(taskSet: TaskSet): Unit
taskScheduler的子类是TaskSchedulerImpl,TaskSchedulerImpl中submitTasks的具体实现如下。
1. override def submitTasks(taskSet: TaskSet) { 2. val tasks = taskSet.tasks 3. logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") 4. this.synchronized { 5. val manager = createTaskSetManager(taskSet, maxTaskFailures) 6. val stage = taskSet.stageId 7. val stageTaskSets = 8. taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap [Int, TaskSetManager]) 9. stageTaskSets(taskSet.stageAttemptId) = manager 10. val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => 11. ts.taskSet != taskSet && !ts.isZombie 12. } 13. if (conflictingTaskSet) { 14. throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + 15. s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") 16. } 17. schedulableBuilder.addTaskSetManager(manager, manager.taskSet. properties) 18. 19. if (!isLocal && !hasReceivedTask) { 20. starvationTimer.scheduleAtFixedRate(new TimerTask() { 21. override def run() { 22. if (!hasLaunchedTask) { 23. logWarning("Initial job has not accepted any resources; " + 24. "check your cluster UI to ensure that workers are registered " + 25. "and have sufficient resources") 26. } else { 27. this.cancel() 28. } 29. } 30. }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) 31. } 32. hasReceivedTask = true 33. } 34. backend.reviveOffers() 35. }
高层调度器把任务集合传给了TaskSet,任务可能是ShuffleMapTask,也可能是ResultTask。获得taskSet.tasks任务赋值给变量tasks。然后使用了同步块synchronized,在同步块中调用createTaskSetManager,创建createTaskSetManager。createTaskSetManager代码如下。
Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。
1. private[scheduler] def createTaskSetManager( 2. taskSet: TaskSet, 3. maxTaskFailures: Int): TaskSetManager = { 4. new TaskSetManager(this, taskSet, maxTaskFailures) 5. }
Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行TaskSetManager的成员变量新增加了blacklistTrackerOpt变量。
1. ...... 2. new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt) 3. ......
TaskSchedulerImpl.scala的createTaskSetManager会调用new()函数创建一个TaskSetManager,传进来的this是其本身TaskSchedulerImpl、任务集taskSet、最大失败重试次数maxTaskFailures。maxTaskFailures是在构建TaskSchedulerImpl时传入的。
而TaskSchedulerImpl是在SparkContext中创建的。SparkContext的源码如下。
1. val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) 2. _schedulerBackend = sched 3. _taskScheduler = ts 4. _dagScheduler = new DAGScheduler(this) 5. _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) 6. ...... 7. private def createTaskScheduler( 8. ...... 9. case SPARK_REGEX(sparkUrl) => 10. val scheduler = new TaskSchedulerImpl(sc) 11. val masterUrls = sparkUrl.split(",").map("spark://" + _) 12. val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls) 13. scheduler.initialize(backend) 14. (backend, scheduler)
在SparkContext.scala中,通过createTaskScheduler创建taskScheduler,而在createTaskScheduler方法中,模式匹配到Standalone的模式,用new函数创建一个TaskSchedulerImpl。
TaskSchedulerImpl的构造方法如下,Spark 2.2版本默认情况下,将获取配置文件中的config.MAX_TASK_FAILURES,MAX_TASK_FAILURES默认的最大失败重试次数是4次。
回到TaskSchedulerImpl,createTaskSetManager创建了TaskSetManager后,非常关键的一行代码是schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)。
b. SchedulableBuilder.addTaskSetManager:SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中。
schedulableBuilder是应用程序级别的调度器。SchedulableBuilder是一个接口trait,建立调度树。buildPools:建立树节点pools。addTaskSetManager:建立叶子节点TaskSetManagers。
1. private[spark] trait SchedulableBuilder { 2. def rootPool: Pool 3. def buildPools(): Unit 4. def addTaskSetManager(manager: Schedulable, properties: Properties): Unit 5. }
schedulableBuilder支持两种调度模式:FIFOSchedulableBuilder、FairSchedulableBuilder。FIFOSchedulableBuilder是先进先出调度模式。FairSchedulableBuilder是公平调度模式。调度策略可以通过spark-env.sh中的spark.scheduler.mode进行具体设置,默认是FIFO的方式。
回到TaskSchedulerImpl的submitTasks,看一下schedulableBuilder.addTaskSetManager中的调度模式schedulableBuilder。
1. var schedulableBuilder: SchedulableBuilder = null
schedulableBuilder是SparkContext中new TaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。
具体调度模式有FIFO和FAIR两种,对应的SchedulableBuilder也有两种,即FIFOSchedulableBuilder、FairSchedulableBuilder。initialize方法中的schedulingMode模式默认是FIFO。
回到TaskSchedulerImpl的submitTasks,schedulableBuilder.addTaskSetManager之后,关键的一行代码是backend.reviveOffers()。
c. CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers。SchedulerBackend.scala的reviveOffers方法没有具体实现。
1. private[spark] trait SchedulerBackend { 2. private val appId = "spark-application-" + System.currentTimeMillis 3. def start(): Unit 4. def stop(): Unit 5. def reviveOffers(): Unit 6. def defaultParallelism(): Int
CoarseGrainedSchedulerBackend是SchedulerBackend的子类。CoarseGrainedScheduler-Backend的reviveOffers方法如下。
1. override def reviveOffers() { 2. driverEndpoint.send(ReviveOffers) 3. }
CoarseGrainedSchedulerBackend的reviveOffers方法中给DriverEndpoint发送ReviveOffers消息,而ReviveOffers本身是一个空的case object对象,ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候,会发送ReviveOffers这个消息作为触发器。
1. case object ReviveOffers extends CoarseGrainedClusterMessage
TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中。
driverEndpoint.send(ReviveOffers)将ReviveOffers消息发送给driverEndpoint,而不是发送给StandaloneAppClient,因为driverEndpoint是程序的调度器。driverEndpoint的receive方法中模式匹配到ReviveOffers消息,就调用makeOffers方法。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. ...... 4. case ReviveOffers => 5. makeOffers()
d.在DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)。
Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的源码如下。
1. private def makeOffers() { 2. //过滤掉已被Kill的executors节点 3. val activeExecutors = executorDataMap.filterKeys(executorIsAlive) 4. val workOffers = activeExecutors.map { case (id, executorData) => 5. new WorkerOffer(id, executorData.executorHost, executorData. freeCores) 6. }.toIndexedSeq 7. launchTasks(scheduler.resourceOffers(workOffers)) 8. }
Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行之前新增一行代码:增加了同步锁CoarseGrainedSchedulerBackend. this.synchronized。
上段代码中第6行之前,新增代码scheduler.resourceOffers(workOffers)。
删除toIndexedSeq语句。
上段代码中第7行代码launchTasks调整:增加taskDescs不为空的逻辑判断,然后launchTasks。
1. private def makeOffers() { 2. //在执行某项任务时,确保没有executor节点被Kill 3. val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { 4. ....... 5. scheduler.resourceOffers(workOffers) 6. } 7. if (!taskDescs.isEmpty) { 8. launchTasks(taskDescs) 9. } 10. }
其中的executorData类如下,包括freeCores、totalCores等信息。
1. private[cluster] class ExecutorData( 2. val executorEndpoint: RpcEndpointRef, 3. val executorAddress: RpcAddress, 4. override val executorHost: String, 5. var freeCores: Int, 6. override val totalCores: Int, 7. override val logUrlMap: Map[String, String] 8. ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
在makeOffers中首先找到可以利用的activeExecutors,然后创建workOffers。workOffers是一个数据结构case class,表示具体的Executor可能的资源。这里只考虑CPU cores,不考虑内存,因为之前内存已经分配完成。
1. private[spark] 2. case class WorkerOffer(executorId: String, host: String, cores: Int)
makeOffers方法中,TaskSchedulerImpl.resourceOffers为每个Task具体分配计算资源,输入offers: IndexedSeq[WorkerOffer]一维数组是可用的计算资源,ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组Seq[Seq[TaskDescription]]定义每个任务的数据本地性及放在哪个Executor上执行。
TaskDescription包括executorId,TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上。而确定Task具体运行在哪个ExecutorBackend上的算法由TaskSetManager的resourceOffer方法决定。
Spark 2.1.1版本的TaskDescription.scala的源码如下。
1. private[spark] class TaskDescription( 2. val taskId: Long, 3. val attemptNumber: Int, 4. val executorId: String, 5. val name: String, 6. val index: Int, //在该任务中的TaskSet的索引 7. _serializedTask: ByteBuffer) 8. extends Serializable {
Spark 2.2.0版本的TaskDescription.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第6行之后,TaskDescription类的成员变量中增加了addedFiles、addedJars、properties等成员。
上段代码中第7行_serializedTask名称调整为val serializedTask。
上段代码中第8行删除。删掉继承类Serializable。
1. ...... 2. val addedFiles: Map[String, Long], 3. val addedJars: Map[String, Long], 4. val properties: Properties, 5. val serializedTask: ByteBuffer) {
resourceOffers由群集管理器调用提供slaves的资源,根据优先级顺序排列任务,以循环的方式填充每个节点的任务,使得集群的任务运行均衡。
Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。
1. def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { 2. //标记每一个slave 节点活跃状态,记录主机名 3. //如是新的executor节点增加,则进行跟踪 4. var newExecAvail = false 5. for (o <- offers) { 6. if (!hostToExecutors.contains(o.host)) { 7. hostToExecutors(o.host) = new HashSet[String]() 8. } 9. if (!executorIdToRunningTaskIds.contains(o.executorId)) { 10. hostToExecutors(o.host) += o.executorId 11. executorAdded(o.executorId, o.host) 12. executorIdToHost(o.executorId) = o.host 13. executorIdToRunningTaskIds(o.executorId) = HashSet[Long]() 14. newExecAvail = true 15. } 16. for (rack <- getRackForHost(o.host)) { 17. hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host 18. } 19. } 20. 21. //随机洗牌,避免总是把任务放在同一组worker节点 22. val shuffledOffers = Random.shuffle(offers) 23. //建立要分配给每个worker节点的任务列表 24. val tasks = shuffledOffers.map(o => new ArrayBuffer [TaskDescription](o.cores)) 25. val availableCpus = shuffledOffers.map(o => o.cores).toArray 26. val sortedTaskSets = rootPool.getSortedTaskSetQueue 27. for (taskSet <- sortedTaskSets) { 28. logDebug("parentName: %s, name: %s, runningTasks: %s".format( 29. taskSet.parent.name, taskSet.name, taskSet.runningTasks)) 30. if (newExecAvail) { 31. taskSet.executorAdded() 32. } 33. } 34. 35. //把每个TaskSet放在调度顺序中,然后提供它的每个节点本地性级别的递增顺序,以便 //它有机会启动所有任务的本地任务 36. //注意:数据本地性优先级别顺序:PROCESS_LOCAL, NODE_LOCAL, NO_PREF, //RACK_LOCAL, ANY 37. for (taskSet <- sortedTaskSets) { 38. var launchedAnyTask = false 39. var launchedTaskAtCurrentMaxLocality = false 40. for (currentMaxLocality <- taskSet.myLocalityLevels) { 41. do { 42. launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( 43. taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) 44. launchedAnyTask |= launchedTaskAtCurrentMaxLocality 45. } while (launchedTaskAtCurrentMaxLocality) 46. } 47. if (!launchedAnyTask) { 48. taskSet.abortIfCompletelyBlacklisted(hostToExecutors) 49. } 50. } 51. 52. if (tasks.size > 0) { 53. hasLaunchedTask = true 54. } 55. return tasks 56. }
Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第22行之前新增加一段代码,删除黑名单节点。
上段代码中第22行将Random.shuffle(offers)调整为shuffleOffers(filteredOffers)。
1. ...... 2. //在分配资源之前,删除黑名单列表中已过期的任何节点。这么做是为了避免单独的线程和增加 //同步开销,也因为黑名单的更新只有在分配任务Task资源时才是相关的 3. blacklistTrackerOpt.foreach(_.applyBlacklistTimeout()) 4. 5. val filteredOffers = blacklistTrackerOpt.map { blacklistTracker => 6. offers.filter { offer => 7. !blacklistTracker.isNodeBlacklisted(offer.host) && 8. !blacklistTracker.isExecutorBlacklisted(offer.executorId) 9. } 10. }.getOrElse(offers) 11. 12. val shuffledOffers = shuffleOffers(filteredOffers) 13. .......
resourceOffers中:
标记每一个活着的slave,记录它的主机名,并跟踪是否增加了新的Executor。感知集群动态资源的状况。
offers是集群有哪些可用的资源,循环遍历offers,hostToExecutors是否包含当前的host,如果不包含,就将Executor加进去。因为这里是最新请求,获取机器有哪些可用的计算资源。
getRackForHost是数据本地性,默认情况下,在一个机架Rack里面,生产环境中可能分若干个机架Rack。
重要的一行代码val shuffledOffers = Random.shuffle(offers):将可用的计算资源打散。
tasks将获得洗牌后的shuffledOffers通过map转换,对每个worker用了ArrayBuffer[TaskDescription],每个Executor可以放几个[TaskDescription],就可以运行多少个任务。即多少个Cores,就可以分配多少任务。ArrayBuffer是一个一维数组,数组的长度根据当前机器的CPU个数决定。
ArrayBuffer[TaskDescription](o.cores)说明当前ExecutorBackend上可以分配多少个Task,并行运行多少Task,和RDD的分区个数是两个概念:这里不是决定Task的个数,RDD的分区数在创建RDD时就已经决定了。这里,具体任务调度是指Task分配在哪些机器上,每台机器上分配多少Task,一次能分配多少Task。
在TaskSchedulerImpl中的initialize中创建rootPool,将schedulingMode调度模式传进去。rootPool的叶子节点是TaskSetManagers,按照一定的算法计算Stage的TaskSet调度的优先顺序。
for循环遍历sortedTaskSets,如果有新的可用的Executor,通过taskSet.executorAdded()加入taskSet。
TastSetManager的executorAdded方法如下。
1. def recomputeLocality() { 2. val previousLocalityLevel = myLocalityLevels(currentLocalityIndex) 3. myLocalityLevels = computeValidLocalityLevels() 4. localityWaits = myLocalityLevels.map(getLocalityWait) 5. currentLocalityIndex = getLocalityIndex(previousLocalityLevel) 6. } 7. 8. def executorAdded() { 9. recomputeLocality() 10. }
数据本地优先级从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。其中,NO_PREF是指机器本地性,一台机器上有很多Node,Node的优先级高于机器本地性。
resourceOffers中追求最高级别的优先级本地性源码如下。
1. for (taskSet <- sortedTaskSets) { 2. var launchedAnyTask = false 3. var launchedTaskAtCurrentMaxLocality = false 4. for (currentMaxLocality <- taskSet.myLocalityLevels) { 5. do { 6. launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( 7. taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) 8. launchedAnyTask |= launchedTaskAtCurrentMaxLocality 9. } while (launchedTaskAtCurrentMaxLocality) 10. } 11. if (!launchedAnyTask) { 12. taskSet.abortIfCompletelyBlacklisted(hostToExecutors) 13. } 14. }
循环遍历sortedTaskSets,对其中的每个taskSet,首先考虑myLocalityLevels的优先性,myLocalityLevels计算数据本地性的Level,将PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY循环一遍。myLocalityLevels是通过computeValidLocalityLevels方法获取到的。
Spark 2.1.1版本的TaskSetManager.scala的computeValidLocalityLevels的源码如下。
1. var myLocalityLevels: Array[TaskLocality] = computeValidLocalityLevels() 2. ...... 3. private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = { 4. import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY} 5. val levels = new ArrayBuffer[TaskLocality.TaskLocality] 6. if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 && pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { 7. levels += PROCESS_LOCAL 8. } 9. if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 && 10. pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { 11. levels += NODE_LOCAL 12. } 13. if (!pendingTasksWithNoPrefs.isEmpty) { 14. levels += NO_PREF 15. } 16. if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 && 17. pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { 18. levels += RACK_LOCAL 19. } 20. levels += ANY 21. logDebug("Valid locality levels for " + taskSet + ": " + levels. mkString(", ")) 22. levels.toArray 23. }
Spark 2.2.0版本的TaskSetManager.scala的computeValidLocalityLevels的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第6行在if逻辑判断时去掉了getLocalityWait获取数据本地性等待时间的判断。
上段代码中第9行删掉了getLocalityWait判断。
上段代码中第16行删掉了getLocalityWait判断。
1. ........ 2. if (!pendingTasksForExecutor.isEmpty && 3. pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) { 4. ........ 5. if (!pendingTasksForHost.isEmpty && 6. pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) { 7. ........ 8. if (!pendingTasksForRack.isEmpty && 9. pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) { 10. ........
resourceOfferSingleTaskSet的源码如下。
1. private def resourceOfferSingleTaskSet( 2. taskSet: TaskSetManager, 3. maxLocality: TaskLocality, 4. shuffledOffers: Seq[WorkerOffer], 5. availableCpus: Array[Int], 6. tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = { 7. var launchedTask = false 8. for (i <- 0 until shuffledOffers.size) { 9. val execId = shuffledOffers(i).executorId 10. val host = shuffledOffers(i).host 11. if (availableCpus(i) >= CPUS_PER_TASK) { 12. try { 13. for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { 14. tasks(i) += task 15. val tid = task.taskId 16. taskIdToTaskSetManager(tid) = taskSet 17. taskIdToExecutorId(tid) = execId 18. executorIdToRunningTaskIds(execId).add(tid) 19. availableCpus(i) -= CPUS_PER_TASK 20. assert(availableCpus(i) >= 0) 21. launchedTask = true 22. } 23. } catch { 24. case e: TaskNotSerializableException => 25. logError(s"Resource offer failed, task set ${taskSet.name} was not serializable") 26. //不为这个任务提供资源,但不抛出错误,以允许其他任务集提交任务 27. return launchedTask 28. } 29. } 30. } 31. return launchedTask 32. }
resourceOfferSingleTaskSet方法中的CPUS_PER_TASK是每个Task默认采用一个线程进行计算的。TaskSchedulerImpl.scala中CPUS_PER_TASK的源码如下。
1. //CPUs to request per task 2. val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
resourceOfferSingleTaskSet方法中的taskSet.resourceOffer,通过调用TaskSetManager的 resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。
Spark 2.1.1版本的TaskSetManager.scala的源码如下。
1. def resourceOffer( 2. execId: String, 3. host: String, 4. maxLocality: TaskLocality.TaskLocality) 5. : Option[TaskDescription] = 6. { 7. ...... 8. sched.dagScheduler.taskStarted(task, info) 9. new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId, 10. taskName, index, serializedTask) 11.
Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第9行构建TaskDescription实例时,新增传入addedFiles、addedJars、localProperties等成员变量。
1. ...... 2. new TaskDescription( 3. ...... 4. sched.sc.addedFiles, 5. sched.sc.addedJars, 6. task.localProperties, 7. ......
以上内容都在做一件事情:获取Locality Level本地性的层次。DagScheduler告诉我们任务运行在哪台机器上,DAGScheduler是从数据层面考虑preferedLocation的,DAGScheduler从RDD的层面确定就可以;而TaskScheduler是从具体计算Task的角度考虑计算的本地性,TaskScheduler是更具体的底层调度。本地性的两个层面:①数据的本地性;②计算的本地性。
总结:scheduler.resourceOffers确定了每个Task具体运行在哪个ExecutorBackend上;resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?
i.通过Random.shuffle方法重新洗牌所有的计算资源,以寻求计算的负载均衡。
ii.根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组。
iii.如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的、完整的可用计算资源。
iv.通过下述代码追求最高级别的优先级本地性。
1. for (taskSet <- sortedTaskSets) { 2. var launchedAnyTask = false 3. var launchedTaskAtCurrentMaxLocality = false 4. for (currentMaxLocality <- taskSet.myLocalityLevels) { 5. do { 6. launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet( 7. taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks) 8. launchedAnyTask |= launchedTaskAtCurrentMaxLocality 9. } while (launchedTaskAtCurrentMaxLocality) 10. } 11. if (!launchedAnyTask) { 12. taskSet.abortIfCompletelyBlacklisted(hostToExecutors) 13. } 14. }
v.通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。
回到CoarseGrainedSchedulerBackend.scala的launchTasks方法。
Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的源码如下。
1. private def launchTasks(tasks: Seq[Seq[TaskDescription]]) { 2. for (task <- tasks.flatten) { 3. val serializedTask = ser.serialize(task) 4. if (serializedTask.limit >= maxRpcMessageSize) { 5. scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr => 6. try { 7. var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " + 8. "spark.rpc.message.maxSize (%d bytes). Consider increasing " + 9. "spark.rpc.message.maxSize or using broadcast variables for large values." 10. msg = msg.format(task.taskId, task.index, serializedTask .limit, maxRpcMessageSize) 11. taskSetMgr.abort(msg) 12. } catch { 13. case e: Exception => logError("Exception in error callback", e) 14. } 15. } 16. } 17. else { 18. val executorData = executorDataMap(task.executorId) 19. executorData.freeCores -= scheduler.CPUS_PER_TASK 20. 21. logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " + 22. s"${executorData.executorHost}.") 23. 24. executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer (serializedTask))) 25. } 26. } 27. }
Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行调整使用TaskDescription.encode方法编码,对任务Task进行序列化。
1. ...... 2. val serializedTask = TaskDescription.encode(task) 3. ......
e.通过launchTasks把任务发送给ExecutorBackend去执行。
launchTasks首先进行序列化,但序列化Task的大小不能太大,如果超过maxRpcMessageSize,则提示出错信息。
RpcUtils.scala中maxRpcMessageSize的定义,spark.rpc.message.maxSize默认设置是128MB:
1. private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf) 2. ...... 3. def maxMessageSizeBytes(conf: SparkConf): Int = { 4. val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) 5. if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) { 6. throw new IllegalArgumentException( 7. s"spark.rpc.message.maxSize should not be greater than $MAX_ MESSAGE_SIZE_IN_MB MB") 8. } 9. maxSizeInMB * 1024 * 1024 10. } 11. }
Task进行广播时的maxSizeInMB大小是128MB,如果任务大于等于128MB,则Task直接被丢弃掉;如果小于128MB,会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上。
CoarseGrainedSchedulerBackend.scala的launchTasks方法:通过executorData. executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))交给Task要运行的ExecutorBackend,给它发送一个消息LaunchTask,发送序列化的Task。
CoarseGrainedExecutorBackend就收到了launchTasks消息,启动executor.launchTask。