Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

6.2 Spark Application是如何向集群申请资源的

本节讲解Application申请资源的两种类型:第一种是尽可能在集群的所有Worker上分配Executor;第二种是运行在尽可能少的Worker上。本节讲解Application申请资源的源码内容,将彻底解密Spark Application是如何向集群申请资源的。

6.2.1 Application申请资源的两种类型详解

Master负责资源管理和调度。资源调度的方法schedule位于Master.scala类中,当注册程序或者资源发生改变时,都会导致schedule的调用。Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变时(包括Executor增加或者减少、Worker增加或者减少等)。

Spark默认为应用程序启动Executor的方式是FIFO的方式,也就是所有提交的应用程序都放在调度的等待队列中,先进先出,只有在满足了前面应用程序的资源分配的基础上,才能够满足下一个应用程序资源的分配;在FIFO的情况下,默认是spreadOutApps来让应用程序尽可能多地运行在所有的Node上。为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。

为了更形象地描述Master的调度机制,下面通过图6-1介绍抽象的资源调度框架。

图6-1 Master中抽象的资源调度框架

其中,Worker1到WorkerN是集群中全部的Workers节点,调度时,会根据应用程序请求的资源信息,从全部Workers节点中过滤出资源足够的节点,假设可以得到Worker1到WorkerM的节点。当前过滤的需求是内核数和内存大小足够启动一个Executor,因为Executor是集群执行应用程序的单位组件(注意:和任务(Task)不是同一个概念,对应的任务是在Executor中执行的)。

选出可用Workers之后,会根据内核大小进行排序,这可以理解成是一种基于可用内核排序的、简单的负载均衡策略。然后根据设置的spreadOutApps参数,对应指定两种资源分配策略。

(1)当spreadOutApps=true:使用轮流均摊的策略,也就是采用圆桌(round-robin)算法,图中的虚线表示第一次轮流摊派的资源不足以满足申请的需求,因此开始第二轮摊派,依次轮流均摊,直到符合资源需求。

(2)当spreadOutApps=false:使用依次全占策略,依次从可用Workers上获取该Worker上可用的全部资源,直到符合资源需求。

对应图中Worker内部的小方块,在此表示分配的资源的抽象单位。对应资源的条件,理解的关键点在于资源是分配给Executor的,因此最终启动Executor时,占用的资源必须满足启动所需的条件。

前面描述了Workers上的资源是如何分配给应用程序的,之后正式开始为Executor分配资源,并向Worker发送启动Executor的命令了。根据申请时是否明确指定需要为每个Executor分配确定的内核个数,有:

(1)明确指定每个Executor需要分配的内核个数时:每次分配的是一个Executor所需的内核数和内存数,对应在某个Worker分配到的总的内核数可能是Executor的内核数的倍数,此时,该Worker节点上会启动多个Executor,每个Executor需要指定的内核数和内存数(注意该Worker节点上分配到的总的内存大小)。

(2)未明确指定每个Executor需要分配的内核个数时:每次分配一个内核,最后所有在某Worker节点上分配到的内核都会放到一个Executor内(未明确指定内核个数,因此可以一起放入一个Executor)。因此,最终该应用程序在一个Worker上只有一个Executor(这里指的是针对一个应用程序,当该Worker节点上存在多个应用程序时,仍然会为每个应用程序分别启动相应的Executor)。

在此强调、补充一下调度机制中使用的三个重要的配置属性。

a.指定为所有Executors分配的总内核个数:在spark-submit脚本提交参数时进行配置。所有Executors分配的总内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。

1.  //指定为所有Executors分配的总内核个数
2.  | Spark standalone and Mesos only:
3.  |  --total-executor-cores NUM  Total cores for all executors.

b.指定需要为每个Executor分配的内核个数:在spark-submit脚本提交参数时进行配置。每个Executor分配的内核个数的控制属性在类SparkSubmitArguments的方法printUsageAndExit中。

SparkSubmitArguments.scala的源码如下。

1.  // 指定需要为每个Executor分配的内核个数
2.  || Spark standalone and YARN only:
3.  |  --executor-cores NUM        Number of cores per executor. (Default: 1
    in YARN mode,
4.    or all available cores on the worker in standalone mode)

