Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

6.4 从Application提交的角度重新审视Executor

本节从Application提交的角度重新审视Executor,彻底解密Executor到底是什么时候启动的,以及Executor如何把结果交给Application。

6.4.1 Executor到底是什么时候启动的

SparkContext启动后,StandaloneSchedulerBackend中会调用new()函数创建一个StandaloneAppClient,StandaloneAppClient中有一个名叫ClientEndPoint的内部类,在创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor进行的入口类的名称为CoarseGrainedExecutorBackend。ClientEndPoint继承自ThreadSafeRpcEndpoint,其通过RPC机制完成和Master的通信。在ClientEndPoint的start方法中,会通过registerWithMaster方法向Master发送RegisterApplication请求,Master收到该请求消息后,首先通过registerApplication方法完成信息登记,之后将会调用schedule方法,在Worker上启动Executor。Master对RegisterApplication请求处理源码如下所示。

Master.scala的源码如下。

1.        case RegisterApplication(description, driver) =>
2.       //待办事项:防止driver程序重复注册
3.      //Master处于STANDBY(备用)状态,不作处理
4.       if (state == RecoveryState.STANDBY) {
5.         //忽略,不发送响应
6.       } else {
7.         logInfo("Registering app " + description.name)
8.     //由description描述,构建ApplicationInfo
9.         val app = createApplication(description, driver)
10.        registerApplication(app)
11.        logInfo("Registered app " + description.name + " with ID " + app.id)
12.     //在持久化引擎中加入application
13.        persistenceEngine.addApplication(app)
14.        driver.send(RegisteredApplication(app.id, self))
15.  //调用schedule方法,在worker节点上启动Executor
16.        schedule()
17.      }

在上面的代码中,Master匹配到RegisterApplication请求,先判断Master的状态是否为STANDBY(备用)状态,如果不是,说明Master为ALIVE状态,在这种状态下调用createApplication(description,sender)方法创建ApplicationInfo,完成之后调用persistenceEngine. addApplication(app)方法,将新创建的ApplicationInfo持久化,以便错误恢复。完成这两步操作后,通过driver.send(RegisteredApplication(app.id, self))向StandaloneAppClient返回注册成功后ApplicationInfo的Id和master的url地址。

ApplicationInfo对象是对application的描述,下面先来看createApplication方法的源码。

Master.scala的源码如下。

1.  private def createApplication(desc: ApplicationDescription, driver:
        RpcEndpointRef):
2.        ApplicationInfo = {
3.   //ApplicationInfo创建时间
4.      val now = System.currentTimeMillis()
5.      val date = new Date(now)
6.    //由date生成application id
7.      val appId = newApplicationId(date)
8.   //创建ApplicationInfo
9.      new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
10.   }

上面的代码中,createApplication方法接收ApplicationDescription和ActorRef两种类型的参数,并调用newApplicationId方法生成appId,关键代码如下所示。

1. val appId = "app-%s-%04d".format(createDateFormat.format(submitDate),
   nextAppNumber)

由代码所决定,appid的格式形如:app-20160429101010-0001。desc对象中包含一些基本的配置,包括从系统中传入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等作为参数构造一个ApplicatiOnInfo对象并返回。函数返回之后,调用registerApplication方法,完成application的注册,该方法是如何完成注册的?方法代码如下所示。

Master.scala的源码如下。

1.    private def registerApplication(app: ApplicationInfo): Unit = {
2.    //Driver的地址,用于Master和Driver通信
3.      val appAddress = app.driver.address
4.    //如果addressToApp中已经有了该Driver地址,说明该Driver已经注册过了,直接
      //return
5.
6.      if (addressToApp.contains(appAddress)) {
7.        logInfo("Attempted to re-register application at same address: " +
          appAddress)
8.        return
9.      }
10.  //向度量系统注册
11.     applicationMetricsSystem.registerSource(app.appSource)
12.  //apps是一个HashSet,保存数据不能重复,向HashSet中加入app
13.     apps += app
14. //idToApp是一个HashMap,该HashMap用于保存id和app的对应关系
15.     idToApp(app.id) = app
16. //endpointToApp是一个HashMap,该HashMap用于保存driver和app的对应关系
17.     endpointToApp(app.driver) = app
18. //addressToApp是一个HashMap,记录app Driver的地址和app的对应关系
19.     addressToApp(appAddress) = app
20. /waitingApps是一个数组,记录等待调度的app记录
21.     waitingApps += app
22.     if (reverseProxy) {
23.       webUi.addProxyTargets(app.id, app.desc.appUiUrl)
24.     }
25.   }

