6.4 从Application提交的角度重新审视Executor
本节从Application提交的角度重新审视Executor,彻底解密Executor到底是什么时候启动的,以及Executor如何把结果交给Application。
6.4.1 Executor到底是什么时候启动的
SparkContext启动后,StandaloneSchedulerBackend中会调用new()函数创建一个StandaloneAppClient,StandaloneAppClient中有一个名叫ClientEndPoint的内部类,在创建ClientEndpoint时会传入Command来指定具体为当前应用程序启动的Executor进行的入口类的名称为CoarseGrainedExecutorBackend。ClientEndPoint继承自ThreadSafeRpcEndpoint,其通过RPC机制完成和Master的通信。在ClientEndPoint的start方法中,会通过registerWithMaster方法向Master发送RegisterApplication请求,Master收到该请求消息后,首先通过registerApplication方法完成信息登记,之后将会调用schedule方法,在Worker上启动Executor。Master对RegisterApplication请求处理源码如下所示。
Master.scala的源码如下。
1. case RegisterApplication(description, driver) => 2. //待办事项:防止driver程序重复注册 3. //Master处于STANDBY(备用)状态,不作处理 4. if (state == RecoveryState.STANDBY) { 5. //忽略,不发送响应 6. } else { 7. logInfo("Registering app " + description.name) 8. //由description描述,构建ApplicationInfo 9. val app = createApplication(description, driver) 10. registerApplication(app) 11. logInfo("Registered app " + description.name + " with ID " + app.id) 12. //在持久化引擎中加入application 13. persistenceEngine.addApplication(app) 14. driver.send(RegisteredApplication(app.id, self)) 15. //调用schedule方法,在worker节点上启动Executor 16. schedule() 17. }
在上面的代码中,Master匹配到RegisterApplication请求,先判断Master的状态是否为STANDBY(备用)状态,如果不是,说明Master为ALIVE状态,在这种状态下调用createApplication(description,sender)方法创建ApplicationInfo,完成之后调用persistenceEngine. addApplication(app)方法,将新创建的ApplicationInfo持久化,以便错误恢复。完成这两步操作后,通过driver.send(RegisteredApplication(app.id, self))向StandaloneAppClient返回注册成功后ApplicationInfo的Id和master的url地址。
ApplicationInfo对象是对application的描述,下面先来看createApplication方法的源码。
Master.scala的源码如下。
1. private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef): 2. ApplicationInfo = { 3. //ApplicationInfo创建时间 4. val now = System.currentTimeMillis() 5. val date = new Date(now) 6. //由date生成application id 7. val appId = newApplicationId(date) 8. //创建ApplicationInfo 9. new ApplicationInfo(now, appId, desc, date, driver, defaultCores) 10. }
上面的代码中,createApplication方法接收ApplicationDescription和ActorRef两种类型的参数,并调用newApplicationId方法生成appId,关键代码如下所示。
1. val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
由代码所决定,appid的格式形如:app-20160429101010-0001。desc对象中包含一些基本的配置,包括从系统中传入的一些配置信息,如appname、maxCores、memoryPerExecutorMB等。最后使用desc、date、driver、defaultCores等作为参数构造一个ApplicatiOnInfo对象并返回。函数返回之后,调用registerApplication方法,完成application的注册,该方法是如何完成注册的?方法代码如下所示。
Master.scala的源码如下。
1. private def registerApplication(app: ApplicationInfo): Unit = { 2. //Driver的地址,用于Master和Driver通信 3. val appAddress = app.driver.address 4. //如果addressToApp中已经有了该Driver地址,说明该Driver已经注册过了,直接 //return 5. 6. if (addressToApp.contains(appAddress)) { 7. logInfo("Attempted to re-register application at same address: " + appAddress) 8. return 9. } 10. //向度量系统注册 11. applicationMetricsSystem.registerSource(app.appSource) 12. //apps是一个HashSet,保存数据不能重复,向HashSet中加入app 13. apps += app 14. //idToApp是一个HashMap,该HashMap用于保存id和app的对应关系 15. idToApp(app.id) = app 16. //endpointToApp是一个HashMap,该HashMap用于保存driver和app的对应关系 17. endpointToApp(app.driver) = app 18. //addressToApp是一个HashMap,记录app Driver的地址和app的对应关系 19. addressToApp(appAddress) = app 20. /waitingApps是一个数组,记录等待调度的app记录 21. waitingApps += app 22. if (reverseProxy) { 23. webUi.addProxyTargets(app.id, app.desc.appUiUrl) 24. } 25. }
上面的代码中,首先通过app.driver.path.address得到Driver的地址,然后查看appAddress映射表中是否已经存在这个路径,如果存在,表示该application已经注册,直接返回;如果不存在,则在waitingApps数组中加入该application,同时在idToApp、endpointToApp、addressToApp映射表中加入映射关系。加入waitingApps数组中的application等待schedule方法的调度。
schedule方法有两个作用:第一,完成Driver的调度,将waitingDrivers数组中的Driver发送到满足条件的Worker上运行;第二,在满足条件的Worker节点上为application启动Executor。
Master.scala的schedule方法的源码如下。
1. private def schedule(): Unit = { 2. ....... 3. launchDriver(worker, driver) 4. ....... 5. startExecutorsOnWorkers() 6. }
在Master中,schedule方法是一个很重要的方法,每一次新的Driver的注册、application的注册,或者可用资源发生变动,都将调用schedule方法。schedule方法用于为当前等待调度的application调度可用的资源,在满足条件的Worker节点上启动Executor。这个方法还有另外一个作用,就是当有Driver提交的时候,负责将Driver发送到一个可用资源满足Driver需求的Worker节点上运行。launchDriver(worker,driver)方法负责完成这一任务。
application调度成功之后,Master将会为application在Worker节点上启动Executors,调用startExecutorsOnWorkers方法完成此操作。
在scheduleExecutorsOnWorkers方法中,有两种启动Executor的策略:第一种是轮流均摊策略(round-robin),采用圆桌算法依次轮流均摊,直到满足资源需求,轮流均摊策略通常会有更好的数据本地性,因此它是默认的选择策略;第二种是依次全占,在usableWorkers中,依次获取每个Worker上的全部资源,直到满足资源需求。
scheduleExecutorsOnWorkers方法为application分配好逻辑意义上的资源后,还不能真正在Worker节点为application分配出资源,需要调用动作函数为application真正地分配资源。allocateWorkerResourceToExecutors方法的调用,将会在Worker节点上实际分配资源。下面是allocateWorkerResourceToExecutors的源码。
Master.scala的源码如下。
1. private def allocateWorkerResourceToExecutors( 2. ...... 3. launchExecutor(worker, exec) 4. .......
上面代码调用了launchExecutor(worker,exec)方法,这个方法有两个参数:第一个参数是满足条件的WorkerInfo信息;第二个参数是描述Executor的ExecutorDesc对象。这个方法将会向Worker节点发送LaunchExecutor的请求,Worker节点收到该请求之后,将会负责启动Executor。launchExecutor方法代码清单如下所示。
Master.scala的源码如下。
1. private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { 2. logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) 3. //向WorkerInfo中加入exec这个描述Executor的ExecutorDesc对象 4. worker.addExecutor(exec) 5. //向worker发送LaunchExecutor消息,加载Executor消息中携带了masterUrl地址、 //application id、Executor id、Executor描述desc、Executor核的个数、Executor //分配的内存大小 6. 7. worker.endpoint.send(LaunchExecutor(masterUrl, 8. exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) 9. //向Driver发回ExecutorAdded消息,消息携带worker的id号、worker的host和 //port、分配的核的个数和内存大小 10. exec.application.driver.send( 11. ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) 12. }
launchExecutor有两个参数,第一个参数是worker:WorkerInfo,代表Worker的基本信息;第二个参数是exec:ExecutorDesc,这个参数保存了Executor的基本配置信息,如memory、cores等。此方法中有worker.endpoint.send(LaunchExecutor(...)),向Worker发送LaunchExecutor请求,Worker收到该请求之后,将会调用方法启动Executor。
向Worker发送LaunchExecutor消息的同时,通过exec.application.driver.send (ExecutorAdded(…))向Driver发送ExecutorAdded消息,该消息为Driver反馈Master都在哪些Worker上启动了Executor,Executor的编号是多少,为每个Executor分配了多少个核,多大的内存以及Worker的联系hostport等消息。
Worker收到LaunchExecutor消息会做相应的处理。首先判断传过来的masterUrl是否和activeMasterUrl相同,如果不相同,说明收到的不是处于ALIVE状态的Master发送过来的请求,这种情况直接打印警告信息。如果相同,则说明该请求来自ALIVE Master,于是为Executor创建工作目录,创建好工作目录之后,使用appid、execid、appDes等参数创建ExecutorRunner。顾名思义,ExecutorRunner是Executor运行的地方,在ExecutorRunner中有一个工作线程,这个线程负责下载依赖的文件,并启动CoarseGaindExecutorBackend进程,该进程单独在一个JVM上运行。下面是ExecutorRunner中的线程启动的源码。
ExecutorRunner.scala的源码如下。
1. private[worker] def start() { 2. //创建线程 3. workerThread = new Thread("ExecutorRunner for " + fullId) { 4. //线程run方法中调用fetchAndRunExecutor 5. override def run() { fetchAndRunExecutor() } 6. } 7. //启动线程 8. workerThread.start() 9. 10. //终止回调函数,用于杀死进程 11. shutdownHook = ShutdownHookManager.addShutdownHook { () => 12. //这是可能的,调用fetchAndRunExecutor 之前,state 将是 ExecutorState. //RUNNING。在这种情况下,我们应该设置“状态”为“失败” 13. if (state == ExecutorState.RUNNING) { 14. state = ExecutorState.FAILED 15. } 16. killProcess(Some("Worker shutting down")) } 17. }
上面代码中定义了一个Thread,这个Thread的run方法中调用fetchAndRunExecutor方法,fetchAndRunExecutor负责以进程的方式启动ApplicationDescription中携带的org.apache.spark.executor.CoarseGrainedExecutorBackend进程。
其中,fetchAndRunExecutor方法中的CommandUtils.buildProcessBuilder(appDesc.command,传入的入口类是"org.apache.spark.executor.CoarseGrainedExecutorBackend",当Worker节点中启动ExecutorRunner时,ExecutorRunner中会启动CoarseGrainedExecutorBackend进程,在CoarseGrainedExecutorBackend的onStart方法中,向Driver发出RegisterExecutor注册请求。
CoarseGrainedExecutorBackend的onStart方法的源码如下。
1. override def onStart() { 2. ....... 3. driver = Some(ref) 4. //向driver发送ask请求,等待driver回应 5. ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) 6. ......
Driver端收到注册请求,将会注册Executor的请求。
CoarseGrainedSchedulerBackend.scala的receiveAndReply方法的源码如下。
1. override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { 2. 3. case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) => 4. ....... 5. executorRef.send(RegisteredExecutor) 6. ......
如上面代码所示,Driver向CoarseGrainedExecutorBackend发送RegisteredExecutor消息,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后将会新建一个Executor执行器,并为此Executor充当信使,与Driver通信。CoarseGrainedExecutorBackend收到RegisteredExecutor消息的源码如下所示。
CoarseGrainedExecutorBackend.scala的receive的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case RegisteredExecutor => 3. logInfo("Successfully registered with driver") 4. try { 5. //收到RegisteredExecutor消息,立即创建Executor 6. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) 7. } catch { 8. case NonFatal(e) => 9. exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) 10. }
从上面的代码中可以看到,CoarseGrainedExecutorBackend收到RegisteredExecutor消息后,将会新创建一个org.apache.spark.executor.Executor对象,至此Executor创建完毕。
6.4.2 Executor如何把结果交给Application
CoarseGrainedExecutorBackend给DriverEndpoint发送StatusUpdate传输执行结果,DriverEndpoint会把执行结果传递给TaskSchedulerImpl处理,然后交给TaskResultGetter内部通过线程分别处理Task执行成功和失败的不同情况,然后告诉DAGScheduler任务处理结束的状况。
CoarseGrainedSchedulerBackend.scala中DriverEndpoint的receive方法的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. scheduler.statusUpdate(taskId, state, data.value) 4. if (TaskState.isFinished(state)) { 5. executorDataMap.get(executorId) match { 6. case Some(executorInfo) => 7. executorInfo.freeCores += scheduler.CPUS_PER_TASK 8. makeOffers(executorId) 9. case None => 10. //忽略更新,因为我们不知道Executor 11. logWarning(s"Ignored task status update ($taskId state $state)"+ 12. s"from unknown executor with ID $executorId") 13. } 14. }
DriverEndpoint的receive方法中的StatusUpdate调用scheduler.statusUpdate,然后释放资源,再次进行资源调度makeOffers(executorId)。
TaskSchedulerImpl的statusUpdate中:
如果是TaskState.LOST,则记录原因,将Executor清理掉。
如果是TaskState.isFinished,则从taskSet中运行的任务中清除掉,调用taskResultGetter. enqueueSuccessfulTask处理。
如果是TaskState.FAILED、TaskState.KILLED、TaskState.LOST,调用taskResultGetter. enqueueFailedTask处理。