Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

4.3 TaskScheduler解析

TaskScheduler的核心任务是提交TaskSet到集群运算并汇报结果。

(1)为TaskSet创建和维护一个TaskSetManager,并追踪任务的本地性以及错误信息。

(2)遇到Straggle任务时,会放到其他节点进行重试。

(3)向DAGScheduler汇报执行情况,包括在Shuffle输出丢失的时候报告fetch failed错误等信息。

4.3.1 TaskScheduler原理剖析

DAGScheduler将划分的一系列的Stage(每个Stage封装一个TaskSet),按照Stage的先后顺序依次提交给底层的TaskScheduler去执行。接下来我们分析TaskScheduler接收到DAGScheduler的Stage任务后,是如何来管理Stage(TaskSet)的生命周期的。

首先,回顾一下DAGScheduler在SparkContext中实例化的时候,TaskScheduler以及SchedulerBackend就已经先在SparkContext的createTaskScheduler创建出实例对象了。

虽然Spark支持多种部署模式(包括Local、Standalone、YARN、Mesos等),但是底层调度器TaskScheduler接口的实现类都是TaskSchedulerImpl。并且,考虑到方便读者对TaskScheduler的理解,对于SchedulerBackend的实现,我们也只专注Standalone部署模式下的具体实现StandaloneSchedulerBackend来作分析。

TaskSchedulerImpl在createTaskScheduler方法中实例化后,就立即调用自己的initialize方法把StandaloneSchedulerBackend的实例对象传进来,从而赋值给TaskSchedulerImpl的backend。在TaskSchedulerImpl的initialize方法中,根据调度模式的配置创建实现了SchedulerBuilder接口的相应的实例对象,并且创建的对象会立即调用buildPools创建相应数量的Pool存放和管理TaskSetManager的实例对象。实现SchedulerBuilder接口的具体类都是SchedulerBuilder的内部类。

(1)FIFOSchedulableBuilder:调度模式是SchedulingMode.FIFO,使用先进先出策略调度。先进先出(FIFO)为默认模式。在该模式下只有一个TaskSetManager池。

(2)FairSchedulableBuilder:调度模式是SchedulingMode.FAIR,使用公平策略调度。

在createTaskScheduler方法返回后,TaskSchedulerImpl通过DAGScheduler的实例化过程设置DAGScheduler的实例对象。然后调用自己的start方法。在TaskSchedulerImpl调用start方法的时候,会调用StandaloneSchedulerBackend的start方法,在StandaloneSchedulerBackend的start方法中,会最终注册应用程序AppClient。TaskSchedulerImpl的start方法中还会根据配置判断是否周期性地检查任务的推测执行。

TaskSchedulerImpl启动后,就可以接收DAGScheduler的submitMissingTasks方法提交过来的TaskSet进行进一步处理。TaskSchedulerImpl在submitTasks中初始化一个TaskSetManager,对其生命周期进行管理,当TaskSchedulerImpl得到Worker节点上的Executor计算资源的时候,会通过TaskSetManager发送具体的Task到Executor上执行计算。

如果Task执行过程中有错误导致失败,会调用TaskSetManager来处理Task失败的情况,进而通知DAGScheduler结束当前的Task。TaskSetManager会将失败的Task再次添加到待执行Task队列中。Spark Task允许失败的次数默认是4次,在TaskSchedulerImpl初始化的时候,通过spark.task.maxFailures设置该值。

如果Task执行完毕,执行的结果会反馈给TaskSetManager,由TaskSetManager通知DAGScheduler,DAGScheduler根据是否还存在待执行的Stage,继续迭代提交对应的TaskSet给TaskScheduler去执行,或者输出Job的结果。

通过下面的调度链,Executor把Task执行的结果返回给调度器(Scheduler)。

(1)Executor.run。

(2)CoarseGrainedExecutorBackend.statusUpdate(发送StatusUpdate消息)。

(3)CoarseGrainedSchedulerBackend.receive(处理StatusUpdate消息)。

(4)TaskSchedulerImpl.statusUpdate。

(5)TaskResultGetter.enqueueSuccessfulTask或者enqueueFailedTask。

(6)TaskSchedulerImpl.handleSuccessfulTask或者handleFailedTask。

(7)TaskSetManager.handleSuccessfulTask或者handleFailedTask。

(8)DAGScheduler.taskEnded。

(9)DAGScheduler.handleTaskCompletion。

在上面的调度链中值得关注的是:第(7)步中,TaskSetManager的handleFailedTask方法会将失败的Task再次添加到待执行Task队列中。在第(6)步中,TaskSchedulerImpl的handleFailedTask方法在TaskSetManager的handleFailedTask方法返回后,会调用CoarseGrainedSchedulerBackend的reviveOffers方法给重新执行的Task获取资源。

4.3.2 TaskScheduler源码解析

TaskScheduler是Spark的底层调度器。底层调度器负责Task本身的调度运行。