上面的代码中,首先通过app.driver.path.address得到Driver的地址,然后查看appAddress映射表中是否已经存在这个路径,如果存在,表示该application已经注册,直接返回;如果不存在,则在waitingApps数组中加入该application,同时在idToApp、endpointToApp、addressToApp映射表中加入映射关系。加入waitingApps数组中的application等待schedule方法的调度。

schedule方法有两个作用:第一,完成Driver的调度,将waitingDrivers数组中的Driver发送到满足条件的Worker上运行;第二,在满足条件的Worker节点上为application启动Executor。

Master.scala的schedule方法的源码如下。

1.  private def schedule(): Unit = {
2.  .......
3.       launchDriver(worker, driver)
4.      .......
5.      startExecutorsOnWorkers()
6.    }

在Master中,schedule方法是一个很重要的方法,每一次新的Driver的注册、application的注册,或者可用资源发生变动,都将调用schedule方法。schedule方法用于为当前等待调度的application调度可用的资源,在满足条件的Worker节点上启动Executor。这个方法还有另外一个作用,就是当有Driver提交的时候,负责将Driver发送到一个可用资源满足Driver需求的Worker节点上运行。launchDriver(worker,driver)方法负责完成这一任务。

application调度成功之后,Master将会为application在Worker节点上启动Executors,调用startExecutorsOnWorkers方法完成此操作。

在scheduleExecutorsOnWorkers方法中,有两种启动Executor的策略:第一种是轮流均摊策略(round-robin),采用圆桌算法依次轮流均摊,直到满足资源需求,轮流均摊策略通常会有更好的数据本地性,因此它是默认的选择策略;第二种是依次全占,在usableWorkers中,依次获取每个Worker上的全部资源,直到满足资源需求。

scheduleExecutorsOnWorkers方法为application分配好逻辑意义上的资源后,还不能真正在Worker节点为application分配出资源,需要调用动作函数为application真正地分配资源。allocateWorkerResourceToExecutors方法的调用,将会在Worker节点上实际分配资源。下面是allocateWorkerResourceToExecutors的源码。

Master.scala的源码如下。

1. private def allocateWorkerResourceToExecutors(
2.    ......
3.       launchExecutor(worker, exec)
4.    .......

上面代码调用了launchExecutor(worker,exec)方法,这个方法有两个参数:第一个参数是满足条件的WorkerInfo信息;第二个参数是描述Executor的ExecutorDesc对象。这个方法将会向Worker节点发送LaunchExecutor的请求,Worker节点收到该请求之后,将会负责启动Executor。launchExecutor方法代码清单如下所示。

Master.scala的源码如下。

1.      private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc):
        Unit = {
2.      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
3.  //向WorkerInfo中加入exec这个描述Executor的ExecutorDesc对象
4.      worker.addExecutor(exec)
5.  //向worker发送LaunchExecutor消息,加载Executor消息中携带了masterUrl地址、
    //application id、Executor id、Executor描述desc、Executor核的个数、Executor
    //分配的内存大小
6.
7.      worker.endpoint.send(LaunchExecutor(masterUrl,
8.        exec.application.id, exec.id, exec.application.desc, exec.cores,
          exec.memory))
9.    //向Driver发回ExecutorAdded消息,消息携带worker的id号、worker的host和
      //port、分配的核的个数和内存大小
10.     exec.application.driver.send(
11.       ExecutorAdded(exec.id,       worker.id,     worker.hostPort,      exec.cores,
          exec.memory))
12.   }

launchExecutor有两个参数,第一个参数是worker:WorkerInfo,代表Worker的基本信息;第二个参数是exec:ExecutorDesc,这个参数保存了Executor的基本配置信息,如memory、cores等。此方法中有worker.endpoint.send(LaunchExecutor(...)),向Worker发送LaunchExecutor请求,Worker收到该请求之后,将会调用方法启动Executor。

向Worker发送LaunchExecutor消息的同时,通过exec.application.driver.send (ExecutorAdded(…))向Driver发送ExecutorAdded消息,该消息为Driver反馈Master都在哪些Worker上启动了Executor,Executor的编号是多少,为每个Executor分配了多少个核,多大的内存以及Worker的联系hostport等消息。