c.资源分配策略:数据本地性(数据密集)与计算密集的控制属性,对应的配置属性在Master类中,代码如下。

1.  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
    true)

6.2.2 Application申请资源的源码详解

1.任务调度与资源调度的区别

 任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度。

 资源调度是指应用程序如何获得资源。

 任务调度是在资源调度的基础上进行的,如果没有资源调度,任务调度就成为无源之水,无本之木。

2.资源调度内幕

(1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala类中,注册程序或者资源发生改变时都会导致schedule的调用,如注册程序时:

1.  case RegisterApplication(description, driver) =>
2.  //待办事项:防止重复注册Driver
3.  if (state == RecoveryState.STANDBY) {
4.    //忽略,不要发送响应
5.  } else {
6.     logInfo("Registering app " + description.name)
7.     val app = createApplication(description, driver)
8.     registerApplication(app)
9.     logInfo("Registered app " + description.name + " with ID " + app.id)
10.    persistenceEngine.addApplication(app)
11.    driver.send(RegisteredApplication(app.id, self))
12.    schedule()
13.  }

(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等)。

进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来时,schedule都会被调用。或者资源发生变化时(如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。

(3)当前Master必须以ALIVE的方式进行资源调度,如果不是ALIVE的状态,就会直接返回,也就是Standby Master不会进行Application的资源调用。

1.     if (state != RecoveryState.ALIVE) {
2.    return
3.  }

(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作。

1.  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state
    == WorkerState.ALIVE))

(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡。例如,不是以固定的顺序启动launchDriver。WorkerInfo是Worker注册时将信息注册过来。

1.  val workers = new HashSet[WorkerInfo]
2.  .......
3.   val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state
     == WorkerState.ALIVE))

WorkerInfo.scala的源码如下。

1.  private[spark] class WorkerInfo(
2.     val id: String,
3.     val host: String,
4.     val port: Int,
5.     val cores: Int,
6.     val memory: Int,
7.     val endpoint: RpcEndpointRef,
8.     val webUiAddress: String)
9.   extends Serializable {

随机打乱的算法:将Worker的信息传进来,先调用new()函数创建一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如,如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过Shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。

Random.scala中的shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置。

1.   def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf:
     CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
2.      val buf = new ArrayBuffer[T] ++= xs
3.
4.      def swap(i1: Int, i2: Int) {
5.        val tmp = buf(i1)
6.        buf(i1) = buf(i2)
7.        buf(i2) = tmp
8.      }
9.
10.     for (n <- buf.length to 2 by -1) {
11.       val k = nextInt(n)
12.       swap(n - 1, k)
13.     }
14.
15.     (bf(xs) ++= buf).result
16.   }

(6)Master的schedule()方法中:循环遍历等待启动的Driver,如果是Client模式,就不需要waitingDrivers等待;如果是Cluster模式,此时Driver会加入waitingDrivers等待列表。

当SparkSubmit指定Driver在Cluster模式的情况下,此时Driver会加入waitingDrivers等待列表中,在每个DriverInfo的DriverDescription中有要启动Driver时对Worker的内存及Cores的要求等内容。

1.   private val waitingDrivers = new ArrayBuffer[DriverInfo]
2.  ......

DriverInfo包括启动时间、ID、描述信息、提交时间等内容。

DriverInfo.scala的源码如下。