下面编写一个简单的测试代码,setMaster("local-cluster[1, 1, 1024]")设置为Spark本地伪分布式开发模式,从代码的运行日志中观察Spark框架的运行情况。

1.  object SparkTest {
2.   def main(args: Array[String]): Unit = {
3.     Logger.getLogger("org").setLevel(Level.ALL)
4.     val conf = new SparkConf()                      //创建SparkConf对象
5.     conf.setAppName("Wow,My First Spark App!") //设置应用程序的名称,在程序
                                                           //运行的监控界面中可以看到名称
6.     conf.setMaster("local-cluster[1, 1, 1024]")
7.     conf.setSparkHome(System.getenv("SPARK_HOME"))
8.     val sc = new SparkContext(conf)
                                    //创建SparkContext对象,通过传入SparkConf
                                    //实例来定制Spark运行的具体参数和配置信息
9.     sc.parallelize(Array("100","200"),4).count()
10.     sc.stop()
11.   }
12. }

在IDEA中运行代码,运行结果中打印的日志如下。

1.   Using Spark's default log4j profile: org/apache/spark/log4j-defaults.
     properties
2.  17/05/31 05:32:07 INFO SparkContext: Running Spark version 2.1.0
3.   ......
4.   17/05/31 05:46:06 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and
     started at http://192.168.93.1:51034
5.  17/05/31 05:46:06 INFO Worker: Connecting to master 192.168.93.1:51011...
6.  17/05/31 05:46:06 INFO StandaloneAppClient$ClientEndpoint: Connecting to
    master spark://192.168.93.1:51011...
7.  17/05/31 05:46:06 INFO TransportClientFactory: Successfully created
    connection to /192.168.93.1:51011 after 38 ms (0 ms spent in bootstraps)
8.  17/05/31 05:46:06 INFO TransportClientFactory: Successfully created
    connection to /192.168.93.1:51011 after 100 ms (0 ms spent in bootstraps)
9.  17/05/31 05:46:07 INFO Master: Registering worker 192.168.93.1:51033 with
    1 cores, 1024.0 MB RAM
10. 17/05/31 05:46:07 INFO Worker: Successfully registered with master
    spark://192.168.93.1:51011
11. 17/05/31 05:46:07 INFO Master: Registering app Wow,My First Spark App!
12. 17/05/31 05:46:07 INFO Master: Registered app Wow,My First Spark App! with
    ID app-20170531054607-0000
13. 17/05/31 05:46:07 INFO StandaloneSchedulerBackend: Connected to Spark
    cluster with app ID app-20170531054607-0000
14. 17/05/31 05:46:07 INFO Master: Launching executor app-20170531054607-
    0000/0 on worker worker-20170531054606-192.168.93.1-51033
15. 17/05/31     05:46:07    INFO    Worker:    Asked    to   launch    executor   app-
    20170531054607-0000/0 for Wow,My First Spark App!
16. 17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor
    added: app-20170531054607-0000/0 on worker-20170531054606-192.168.93.1-
    51033 (192.168.93.1:51033) with 1 cores
17. ......
18. 17/05/31 05:46:07 INFO StandaloneSchedulerBackend: SchedulerBackend is
    ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
19. 17/05/31 05:46:07 INFO StandaloneAppClient$ClientEndpoint: Executor
    updated: app-20170531054607-0000/0 is now RUNNING

日志中显示:StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93. 1:50686表明StandaloneAppClient的ClientEndpoint注册给master。日志中显示StandaloneApp-Client$ClientEndpoint: Executor added获取了Executor。具体是通过StandaloneAppClient的ClientEndpoint来管理Executor。日志中显示StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0说明Standalone-SchedulerBackend已经准备好。

这里是在IDEA本地伪分布式运行的(通过count的action算子启动了Job)。如果是通过Spark-shell运行程序来观察日志,当启动Spark-shell本身的时候,命令终端反馈回来的主要是ClientEndpoint和StandaloneSchedulerBackend,因为此时还没有任何Job的触发,这是启动Application本身而已,所以主要就是实例化SparkContext,并注册当前的应用程序给Master,且从集群中获得ExecutorBackend计算资源。

IDEA本地伪分布式运行,Job启动的日志如下。

1.  17/05/31 05:46:08 INFO DAGScheduler: Got job 0 (count at SparkTest.scala:
     17) with 4 output partitions
2.  17/05/31 05:46:08 INFO DAGScheduler: Final stage: ResultStage 0 (count
    at SparkTest.scala:17)
3.  17/05/31 05:46:08 INFO DAGScheduler: Parents of final stage: List()
4.  17/05/31 05:46:08 INFO DAGScheduler: Missing parents: List()
5.  17/05/31     05:46:08     INFO    DAGScheduler:     Submitting      ResultStage  0
    (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17), which
    has no missing parents
6.  ......
7.  17/05/31 05:46:08 INFO DAGScheduler: Submitting 4 missing tasks from
    ResultStage 0 (ParallelCollectionRDD[0] at parallelize at SparkTest.scala:17)

