6.2 Spark Application是如何向集群申请资源的
本节讲解Application申请资源的两种类型:第一种是尽可能在集群的所有Worker上分配Executor;第二种是运行在尽可能少的Worker上。本节讲解Application申请资源的源码内容,将彻底解密Spark Application是如何向集群申请资源的。
6.2.1 Application申请资源的两种类型详解
Master负责资源管理和调度。资源调度的方法schedule位于Master.scala类中,当注册程序或者资源发生改变时,都会导致schedule的调用。Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变时(包括Executor增加或者减少、Worker增加或者减少等)。
Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都放在调度的等待队列中,先进先出,只有在满足了前面应用程序的资源分配的基础上,才能够满足下一个应用程序资源的分配;在FIFO的情况下,默认是spreadOutApps来让应用程序尽可能多地运行在所有的Node上。为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。
为了更形象地描述Master的调度机制,下面通过图6-1介绍抽象的资源调度框架。
图6-1 Master中抽象的资源调度框架
其中,Worker1到WorkerN是集群中全部的Workers节点,调度时,会根据应用程序请求的资源信息,从全部Workers节点中过滤出资源足够的节点,假设可以得到Worker1到WorkerM的节点。当前过滤的需求是内核数和内存大小足够启动一个Executor,因为Executor是集群执行应用程序的单位组件(注意:和任务(Task)不是同一个概念,对应的任务是在Executor中执行的)。
选出可用Workers之后,会根据内核大小进行排序,这可以理解成是一种基于可用内核排序的、简单的负载均衡策略。然后根据设置的spreadOutApps参数,对应指定两种资源分配策略。
(1)当spreadOutApps=true:使用轮流均摊的策略,也就是采用圆桌(round-robin)算法,图中的虚线表示第一次轮流摊派的资源不足以满足申请的需求,因此开始第二轮摊派,依次轮流均摊,直到符合资源需求。
(2)当spreadOutApps=false:使用依次全占策略,依次从可用Workers上获取该Worker上可用的全部资源,直到符合资源需求。
对应图中Worker内部的小方块,在此表示分配的资源的抽象单位。对应资源的条件,理解的关键点在于资源是分配给Executor的,因此最终启动Executor时,占用的资源必须满足启动所需的条件。
前面描述了Workers上的资源是如何分配给应用程序的,之后正式开始为Executor分配资源,并向Worker发送启动Executor的命令了。根据申请时是否明确指定需要为每个Executor分配确定的内核个数,有:
(1)明确指定每个Executor需要分配的内核个数时:每次分配的是一个Executor所需的内核数和内存数,对应在某个Worker分配到的总的内核数可能是Executor的内核数的倍数,此时,该Worker节点上会启动多个Executor,每个Executor需要指定的内核数和内存数(注意该Worker节点上分配到的总的内存大小)。
(2)未明确指定每个Executor需要分配的内核个数时:每次分配一个内核,最后所有在某Worker节点上分配到的内核都会放到一个Executor内(未明确指定内核个数,因此可以一起放入一个Executor)。因此,最终该应用程序在一个Worker上只有一个Executor(这里指的是针对一个应用程序,当该Worker节点上存在多个应用程序时,仍然会为每个应用程序分别启动相应的Executor)。
在此强调、补充一下调度机制中使用的三个重要的配置属性。
a.指定为所有Executors分配的总内核个数:在spark-submit脚本提交参数时进行配置。所有Executors分配的总内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。
1. //指定为所有Executors分配的总内核个数 2. | Spark standalone and Mesos only: 3. | --total-executor-cores NUM Total cores for all executors.
b.指定需要为每个Executor分配的内核个数:在spark-submit脚本提交参数时进行配置。每个Executor分配的内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。
SparkSubmitArguments.scala的源码如下。
1. // 指定需要为每个Executor分配的内核个数 2. || Spark standalone and YARN only: 3. | --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, 4. or all available cores on the worker in standalone mode)
c.资源分配策略:数据本地性(数据密集)与计算密集的控制属性,对应的配置属性在Master类中,代码如下。
1. private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
6.2.2 Application申请资源的源码详解
1.任务调度与资源调度的区别
任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度。
资源调度是指应用程序如何获得资源。
任务调度是在资源调度的基础上进行的,如果没有资源调度,任务调度就成为无源之水,无本之木。
2.资源调度内幕
(1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala类中,注册程序或者资源发生改变时都会导致schedule的调用,如注册程序时:
1. case RegisterApplication(description, driver) => 2. //待办事项:防止重复注册Driver 3. if (state == RecoveryState.STANDBY) { 4. //忽略,不要发送响应 5. } else { 6. logInfo("Registering app " + description.name) 7. val app = createApplication(description, driver) 8. registerApplication(app) 9. logInfo("Registered app " + description.name + " with ID " + app.id) 10. persistenceEngine.addApplication(app) 11. driver.send(RegisteredApplication(app.id, self)) 12. schedule() 13. }
(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等)。
进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来时,schedule都会被调用。或者资源发生变化时(如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。
(3)当前Master必须以ALIVE的方式进行资源调度,如果不是ALIVE的状态,就会直接返回,也就是Standby Master不会进行Application的资源调用。
1. if (state != RecoveryState.ALIVE) { 2. return 3. }
(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作。
1. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡。例如,不是以固定的顺序启动launchDriver。WorkerInfo是Worker注册时将信息注册过来。
1. val workers = new HashSet[WorkerInfo] 2. ....... 3. val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
WorkerInfo.scala的源码如下。
1. private[spark] class WorkerInfo( 2. val id: String, 3. val host: String, 4. val port: Int, 5. val cores: Int, 6. val memory: Int, 7. val endpoint: RpcEndpointRef, 8. val webUiAddress: String) 9. extends Serializable {
随机打乱的算法:将Worker的信息传进来,先调用new()函数创建一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如,如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过Shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。
Random.scala中的shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置。
1. def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = { 2. val buf = new ArrayBuffer[T] ++= xs 3. 4. def swap(i1: Int, i2: Int) { 5. val tmp = buf(i1) 6. buf(i1) = buf(i2) 7. buf(i2) = tmp 8. } 9. 10. for (n <- buf.length to 2 by -1) { 11. val k = nextInt(n) 12. swap(n - 1, k) 13. } 14. 15. (bf(xs) ++= buf).result 16. }
(6)Master的schedule()方法中:循环遍历等待启动的Driver,如果是Client模式,就不需要waitingDrivers等待;如果是Cluster模式,此时Driver会加入waitingDrivers等待列表。
当SparkSubmit指定Driver在Cluster模式的情况下,此时Driver会加入waitingDrivers等待列表中,在每个DriverInfo的DriverDescription中有要启动Driver时对Worker的内存及Cores的要求等内容。
1. private val waitingDrivers = new ArrayBuffer[DriverInfo] 2. ......
DriverInfo包括启动时间、ID、描述信息、提交时间等内容。
DriverInfo.scala的源码如下。
1. private[deploy] class DriverInfo( 2. val startTime: Long, 3. val id: String, 4. val desc: DriverDescription, 5. val submitDate: Date) 6. extends Serializable {
其中,DriverInfo的DriverDescription描述信息中包括jarUrl、内存、Cores、supervise、command等内容。如果在Cluster模式中指定supervise为True,那么Driver挂掉时就会自动重启。
DriverDescription.scala的源码如下。
1. private[deploy] case class DriverDescription( 2. jarUrl: String, 3. mem: Int, 4. cores: Int, 5. supervise: Boolean, 6. command: Command) {
在符合资源要求的情况下,采用随机打乱后的一个Worker来启动Driver,worker是Master中对Worker的一个描述。
Master.scala的launchDriver方法如下。
1. private def launchDriver(worker: WorkerInfo, driver: DriverInfo) { 2. logInfo("Launching driver " + driver.id + " on worker " + worker.id) 3. worker.addDriver(driver) 4. driver.worker = Some(worker) 5. worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) 6. driver.state = DriverState.RUNNING 7. }
Master通过worker.endpoint.send(LaunchDriver)发指令给Worker,让远程的Worker启动Driver,Driver启动以后,Driver的状态就变成DriverState.RUNNING。
(7)先启动Driver,才会发生后续的一切资源调度的模式。
(8)Spark默认为应用程序启动Executor的方式是FIFO方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础,才能够满足下一个应用程序资源的分配。
Master的schedule()方法中,调用startExecutorsOnWorkers()为当前的程序调度和启动Worker的Executor,默认情况下排队的方式是FIFO。
startExecutorsOnWorkers的源码如下。
1. private def startExecutorsOnWorkers(): Unit = { 2. //这是一个非常简单的FIFO调度。我们尝试在队列中推入第一个应用程序,然后推入第二 //个应用程序等 3. for (app <- waitingApps if app.coresLeft > 0) { 4. val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor 5. //筛选出workers,其没有足够资源来启动Executor 6. val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE) 7. .filter(worker => worker.memoryFree >= app.desc .memoryPerExecutorMB && 8. worker.coresFree >= coresPerExecutor.getOrElse(1)) 9. .sortBy(_.coresFree).reverse 10. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) 11. 12. //现在我们决定每个worker分配多少cores,进行分配 13. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 14. allocateWorkerResourceToExecutors( 15. app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) 16. } 17. } 18. }
(9)为应用程序具体分配Executor前要判断应用程序是否还需要分配Core,如果不需要,则不会为应用程序分配Executor。
startExecutorsOnWorkers中的coresLeft是请求的requestedCores和可用的coresGranted的相减值。例如,如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0,那么就不需要调度。
1. private[master] def coresLeft: Int = requestedCores - coresGranted
(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前,要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序,产生计算资源由大到小的usableWorkers数据结构。
1. val usableWorkers = workers.toArray.filter(_.state == WorkerState .ALIVE) 2. .filter(worker => worker.memoryFree >= app.desc .memoryPerExecutorMB && 3. worker.coresFree >= coresPerExecutor.getOrElse(1)) 4. .sortBy(_.coresFree).reverse 5. val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
然后调用scheduleExecutorsOnWorkers,在FIFO的情况下,默认spreadOutApps让应用程序尽可能多地运行在所有的Node上。
1. private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
scheduleExecutorsOnWorker中,minCoresPerExecutor表示每个Executor最小分配的core个数。scheduleExecutorsOnWorker的源码如下。
1. private def scheduleExecutorsOnWorkers( 2. app: ApplicationInfo, 3. usableWorkers: Array[WorkerInfo], 4. spreadOutApps: Boolean): Array[Int] = { 5. val coresPerExecutor = app.desc.coresPerExecutor 6. val minCoresPerExecutor = coresPerExecutor.getOrElse(1) 7. val oneExecutorPerWorker = coresPerExecutor.isEmpty 8. val memoryPerExecutor = app.desc.memoryPerExecutorMB 9. val numUsable = usableWorkers.length 10. val assignedCores = new Array[Int](numUsable) 11. val assignedExecutors = new Array[Int](numUsable) 12. var coresToAssign = math.min(app.coresLeft, usableWorkers.map (_.coresFree).sum) 13. ......
(11)为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。
(12)具体在集群上分配Cores时会尽可能地满足我们的要求。math.min用于计算最小值。coresToAssig用于计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如,应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。
scheduleExecutorsOnWorkers方法如下。
1. var coresToAssign = math.min(app.coresLeft, usableWorkers.map (_.coresFree).sum) 2. ......
(13)如果每个Worker下面只能为当前的应用程序分配一个Executor,那么每次只分配一个Core。scheduleExecutorsOnWorkers方法如下。
1. if (oneExecutorPerWorker) { 2. assignedExecutors(pos) = 1 3. } else { 4. assignedExecutors(pos) += 1 5. }
总结为两种情况:一种情况是尽可能在一台机器上运行程序的所有功能;另一种情况是尽可能在所有节点上运行程序的所有功能。无论是哪种情况,每次给Executor增加Cores,是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮。例如,有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程……
scheduleExecutorsOnWorkers方法如下。
1. while (freeWorkers.nonEmpty) { 2. freeWorkers.foreach { pos => 3. var keepScheduling = true 4. while (keepScheduling && canLaunchExecutor(pos)) { 5. coresToAssign -= minCoresPerExecutor 6. assignedCores(pos) += minCoresPerExecutor 7. 8. //如果每个worker上启动一个Executor,那么每次迭代在Executor 上分配一 //个核,否则,每次迭代都将把内核分配给一个新的Executor 9. if (oneExecutorPerWorker) { 10. assignedExecutors(pos) = 1 11. } else { 12. assignedExecutors(pos) += 1 13. } 14. 15. //展开应用程序意味着将Executors展开到尽可能多的workers节点。如果不展 //开,将对这个workers的Executors进行调度,直到使用它的全部资源。否则, //只是移动到下一个worker节点 16. if (spreadOutApps) { 17. keepScheduling = false 18. } 19. } 20. }
回到Master.scala的startExecutorsOnWorkers,现在已经决定为每个worker分配多少个cores,然后进行资源分配。
1. for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) { 2. allocateWorkerResourceToExecutors( 3. app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) 4. }
allocateWorkerResourceToExecutors的源码如下。
1. private def allocateWorkerResourceToExecutors( 2. app: ApplicationInfo, 3. assignedCores: Int, 4. coresPerExecutor: Option[Int], 5. worker: WorkerInfo): Unit = { 6. //如果指定了每个Executor的内核数,我们就将分配的内核无剩余地均分给worker节点的 //Executors。否则,我们启动一个单一的 Executor,抓住这个 worker 节点所有的 //assignedCores 7. val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1) 8. val coresToAssign = coresPerExecutor.getOrElse(assignedCores) 9. for (i <- 1 to numExecutors) { 10. val exec = app.addExecutor(worker, coresToAssign) 11. launchExecutor(worker, exec) 12. app.state = ApplicationState.RUNNING 13. } 14. }
allocateWorkerResourceToExecutors中的app.addExecutor增加一个Executor,记录Executor的相关信息。
1. private[master] def addExecutor( 2. worker: WorkerInfo, 3. cores: Int, 4. useID: Option[Int] = None): ExecutorDesc = { 5. val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) 6. executors(exec.id) = exec 7. coresGranted += cores 8. exec 9. }
回到allocateWorkerResourceToExecutors方法中,launchExecutor(worker, exec)启动Executor。
1. launchExecutor(worker, exec)
(14)准备具体要为当前应用程序分配的Executor信息后,Master要通过远程通信发指令给Worker来具体启动ExecutorBackend进程。
launchExecutor方法如下。
1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 3. worker.addExecutor(exec) 4. worker.endpoint.send(LaunchExecutor(masterUrl, 5. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) 6. ......
(15)紧接着给应用程序的Driver发送一个ExecutorAdded的信息。
launchExecutor方法如下。
1. exec.application.driver.send( 2. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 3. }