5.4 Executor中任务的执行
本节讲解Executor中任务的加载,通过launchTask()方法加载任务,将任务以TaskRunner的形式放入线程池中运行;Executor中的任务线程池可以减少在创建和销毁线程上所花的时间和系统资源开销;TaskRunner任务执行失败处理以及TaskRunner的运行内幕等内容。
5.4.1 Executor中任务的加载
Executor是基于线程池的任务执行器。通过launchTask方法加载任务,将任务以TaskRunner的形式放入线程池中运行。
DAGScheduler划分好Stage通过submitMissingTasks方法分配好任务,并把任务交由TaskSchedulerImpl的submitTasks方法,将任务加入调度池,之后调用CoarseGrainedScheduler-Backend的riviveOffers方法为Task分配资源,指定Executor。任务资源都分配好之后,CoarseGrainedSchedulerBackend将向CoarseGranedExecutorBackend发送LaunchTask消息,将具体的任务发送到Executor上进行计算。
CoarseGranedExecutorBackend匹配到LaunchTask(data)消息之后,将会调用Executor的launchTask方法。launchTask方法中将会构建TaskRunner对象,并放入线程池中执行。
Executor中Task的加载时序图如图5-5所示。
图5-5 Executor中Task的加载时序图
任务加载好后,在Executor中将会把构建好的TaskRunner放入线程池运行,至此任务完成加载,开始运行。
5.4.2 Executor中的任务线程池
Executor是构建在线程池之上的任务执行器。在Executor中使用线程池的好处是显而易见的,使用线程池可以减少在创建和销毁线程上所花的时间和系统资源开销。如果不使用线程池,可能造成系统创建大量的线程而导致消耗完系统内存以及出现“过度切换”。
为什么Executor中需要线程池?使用线程池基于以下原因:首先,在Executor端执行的任务处理时间都比较短,需要频繁地创建和销毁线程,这样就带来了巨大的创建和销毁线程的开销,造成额外的系统资源开销;其次,Executor中处理的任务数量巨大,如果每个任务都创建一个线程,将导致消耗完系统内存,出现“过度切换”。
首先来看Executor中的线程池。Executor中使用的是CachedThreadPool,使用这种类型线程池的好处是:任务比较多时可以自动新增处理线程,而任务比较少时自动回收空闲线程。
CoarseGrainedExecutorBackend调用Executor的launchTask方法,将会新建TaskRunner,然后放入线程池进行处理。
从上面的源码中可以看到,新建的TaskRunner对象首先放入runningTasks这样一个ConcurrentHashMap里面,然后使用线程池的Execute方法运行TaskRunner。Execute方法将会调用TaskRunner的run方法。在TaskRunner的run方法中执行计算任务。
5.4.3 任务执行失败处理
TaskRunner在计算的过程中可能发生各种异常,甚至错误,如抓取shuffle结果失败、任务被杀死、没权限向HDFS写入数据等。当TaskRunner的run方法运行的时候,代码中通过try-catch语句捕获这些异常,并通过调用CoarseGrainedExecutorBackend的statusUpdate方法向CoarseGrainedSchedulerBackend汇报。
下面是CoarseGrainedExecutorBackend的statusUpdate方法的源码如下。
1. override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { 2. val msg = StatusUpdate(executorId, taskId, state, data) 3. driver match { 4. case Some(driverRef) => driverRef.send(msg) 5. case None => logWarning(s"Drop $msg because has not yet connected to driver") 6. } 7. }
在statusUpdate方法中,通过方法的参数taskId、state、data构建一个StatusUpdate对象,并通过driverRef的send方法将该对象发送回CoarseGrainedSheduleBackend。CoarseGrainedScheduleBackend匹配到StatusUpdate时,将根据StatusUpdate对象中的state值对该Task的执行情况做出判断,并执行不同的处理逻辑。
从源码中可以发现有TaskState对象,其实这里的TaskState是一个枚举变量,该枚举变量中包括LAUNCHING、RUNNING、FINISHED、FAILED、KILLED、LOST这些枚举值,分别对应任务执行的不同状态。Executor根据任务执行的不同状态,通过statusUpdate方法返回特定的TaskState值,该值通过ExecutorBackend返回给SchedulerBackend,在SchedulerBackend中根据TaskState中的值进行处理。
TaskState.scala的源码如下。
1. private[spark] object TaskState extends Enumeration { 2. 3. val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value 4. 5. private val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST) 6. 7. type TaskState = Value 8. 9. def isFailed(state: TaskState): Boolean = (LOST == state) || (FAILED == state) 10. 11. def isFinished(state: TaskState): Boolean = FINISHED_STATES.contains (state) 12. } 13.
以TaskState.FAILED这种情况为例,在Executor的run方法中,如果发生FetchFailed-Exeception、CommitDeniedExeception或其他Throwable的子类的异常,就会返回TaskState. FAILED状态,该状态通过CoaseGainedExecutorBackend返回。在CoaseGaiendScheduler-Backend中,匹配到StatusUpdate消息,将进行相应的处理,匹配代码如下所示。
CoarseGrainedSchedulerBackend.scala的StatusUpdate的源码如下。
1. override def receive: PartialFunction[Any, Unit] = { 2. case StatusUpdate(executorId, taskId, state, data) => 3. //调用TaskSchedulerImpl更新状态 4. scheduler.statusUpdate(taskId, state, data.value) 5. //若状态为FINISHED,则从executorDataMap中取出executorId对应的ExecutorInfo 6. if (TaskState.isFinished(state)) { 7. executorDataMap.get(executorId) match { 8. case Some(executorInfo) => 9. executorInfo.freeCores += scheduler.CPUS_PER_TASK 10. //用makeOffers方法重新分配资源 11. makeOffers(executorId) 12. case None => 13. //因为不知道executor,忽略更新 14. logWarning(s"Ignored task status update ($taskId state $state) " + 15. s"from unknown executor with ID $executorId") 16. 17. } 18. }
上面的代码中,首先调用TaskSchedulerImpl的statusUpdate方法,该方法用于更新taskId对应任务的状态。完成更新之后,判断state状态是否FINISHED,若状态为FINISHED,则从executorDataMap这个哈希表中取出executorId对应的ExecutorData对象,修改该对象中的freeCores。因为状态已经为FINISHED,因此ExecutorData中的freeCores会增加CPUS_PER_TASK个,这里的CPU_PER_TASK为每个任务占用的CPU核的个数,该个数可以通过spark.task.cpus配置项进行配置。
更新完成ExecutorData上的可用CPU后,这些闲置的CPU通过makeOffers方法再次分配给其他任务使用。
Spark 2.1.1版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源码如下。
1. private def makeOffers(executorId: String) { 2. //过滤存活的Executor 3. if (executorIsAlive(executorId)) { 4. //从 executorDataMap 这个哈希表中取出 executorId 对应的 ExecutorData 对象, //ExecutorData表示Executor上的一组资源 5. val executorData = executorDataMap(executorId) 6. //使用executorData创建WorkerOffer对象,该对象代表Executor上可用的资源 7. val workOffers= IndexedSeq( 8. new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores)) 9. 10. //调用TaskSchedulerImpl上的resourceOffers方法,为任务分配运行资源,该方法返 //回获得运行资源的任务集合,之后运行launchTasks方法,将这些任务发送到Executor //上运行 11. launchTasks(scheduler.resourceOffers(workOffers)) 12. } 13. }
Spark 2.2.0版本的CoarseGrainedSchedulerBackend.scala的makeOffers的源码与Spark 2.1.1版本相比具有如下特点。
上段代码中第3行之前新增加了同步锁CoarseGrainedSchedulerBackend.this.s ynchronized,在执行某项任务task时,确保没有executor被杀死。
上段代码中第11行之前增加scheduler.resourceOffers(workOffers)以及Seq.empty为空的情况。
上段代码中第11行launchTasks方法调整增加taskDescs不为空的逻辑判断。
1. ...... 2. //执行某项任务时,确保没有Executor被杀死 3. val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized { 4. ...... 5. scheduler.resourceOffers(workOffers) 6. } else { 7. Seq.empty 8. } 9. } 10. if (!taskDescs.isEmpty) { 11. launchTasks(taskDescs) 12. } 13. }
每个Executor上的资源发生变动时,都将调用makeOffers方法,该方法的作用是为等待执行的任务分配资源,并通过launchTasks方法将这些任务发送到这些Executor上运行。这些任务将被包装成TaskRenner对象,运行于Executor上的线程池中。
5.4.4 揭秘TaskRunner
TaskRunner位于Executor中,继承自Runnable接口,代表一个可执行的任务。Driver端下发的任务最终都要在Executor中封装成TaskRunner。在TaskRunner的run方法中,将会进行任务的解析,并调用Task接口的run方法进行计算。TaskRunner定义的代码如下所示。
Spark 2.1.1版本的Executor.scala的源码如下。
1. class TaskRunner( 2. execBackend: ExecutorBackend,//通过execBackend和SchedulerBakend通信 3. val taskId: Long, 4. val attemptNumber: Int, 5. taskName: String, 6. serializedTask: ByteBuffer) 7. extends Runnable {
Spark 2.2.0版本的Executor.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3~6行TaskRunner中的成员变量调整为新增成员变量TaskDescription。
1. ...... 2. private val taskDescription: TaskDescription) 3. .....
TaskRunner的构造函数中有execBackend、taskId、attemptNumber、taskName、serializedTask 5个参数。其中,execBackend作为和CoarseGrainedSchedulerBackend通信的使者传入到TaskRunner中,在任务计算状态发生变化的时候,调用execBackend的statusUpdate方法向CoarseGrainedSchedulerBackend报告。传入taskId是为了使用TaskMemoryManager管理该Task。attemptNumber代表任务尝试执行的次数,serializedTask是序列化的任务。序列化的任务通过序列化工具反序列化得到任务对象。
在TaskRunner中是如何运行任务的?我们知道,在线程池中启动Runnable任务会自动调用Runnable的run方法,TaskRunner作为一个Runnable接口的实现类,启动时会自动调用其run方法。run方法主要完成以下任务。
调用ExecutorBackend的statusUpdate方法向SchedulerBackend发送任务状态更新消息。
反序列化出Task和相关依赖Jar包。
调用Task上的run方法运行任务。
返回Task运行结果。
Task是一个接口,ResultTask和ShffleMapTask是其两种实现。Task接口中提供了run方法,用于运行任务。TaskRunner的run方法中,会通过反序列化器反序列化出Task,并调用Task上的run方法运行任务,这里怎么知道是ResultTask,还是ShffleMapTask呢?其实,这里不管是ResultTask,还是ShffleMapTask,都一视同仁,因为ResultTask和ShffleMapTask都实现了Task接口,都有run方法。这正是面向接口编程带来的最大的好处,灵活且最大限度地复用代码。
Task运行结果的处理情况有3种:第一种情况是resultSize大于maxResultSize,这种情况下构建IndirectTaskResult对象,并返回该IndirectTaskResult对象,IndirectTaskResult对象中包含结果所在的BlockId,在SchedulerBackend中可以通过BlockManager获得该BlockId对应的结果数据,这里的maxResultSize默认为1GB;第二种情况是resultSize大于Akka帧的大小,这种情况下也是构建IndirectTaskResult对象,并返回该IndirectTaskResult对象,Akka帧的大小为128MB;第三种情况是直接返回DirectTaskResult,这是在resultSize小于Akka帧大小的情况下采取的默认返回方式。