count是action算子触发了Job;然后DAGScheduler获取Final stage: ResultStage,提交Submitting ResultStage。最后提交任务给TaskSetManager,启动任务。任务完成后,DAGScheduler完成Job。

DAGScheduler划分好Stage后,会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage中的所有任务TaskSet。TaskSetManager会根据locality aware来为Task分配计算资源、监控Task的执行状态(如重试、慢任务进行推测式执行等)。

TaskSet是一个数据结构,TaskSet包含了一系列高层调度器交给底层调度器的任务的集合。第一个成员是Tasks,第二个成员task属于哪个Stage,stageAttemptId是尝试的ID,priority优先级,调度的时候有一个调度池,调度归并调度的优先级。

1.   private[spark] class TaskSet(
2.      val tasks: Array[Task[_]],
3.      val stageId: Int,
4.      val stageAttemptId: Int,
5.      val priority: Int,
6.      val properties: Properties) {
7.    val id: String = stageId + "." + stageAttemptId
8.
9.    override def toString: String = "TaskSet " + id
10. }

TaskSetManager实例化的时候完成TaskSchedulerImpl的工作,接收TaskSet任务的集合,maxTaskFailures是任务失败重试的次数。

Spark 2.1.1版本的TaskSetManager.scala的源码如下。

1.  private[spark] class TaskSetManager(
2.     sched: TaskSchedulerImpl,
3.     val taskSet: TaskSet,
4.     val maxTaskFailures: Int,
5.     clock: Clock = new SystemClock()) extends Schedulable with Logging {

Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行之后增加了blacklistTracker的成员变量,用于黑名单列表executors及nodes的跟踪。

1.     .....
2.      blacklistTracker: Option[BlacklistTracker] = None,
3.  .....

TaskScheduler与SchedulerBackend总体的底层任务调度的过程如下。

a. TaskSchedulerImpl.submitTasks:主要作用是将TaskSet加入到TaskSetManager中进行管理。

DAGScheduler.scala收到JobSubmitted消息,调用handleJobSubmitted方法。

1.  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
2.  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener,
    properties) =>
3.    dagScheduler.handleJobSubmitted(jobId,           rdd,    func,    partitions,
      callSite, listener, properties)

在handleJobSubmitted方法中提交submitStage。

1.  private[scheduler] def handleJobSubmitted(jobId: Int,
2.
3.  ......
4.    submitStage(finalStage)
5.  }

submitStage方法调用submitMissingTasks提交task。