1.  private[deploy] class DriverInfo(
2.     val startTime: Long,
3.     val id: String,
4.     val desc: DriverDescription,
5.     val submitDate: Date)
6.   extends Serializable {

其中,DriverInfo的DriverDescription描述信息中包括jarUrl、内存、Cores、supervise、command等内容。如果在Cluster模式中指定supervise为True,那么Driver挂掉时就会自动重启。

DriverDescription.scala的源码如下。

1.  private[deploy] case class DriverDescription(
2.   jarUrl: String,
3.   mem: Int,
4.   cores: Int,
5.   supervise: Boolean,
6.   command: Command) {

在符合资源要求的情况下,采用随机打乱后的一个Worker来启动Driver,worker是Master中对Worker的一个描述。

Master.scala的launchDriver方法如下。

1.      private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
2.    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
3.    worker.addDriver(driver)
4.    driver.worker = Some(worker)
5.    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
6.    driver.state = DriverState.RUNNING
7.  }

Master通过worker.endpoint.send(LaunchDriver)发指令给Worker,让远程的Worker启动Driver,Driver启动以后,Driver的状态就变成DriverState.RUNNING。

(7)先启动Driver,才会发生后续的一切资源调度的模式。

(8)Spark默认为应用程序启动Executor的方式是FIFO方式,也就是所有提交的应用程序都是放在调度的等待队列中的,先进先出,只有满足了前面应用程序的资源分配的基础,才能够满足下一个应用程序资源的分配。

Master的schedule()方法中,调用startExecutorsOnWorkers()为当前的程序调度和启动Worker的Executor,默认情况下排队的方式是FIFO。

startExecutorsOnWorkers的源码如下。

1.   private def startExecutorsOnWorkers(): Unit = {
2.     //这是一个非常简单的FIFO调度。我们尝试在队列中推入第一个应用程序,然后推入第二
       //个应用程序等
3.     for (app <- waitingApps if app.coresLeft > 0) {
4.       val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
5.       //筛选出workers,其没有足够资源来启动Executor
6.       val usableWorkers = workers.toArray.filter(_.state == WorkerState
         .ALIVE)
7.         .filter(worker => worker.memoryFree >= app.desc
           .memoryPerExecutorMB &&
8.           worker.coresFree >= coresPerExecutor.getOrElse(1))
9.         .sortBy(_.coresFree).reverse
10.      val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers,
         spreadOutApps)
11.
12.      //现在我们决定每个worker分配多少cores,进行分配
13.      for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
14.        allocateWorkerResourceToExecutors(
15.          app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
16.      }
17.    }
18.  }

(9)为应用程序具体分配Executor前要判断应用程序是否还需要分配Core,如果不需要,则不会为应用程序分配Executor。

startExecutorsOnWorkers中的coresLeft是请求的requestedCores和可用的coresGranted的相减值。例如,如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0,那么就不需要调度。

1.  private[master] def coresLeft: Int = requestedCores - coresGranted

(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前,要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序,产生计算资源由大到小的usableWorkers数据结构。

1.        val usableWorkers = workers.toArray.filter(_.state == WorkerState
          .ALIVE)
2.          .filter(worker => worker.memoryFree >= app.desc
            .memoryPerExecutorMB &&
3.            worker.coresFree >= coresPerExecutor.getOrElse(1))
4.          .sortBy(_.coresFree).reverse
5.  val   assignedCores     =   scheduleExecutorsOnWorkers(app,         usableWorkers,
    spreadOutApps)

然后调用scheduleExecutorsOnWorkers,在FIFO的情况下,默认spreadOutApps让应用程序尽可能多地运行在所有的Node上。

1.  private val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut",
    true)

scheduleExecutorsOnWorker中,minCoresPerExecutor表示每个Executor最小分配的core个数。scheduleExecutorsOnWorker的源码如下。