Worker收到LaunchExecutor消息会做相应的处理。首先判断传过来的masterUrl是否和activeMasterUrl相同,如果不相同,说明收到的不是处于ALIVE状态的Master发送过来的请求,这种情况直接打印警告信息。如果相同,则说明该请求来自ALIVE Master,于是为Executor创建工作目录,创建好工作目录之后,使用appid、execid、appDes等参数创建ExecutorRunner。顾名思义,ExecutorRunner是Executor运行的地方,在ExecutorRunner中有一个工作线程,这个线程负责下载依赖的文件,并启动CoarseGaindExecutorBackend进程,该进程单独在一个JVM上运行。下面是ExecutorRunner中的线程启动的源码。

ExecutorRunner.scala的源码如下。

1.   private[worker] def start() {
2.       //创建线程
3.      workerThread = new Thread("ExecutorRunner for " + fullId) {
4.       //线程run方法中调用fetchAndRunExecutor
5.        override def run() { fetchAndRunExecutor() }
6.      }
7.       //启动线程
8.      workerThread.start()
9.
10.      //终止回调函数,用于杀死进程
11.     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
12.       //这是可能的,调用fetchAndRunExecutor       之前,state  将是 ExecutorState.
          //RUNNING。在这种情况下,我们应该设置“状态”为“失败”
13.       if (state == ExecutorState.RUNNING) {
14.         state = ExecutorState.FAILED
15.       }
16.       killProcess(Some("Worker shutting down")) }
17.   }

上面代码中定义了一个Thread,这个Thread的run方法中调用fetchAndRunExecutor方法,fetchAndRunExecutor负责以进程的方式启动ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend进程。

其中,fetchAndRunExecutor方法中的CommandUtils.buildProcessBuilder(appDesc.command,传入的入口类是"org.apache.spark.executor.CoarseGrainedExecutorBackend",当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。

CoarseGrainedExecutorBackend的onStart方法的源码如下。

1.      override def onStart() {
2.  .......
3.        driver = Some(ref)
4.   //向driver发送ask请求,等待driver回应
5.        ref.ask[Boolean](RegisterExecutor(executorId, self, hostname,
          cores, extractLogUrls))
6.    ......

Driver端收到注册请求,将会注册Executor的请求。

CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下。

1.     override def receiveAndReply(context: RpcCallContext):
       PartialFunction[Any, Unit] = {
2.
3.       case RegisterExecutor(executorId, executorRef, hostname, cores,
         logUrls) =>
4.  .......
5.           executorRef.send(RegisteredExecutor)
6.   ......

如上面代码所示,Driver向CoarseGrainedExecutorBackend发送RegisteredExecutor消息,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。

CoarseGrainedExecutorBackend.scala的receive的源码如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case RegisteredExecutor =>
3.        logInfo("Successfully registered with driver")
4.        try {
5.    //收到RegisteredExecutor消息,立即创建Executor
6.          executor = new Executor(executorId, hostname, env, userClassPath,
            isLocal = false)
7.        } catch {
8.          case NonFatal(e) =>
9.            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
10.       }

从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新创建一个org.apache.spark.executor.Executor对象,至此Executor创建完毕。

6.4.2 Executor如何把结果交给Application

CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程分别处理Task执行成功和失败的不同情况,然后告诉DAGScheduler任务处理结束的状况。

CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法的源码如下。

1.   override def receive: PartialFunction[Any, Unit] = {
2.      case StatusUpdate(executorId, taskId, state, data) =>
3.        scheduler.statusUpdate(taskId, state, data.value)
4.        if (TaskState.isFinished(state)) {
5.          executorDataMap.get(executorId) match {
6.            case Some(executorInfo) =>
7.              executorInfo.freeCores += scheduler.CPUS_PER_TASK
8.              makeOffers(executorId)
9.            case None =>
10.             //忽略更新,因为我们不知道Executor
11.             logWarning(s"Ignored task status update ($taskId state $state)"+
12.               s"from unknown executor with ID $executorId")
13.         }
14.       }

DriverEndpoint的receive方法中的StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。

TaskSchedulerImpl的statusUpdate中:

 如果是TaskState.LOST,则记录原因,将Executor清理掉。

 如果是TaskState.isFinished,则从taskSet中运行的任务中清除掉,调用taskResultGetter. enqueueSuccessfulTask处理。

 如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,调用taskResultGetter. enqueueFailedTask处理。