1.  private def submitStage(stage: Stage) {
2.  ......
3.          submitMissingTasks(stage, jobId.get)
4.     ......

DAGScheduler.scala的submitMissingTasks里面调用了taskScheduler.submitTasks。

Spark 2.1.1版本的DAGScheduler.scala的源码如下。

1.  private def submitMissingTasks(stage: Stage, jobId: Int) {
2.     ......
3.  val tasks: Seq[Task[_]] = try {
4.        stage match {
5.          case stage: ShuffleMapStage =>
6.             ......
7.              new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
8.             ......
9.          case stage: ResultStage =>
10.             ......
11.             new ResultTask(stage.id, stage.latestInfo.attemptId,
12.            ......
13.     if (tasks.size > 0) {
14.       logInfo("Submitting " + tasks.size + " missing tasks from " + stage
          + " (" + stage.rdd + ")")
15.       stage.pendingPartitions ++= tasks.map(_.partitionId)
16.       logDebug("New pending partitions: " + stage.pendingPartitions)
17.       taskScheduler.submitTasks(new TaskSet(
18.         tasks.toArray,      stage.id,     stage.latestInfo.attemptId,      jobId,
            properties))
19.       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
20.    ......

Spark 2.2.0版本的DAGScheduler.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第14行打印日志内容做了修改。

 上段代码中第15~16行代码删除,删除代码stage.pendingPartitions ++= tasks.map (_.partitionId),以及logDebug语句。

1.  .....
2.   logInfo(s"Submitting ${tasks.size} missing tasks from $stage ($
         {stage.rdd}) (first 15 " +  s"tasks are for partitions ${tasks.
         take(15).map(_.partitionId)})")
3.  ......

taskScheduler是一个接口trait,这里没有具体的实现。

1.  //提交要运行的任务序列
2.  def submitTasks(taskSet: TaskSet): Unit

taskScheduler的子类是TaskSchedulerImpl,TaskSchedulerImpl中submitTasks的具体实现如下。

1.    override def submitTasks(taskSet: TaskSet) {
2.     val tasks = taskSet.tasks
3.     logInfo("Adding task set " + taskSet.id + " with " + tasks.length +
       " tasks")
4.     this.synchronized {
5.       val manager = createTaskSetManager(taskSet, maxTaskFailures)
6.       val stage = taskSet.stageId
7.       val stageTaskSets =
8.         taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap
           [Int, TaskSetManager])
9.       stageTaskSets(taskSet.stageAttemptId) = manager
10.      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
11.        ts.taskSet != taskSet && !ts.isZombie
12.      }
13.      if (conflictingTaskSet) {
14.        throw new IllegalStateException(s"more than one active taskSet for
           stage $stage:" +
15.          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
16.      }
17.      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.
         properties)
18.
19.      if (!isLocal && !hasReceivedTask) {
20.        starvationTimer.scheduleAtFixedRate(new TimerTask() {
21.          override def run() {
22.            if (!hasLaunchedTask) {
23.              logWarning("Initial job has not accepted any resources; " +
24.                "check your cluster UI to ensure that workers are registered " +
25.                "and have sufficient resources")
26.            } else {
27.              this.cancel()
28.            }
29.          }
30.        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
31.      }
32.      hasReceivedTask = true
33.    }
34.    backend.reviveOffers()
35.  }

高层调度器把任务集合传给了TaskSet,任务可能是ShuffleMapTask,也可能是ResultTask。获得taskSet.tasks任务赋值给变量tasks。然后使用了同步块synchronized,在同步块中调用createTaskSetManager,创建createTaskSetManager。createTaskSetManager代码如下。

Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。

1.   private[scheduler] def createTaskSetManager(
2.      taskSet: TaskSet,
3.      maxTaskFailures: Int): TaskSetManager = {
4.    new TaskSetManager(this, taskSet, maxTaskFailures)
5.  }

Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第4行TaskSetManager的成员变量新增加了blacklistTrackerOpt变量。

1.  ......
2.     new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt)
3.    ......

TaskSchedulerImpl.scala的createTaskSetManager会调用new()函数创建一个TaskSetManager,传进来的this是其本身TaskSchedulerImpl、任务集taskSet、最大失败重试次数maxTaskFailures。maxTaskFailures是在构建TaskSchedulerImpl时传入的。

而TaskSchedulerImpl是在SparkContext中创建的。SparkContext的源码如下。

1.          val (sched, ts) = SparkContext.createTaskScheduler(this, master,
            deployMode)
2.      _schedulerBackend = sched
3.      _taskScheduler = ts
4.      _dagScheduler = new DAGScheduler(this)
5.      _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
6.  ......
7.  private def createTaskScheduler(
8.  ......
9.   case SPARK_REGEX(sparkUrl) =>
10.         val scheduler = new TaskSchedulerImpl(sc)
11.         val masterUrls = sparkUrl.split(",").map("spark://" + _)
12.         val   backend    =  new   StandaloneSchedulerBackend(scheduler,   sc,
            masterUrls)
13.         scheduler.initialize(backend)
14.         (backend, scheduler)

在SparkContext.scala中,通过createTaskScheduler创建taskScheduler,而在createTaskScheduler方法中,模式匹配到Standalone的模式,用new函数创建一个TaskSchedulerImpl。

TaskSchedulerImpl的构造方法如下,Spark 2.2版本默认情况下,将获取配置文件中的config.MAX_TASK_FAILURES,MAX_TASK_FAILURES默认的最大失败重试次数是4次。

回到TaskSchedulerImpl,createTaskSetManager创建了TaskSetManager后,非常关键的一行代码是schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)。

b. SchedulableBuilder.addTaskSetManager:SchedulableBuilder会确定TaskSetManager的调度顺序,然后按照TaskSetManager的locality aware来确定每个Task具体运行在哪个ExecutorBackend中。

schedulableBuilder是应用程序级别的调度器。SchedulableBuilder是一个接口trait,建立调度树。buildPools:建立树节点pools。addTaskSetManager:建立叶子节点TaskSetManagers。

1.  private[spark] trait SchedulableBuilder {
2.  def rootPool: Pool
3.  def buildPools(): Unit
4.    def addTaskSetManager(manager: Schedulable, properties: Properties):
      Unit
5.  }

schedulableBuilder支持两种调度模式:FIFOSchedulableBuilder、FairSchedulableBuilder。FIFOSchedulableBuilder是先进先出调度模式。FairSchedulableBuilder是公平调度模式。调度策略可以通过spark-env.sh中的spark.scheduler.mode进行具体设置,默认是FIFO的方式。

回到TaskSchedulerImpl的submitTasks,看一下schedulableBuilder.addTaskSetManager中的调度模式schedulableBuilder。

1.  var schedulableBuilder: SchedulableBuilder = null

schedulableBuilder是SparkContext中new TaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。

具体调度模式有FIFO和FAIR两种,对应的SchedulableBuilder也有两种,即FIFOSchedulableBuilder、FairSchedulableBuilder。initialize方法中的schedulingMode模式默认是FIFO。

回到TaskSchedulerImpl的submitTasks,schedulableBuilder.addTaskSetManager之后,关键的一行代码是backend.reviveOffers()。

c. CoarseGrainedSchedulerBackend.reviveOffers:给DriverEndpoint发送ReviveOffers。SchedulerBackend.scala的reviveOffers方法没有具体实现。

1.  private[spark] trait SchedulerBackend {
2.   private val appId = "spark-application-" + System.currentTimeMillis
3.   def start(): Unit
4.   def stop(): Unit
5.   def reviveOffers(): Unit
6.   def defaultParallelism(): Int

CoarseGrainedSchedulerBackend是SchedulerBackend的子类。CoarseGrainedScheduler-Backend的reviveOffers方法如下。

1.  override def reviveOffers() {
2.    driverEndpoint.send(ReviveOffers)
3.  }

CoarseGrainedSchedulerBackend的reviveOffers方法中给DriverEndpoint发送ReviveOffers消息,而ReviveOffers本身是一个空的case object对象,ReviveOffers本身是一个空的case object对象,只是起到触发底层资源调度的作用,在有Task提交或者计算资源变动的时候,会发送ReviveOffers这个消息作为触发器。

1.  case object ReviveOffers extends CoarseGrainedClusterMessage

TaskScheduler中要负责为Task分配计算资源:此时程序已经具备集群中的计算资源了,根据计算本地性原则确定Task具体要运行在哪个ExecutorBackend中。

driverEndpoint.send(ReviveOffers)将ReviveOffers消息发送给driverEndpoint,而不是发送给StandaloneAppClient,因为driverEndpoint是程序的调度器。driverEndpoint的receive方法中模式匹配到ReviveOffers消息,就调用makeOffers方法。

1.    override def receive: PartialFunction[Any, Unit] = {
2.     case StatusUpdate(executorId, taskId, state, data) =>
3.  ......
4.     case ReviveOffers =>
5.       makeOffers()

d.在DriverEndpoint接受ReviveOffers消息并路由到makeOffers具体的方法中:在makeOffers方法中首先准备好所有可以用于计算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等信息)。

Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的源码如下。

1.   private def makeOffers() {
2.    //过滤掉已被Kill的executors节点
3.    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
4.    val workOffers = activeExecutors.map { case (id, executorData) =>
5.      new   WorkerOffer(id,      executorData.executorHost,        executorData.
        freeCores)
6.    }.toIndexedSeq
7.    launchTasks(scheduler.resourceOffers(workOffers))
8.  }

Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第3行之前新增一行代码:增加了同步锁CoarseGrainedSchedulerBackend. this.synchronized。

 上段代码中第6行之前,新增代码scheduler.resourceOffers(workOffers)。

 删除toIndexedSeq语句。

 上段代码中第7行代码launchTasks调整:增加taskDescs不为空的逻辑判断,然后launchTasks。

1.   private def makeOffers() {
2.        //在执行某项任务时,确保没有executor节点被Kill
3.        val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
4.       .......
5.         scheduler.resourceOffers(workOffers)
6.        }
7.        if (!taskDescs.isEmpty) {
8.          launchTasks(taskDescs)
9.        }
10.     }

其中的executorData类如下,包括freeCores、totalCores等信息。

1.    private[cluster] class ExecutorData(
2.     val executorEndpoint: RpcEndpointRef,
3.     val executorAddress: RpcAddress,
4.     override val executorHost: String,
5.     var freeCores: Int,
6.     override val totalCores: Int,
7.     override val logUrlMap: Map[String, String]
8.  ) extends ExecutorInfo(executorHost, totalCores, logUrlMap)

在makeOffers中首先找到可以利用的activeExecutors,然后创建workOffers。workOffers是一个数据结构case class,表示具体的Executor可能的资源。这里只考虑CPU cores,不考虑内存,因为之前内存已经分配完成。

1.    private[spark]
2.  case class WorkerOffer(executorId: String, host: String, cores: Int)

makeOffers方法中,TaskSchedulerImpl.resourceOffers为每个Task具体分配计算资源,输入offers: IndexedSeq[WorkerOffer]一维数组是可用的计算资源,ExecutorBackend及其上可用的Cores,输出TaskDescription的二维数组Seq[Seq[TaskDescription]]定义每个任务的数据本地性及放在哪个Executor上执行。

TaskDescription包括executorId,TaskDescription中已经确定好了Task具体要运行在哪个ExecutorBackend上。而确定Task具体运行在哪个ExecutorBackend上的算法由TaskSetManager的resourceOffer方法决定。

Spark 2.1.1版本的TaskDescription.scala的源码如下。

1.  private[spark] class TaskDescription(
2.     val taskId: Long,
3.     val attemptNumber: Int,
4.     val executorId: String,
5.     val name: String,
6.     val index: Int,    //在该任务中的TaskSet的索引
7.     _serializedTask: ByteBuffer)
8.   extends Serializable {

Spark 2.2.0版本的TaskDescription.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第6行之后,TaskDescription类的成员变量中增加了addedFiles、addedJars、properties等成员。

 上段代码中第7行_serializedTask名称调整为val serializedTask。

 上段代码中第8行删除。删掉继承类Serializable。

1.  ......
2.    val addedFiles: Map[String, Long],
3.    val addedJars: Map[String, Long],
4.    val properties: Properties,
5.    val serializedTask: ByteBuffer) {

resourceOffers由群集管理器调用提供slaves的资源,根据优先级顺序排列任务,以循环的方式填充每个节点的任务,使得集群的任务运行均衡。

Spark 2.1.1版本的TaskSchedulerImpl.scala的源码如下。

1.  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]]
    = synchronized {
2.      //标记每一个slave 节点活跃状态,记录主机名
3.      //如是新的executor节点增加,则进行跟踪
4.      var newExecAvail = false
5.      for (o <- offers) {
6.        if (!hostToExecutors.contains(o.host)) {
7.          hostToExecutors(o.host) = new HashSet[String]()
8.        }
9.        if (!executorIdToRunningTaskIds.contains(o.executorId)) {
10.         hostToExecutors(o.host) += o.executorId
11.         executorAdded(o.executorId, o.host)
12.         executorIdToHost(o.executorId) = o.host
13.         executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
14.        newExecAvail = true
15.      }
16.      for (rack <- getRackForHost(o.host)) {
17.        hostsByRack.getOrElseUpdate(rack,          new    HashSet[String]())       +=
            o.host
18.      }
19.    }
20.
21.    //随机洗牌,避免总是把任务放在同一组worker节点
22.    val shuffledOffers = Random.shuffle(offers)
23.    //建立要分配给每个worker节点的任务列表
24.    val tasks = shuffledOffers.map(o => new ArrayBuffer
       [TaskDescription](o.cores))
25.    val availableCpus = shuffledOffers.map(o => o.cores).toArray
26.    val sortedTaskSets = rootPool.getSortedTaskSetQueue
27.    for (taskSet <- sortedTaskSets) {
28.      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
29.        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
30.      if (newExecAvail) {
31.        taskSet.executorAdded()
32.      }
33.    }
34.
35.    //把每个TaskSet放在调度顺序中,然后提供它的每个节点本地性级别的递增顺序,以便
       //它有机会启动所有任务的本地任务
36.    //注意:数据本地性优先级别顺序:PROCESS_LOCAL, NODE_LOCAL, NO_PREF,
       //RACK_LOCAL, ANY
37.    for (taskSet <- sortedTaskSets) {
38.      var launchedAnyTask = false
39.      var launchedTaskAtCurrentMaxLocality = false
40.      for (currentMaxLocality <- taskSet.myLocalityLevels) {
41.        do {
42.          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
43.            taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
44.          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
45.        } while (launchedTaskAtCurrentMaxLocality)
46.      }
47.      if (!launchedAnyTask) {
48.        taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
49.      }
50.    }
51.
52.    if (tasks.size > 0) {
53.      hasLaunchedTask = true
54.    }
55.    return tasks
56.  }

Spark 2.2.0版本的TaskSchedulerImpl.scala的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第22行之前新增加一段代码,删除黑名单节点。

 上段代码中第22行将Random.shuffle(offers)调整为shuffleOffers(filteredOffers)。

1.  ......
2.  //在分配资源之前,删除黑名单列表中已过期的任何节点。这么做是为了避免单独的线程和增加
    //同步开销,也因为黑名单的更新只有在分配任务Task资源时才是相关的
3.      blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
4.
5.      val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
6.        offers.filter { offer =>
7.          !blacklistTracker.isNodeBlacklisted(offer.host) &&
8.            !blacklistTracker.isExecutorBlacklisted(offer.executorId)
9.        }
10.     }.getOrElse(offers)
11.
12.     val shuffledOffers = shuffleOffers(filteredOffers)
13. .......

resourceOffers中:

 标记每一个活着的slave,记录它的主机名,并跟踪是否增加了新的Executor。感知集群动态资源的状况。

 offers是集群有哪些可用的资源,循环遍历offers,hostToExecutors是否包含当前的host,如果不包含,就将Executor加进去。因为这里是最新请求,获取机器有哪些可用的计算资源。

 getRackForHost是数据本地性,默认情况下,在一个机架Rack里面,生产环境中可能分若干个机架Rack。

 重要的一行代码val shuffledOffers = Random.shuffle(offers):将可用的计算资源打散。

 tasks将获得洗牌后的shuffledOffers通过map转换,对每个worker用了ArrayBuffer[TaskDescription],每个Executor可以放几个[TaskDescription],就可以运行多少个任务。即多少个Cores,就可以分配多少任务。ArrayBuffer是一个一维数组,数组的长度根据当前机器的CPU个数决定。

ArrayBuffer[TaskDescription](o.cores)说明当前ExecutorBackend上可以分配多少个Task,并行运行多少Task,和RDD的分区个数是两个概念:这里不是决定Task的个数,RDD的分区数在创建RDD时就已经决定了。这里,具体任务调度是指Task分配在哪些机器上,每台机器上分配多少Task,一次能分配多少Task。

 在TaskSchedulerImpl中的initialize中创建rootPool,将schedulingMode调度模式传进去。rootPool的叶子节点是TaskSetManagers,按照一定的算法计算Stage的TaskSet调度的优先顺序。

 for循环遍历sortedTaskSets,如果有新的可用的Executor,通过taskSet.executorAdded()加入taskSet。

TastSetManager的executorAdded方法如下。

1.    def recomputeLocality() {
2.     val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
3.     myLocalityLevels = computeValidLocalityLevels()
4.     localityWaits = myLocalityLevels.map(getLocalityWait)
5.     currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
6.   }
7.
8.   def executorAdded() {
9.     recomputeLocality()
10.  }

数据本地优先级从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY。其中,NO_PREF是指机器本地性,一台机器上有很多Node,Node的优先级高于机器本地性。

 resourceOffers中追求最高级别的优先级本地性源码如下。

1.   for (taskSet <- sortedTaskSets) {
2.        var launchedAnyTask = false
3.        var launchedTaskAtCurrentMaxLocality = false
4.        for (currentMaxLocality <- taskSet.myLocalityLevels) {
5.          do {
6.            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
7.              taskSet, currentMaxLocality, shuffledOffers, availableCpus,
                tasks)
8.            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
9.          } while (launchedTaskAtCurrentMaxLocality)
10.       }
11.       if (!launchedAnyTask) {
12.         taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
13.       }
14.     }

循环遍历sortedTaskSets,对其中的每个taskSet,首先考虑myLocalityLevels的优先性,myLocalityLevels计算数据本地性的Level,将PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY循环一遍。myLocalityLevels是通过computeValidLocalityLevels方法获取到的。

Spark 2.1.1版本的TaskSetManager.scala的computeValidLocalityLevels的源码如下。

1.        var myLocalityLevels: Array[TaskLocality] = computeValidLocalityLevels()
2.  ......
3.  private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
4.      import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL,
        ANY}
5.      val levels = new ArrayBuffer[TaskLocality.TaskLocality]
6.    if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) !=
      0 && pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