1.     private def scheduleExecutorsOnWorkers(
2.        app: ApplicationInfo,
3.        usableWorkers: Array[WorkerInfo],
4.        spreadOutApps: Boolean): Array[Int] = {
5.      val coresPerExecutor = app.desc.coresPerExecutor
6.      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
7.      val oneExecutorPerWorker = coresPerExecutor.isEmpty
8.      val memoryPerExecutor = app.desc.memoryPerExecutorMB
9.      val numUsable = usableWorkers.length
10.     val assignedCores = new Array[Int](numUsable)
11.     val assignedExecutors = new Array[Int](numUsable)
12.     var coresToAssign = math.min(app.coresLeft, usableWorkers.map
        (_.coresFree).sum)
13. ......

(11)为应用程序分配Executors有两种方式:第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的、更好的数据本地性;第二种方式是尝试运行在尽可能少的Worker上。

(12)具体在集群上分配Cores时会尽可能地满足我们的要求。math.min用于计算最小值。coresToAssig用于计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如,应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。

scheduleExecutorsOnWorkers方法如下。

1.       var coresToAssign = math.min(app.coresLeft, usableWorkers.map
         (_.coresFree).sum)
2.  ......

(13)如果每个Worker下面只能为当前的应用程序分配一个Executor,那么每次只分配一个Core。scheduleExecutorsOnWorkers方法如下。

1.  if (oneExecutorPerWorker) {
2.       assignedExecutors(pos) = 1
3.  } else {
4.    assignedExecutors(pos) += 1
5.  }

总结为两种情况:一种情况是尽可能在一台机器上运行程序的所有功能;另一种情况是尽可能在所有节点上运行程序的所有功能。无论是哪种情况,每次给Executor增加Cores,是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮。例如,有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程……

scheduleExecutorsOnWorkers方法如下。

1.          while (freeWorkers.nonEmpty) {
2.   freeWorkers.foreach { pos =>
3.     var keepScheduling = true
4.     while (keepScheduling && canLaunchExecutor(pos)) {
5.       coresToAssign -= minCoresPerExecutor
6.       assignedCores(pos) += minCoresPerExecutor
7.
8.       //如果每个worker上启动一个Executor,那么每次迭代在Executor 上分配一
         //个核,否则,每次迭代都将把内核分配给一个新的Executor
9.       if (oneExecutorPerWorker) {
10.        assignedExecutors(pos) = 1
11.      } else {
12.        assignedExecutors(pos) += 1
13.      }
14.
15.      //展开应用程序意味着将Executors展开到尽可能多的workers节点。如果不展
         //开,将对这个workers的Executors进行调度,直到使用它的全部资源。否则,
         //只是移动到下一个worker节点
16.      if (spreadOutApps) {
17.        keepScheduling = false
18.      }
19.    }
20.  }

回到Master.scala的startExecutorsOnWorkers,现在已经决定为每个worker分配多少个cores,然后进行资源分配。

1.        for (pos <- 0 until usableWorkers.length if assignedCores(pos)
          > 0) {
2.    allocateWorkerResourceToExecutors(
3.      app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
4.  }

allocateWorkerResourceToExecutors的源码如下。

1.     private def allocateWorkerResourceToExecutors(
2.      app: ApplicationInfo,
3.      assignedCores: Int,
4.      coresPerExecutor: Option[Int],
5.      worker: WorkerInfo): Unit = {
6.  //如果指定了每个Executor的内核数,我们就将分配的内核无剩余地均分给worker节点的
    //Executors。否则,我们启动一个单一的       Executor,抓住这个    worker  节点所有的
    //assignedCores
7.    val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
8.    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
9.    for (i <- 1 to numExecutors) {
10.      val exec = app.addExecutor(worker, coresToAssign)
11.      launchExecutor(worker, exec)
12.      app.state = ApplicationState.RUNNING
13.    }
14.  }

allocateWorkerResourceToExecutors中的app.addExecutor增加一个Executor,记录Executor的相关信息。

1.     private[master] def addExecutor(
2.      worker: WorkerInfo,
3.      cores: Int,
4.      useID: Option[Int] = None): ExecutorDesc = {
5.    val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores,
      desc.memoryPerExecutorMB)
6.    executors(exec.id) = exec
7.    coresGranted += cores
8.    exec
9.  }

回到allocateWorkerResourceToExecutors方法中,launchExecutor(worker, exec)启动Executor。

1.  launchExecutor(worker, exec)

(14)准备具体要为当前应用程序分配的Executor信息后,Master要通过远程通信发指令给Worker来具体启动ExecutorBackend进程。

launchExecutor方法如下。

1.  private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc):
    Unit = {
2.    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
3.    worker.addExecutor(exec)
4.    worker.endpoint.send(LaunchExecutor(masterUrl,
5.      exec.application.id, exec.id, exec.application.desc, exec.cores,
        exec.memory))
6.  ......

(15)紧接着给应用程序的Driver发送一个ExecutorAdded的信息。

launchExecutor方法如下。

1.    exec.application.driver.send(
2.      ExecutorAdded(exec.id,       worker.id,  worker.hostPort,  exec.cores,
        exec.memory))
3.  }