7.        levels += PROCESS_LOCAL
8.      }
9.      if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
10.         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
11.       levels += NODE_LOCAL
12.     }
13.     if (!pendingTasksWithNoPrefs.isEmpty) {
14.       levels += NO_PREF
15.     }
16.     if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
17.         pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_)))
{
18.       levels += RACK_LOCAL
19.     }
20.     levels += ANY
21.     logDebug("Valid locality levels for " + taskSet + ": " + levels.
        mkString(", "))
22.     levels.toArray
23.   }

Spark 2.2.0版本的TaskSetManager.scala的computeValidLocalityLevels的源码与Spark 2.1.1版本相比具有如下特点。

 上段代码中第6行在if逻辑判断时去掉了getLocalityWait获取数据本地性等待时间的判断。

 上段代码中第9行删掉了getLocalityWait判断。

 上段代码中第16行删掉了getLocalityWait判断。

1.  ........
2.      if (!pendingTasksForExecutor.isEmpty &&
3.         pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_)))
           {
4.   ........
5.      if (!pendingTasksForHost.isEmpty &&
6.          pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
7.     ........
8.      if (!pendingTasksForRack.isEmpty &&
9.          pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
10.   ........

resourceOfferSingleTaskSet的源码如下。

1.       private def resourceOfferSingleTaskSet(
2.       taskSet: TaskSetManager,
3.       maxLocality: TaskLocality,
4.       shuffledOffers: Seq[WorkerOffer],
5.       availableCpus: Array[Int],
6.       tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
7.     var launchedTask = false
8.     for (i <- 0 until shuffledOffers.size) {
9.       val execId = shuffledOffers(i).executorId
10.      val host = shuffledOffers(i).host
11.      if (availableCpus(i) >= CPUS_PER_TASK) {
12.        try {
13.          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
14.            tasks(i) += task
15.            val tid = task.taskId
16.            taskIdToTaskSetManager(tid) = taskSet
17.            taskIdToExecutorId(tid) = execId
18.            executorIdToRunningTaskIds(execId).add(tid)
19.            availableCpus(i) -= CPUS_PER_TASK
20.            assert(availableCpus(i) >= 0)
21.            launchedTask = true
22.          }
23.        } catch {
24.          case e: TaskNotSerializableException =>
25.            logError(s"Resource offer failed, task set ${taskSet.name} was
               not serializable")
26.            //不为这个任务提供资源,但不抛出错误,以允许其他任务集提交任务
27.            return launchedTask
28.        }
29.      }
30.    }
31.    return launchedTask
32.  }

resourceOfferSingleTaskSet方法中的CPUS_PER_TASK是每个Task默认采用一个线程进行计算的。TaskSchedulerImpl.scala中CPUS_PER_TASK的源码如下。

1.      //CPUs to request per task
2.  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

resourceOfferSingleTaskSet方法中的taskSet.resourceOffer,通过调用TaskSetManager的 resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。

Spark 2.1.1版本的TaskSetManager.scala的源码如下。

1.     def resourceOffer(
2.        execId: String,
3.        host: String,
4.        maxLocality: TaskLocality.TaskLocality)
5.      : Option[TaskDescription] =
6.    {
7.  ......
8.  sched.dagScheduler.taskStarted(task, info)
9.      new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
10.           taskName, index, serializedTask)
11.

Spark 2.2.0版本的TaskSetManager.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第9行构建TaskDescription实例时,新增传入addedFiles、addedJars、localProperties等成员变量。

1.  ......
2.          new TaskDescription(
3.            ......
4.            sched.sc.addedFiles,
5.            sched.sc.addedJars,
6.            task.localProperties,
7.         ......

以上内容都在做一件事情:获取Locality Level本地性的层次。DagScheduler告诉我们任务运行在哪台机器上,DAGScheduler是从数据层面考虑preferedLocation的,DAGScheduler从RDD的层面确定就可以;而TaskScheduler是从具体计算Task的角度考虑计算的本地性,TaskScheduler是更具体的底层调度。本地性的两个层面:①数据的本地性;②计算的本地性。

总结:scheduler.resourceOffers确定了每个Task具体运行在哪个ExecutorBackend上;resourceOffers到底是如何确定Task具体运行在哪个ExecutorBackend上的呢?

i.通过Random.shuffle方法重新洗牌所有的计算资源,以寻求计算的负载均衡。

ii.根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组。

iii.如果有新的ExecutorBackend分配给我们的Job,此时会调用executorAdded来获得最新的、完整的可用计算资源。

iv.通过下述代码追求最高级别的优先级本地性。

1.   for (taskSet <- sortedTaskSets) {
2.    var launchedAnyTask = false
3.    var launchedTaskAtCurrentMaxLocality = false
4.    for (currentMaxLocality <- taskSet.myLocalityLevels) {
5.      do {
6.        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
7.          taskSet, currentMaxLocality, shuffledOffers, availableCpus,
            tasks)
8.        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
9.      } while (launchedTaskAtCurrentMaxLocality)
10.   }
11.   if (!launchedAnyTask) {
12.     taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
13.    }
14.  }

v.通过调用TaskSetManager的resourceOffer最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level。

回到CoarseGrainedSchedulerBackend.scala的launchTasks方法。

Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的源码如下。

1.   private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
2.        for (task <- tasks.flatten) {
3.          val serializedTask = ser.serialize(task)
4.          if (serializedTask.limit >= maxRpcMessageSize) {
5.            scheduler.taskIdToTaskSetManager.get(task.taskId).foreach
              { taskSetMgr =>
6.              try {
7.                var msg = "Serialized task %s:%d was %d bytes, which exceeds
                  max allowed: " +
8.                  "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
9.                  "spark.rpc.message.maxSize or using broadcast variables for
                    large values."
10.               msg = msg.format(task.taskId, task.index, serializedTask
                  .limit, maxRpcMessageSize)
11.               taskSetMgr.abort(msg)
12.             } catch {
13.               case e: Exception => logError("Exception in error callback",
                  e)
14.             }
15.           }
16.         }
17.         else {
18.           val executorData = executorDataMap(task.executorId)
19.           executorData.freeCores -= scheduler.CPUS_PER_TASK
20.
21.           logDebug(s"Launching task ${task.taskId} on executor id:
              ${task.executorId} hostname: " +
22.             s"${executorData.executorHost}.")
23.
24.           executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer
              (serializedTask)))
25.         }
26.       }
27.     }

Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行调整使用TaskDescription.encode方法编码,对任务Task进行序列化。

1.   ......
2.    val serializedTask = TaskDescription.encode(task)
3.  ......

e.通过launchTasks把任务发送给ExecutorBackend去执行。

launchTasks首先进行序列化,但序列化Task的大小不能太大,如果超过maxRpcMessageSize,则提示出错信息。

RpcUtils.scala中maxRpcMessageSize的定义,spark.rpc.message.maxSize默认设置是128MB:

1.       private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
2.  ......
3.    def maxMessageSizeBytes(conf: SparkConf): Int = {
4.      val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
5.      if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
6.        throw new IllegalArgumentException(
7.          s"spark.rpc.message.maxSize should not be greater than $MAX_
            MESSAGE_SIZE_IN_MB MB")
8.      }
9.      maxSizeInMB * 1024 * 1024
10.   }
11. }

Task进行广播时的maxSizeInMB大小是128MB,如果任务大于等于128MB,则Task直接被丢弃掉;如果小于128MB,会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend上。

CoarseGrainedSchedulerBackend.scala的launchTasks方法:通过executorData. executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))交给Task要运行的ExecutorBackend,给它发送一个消息LaunchTask,发送序列化的Task。

CoarseGrainedExecutorBackend就收到了launchTasks消息,启动executor.launchTask。