5.2 Worker启动原理和源码详解
本节讲解Worker启动原理和源码。对于Worker的部署启动,我们以Worker的脚本为入口点进行分析。
5.2.1 Worker启动的原理流程
Spark中各个组件是通过脚本启动部署的。Worker的部署以脚本为入口点开始分析。每个组件对应提供了启动的脚本,同时也会提供停止的脚本,停止脚本比较简单,在此仅分析启动的脚本。
部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式启动集群中在该文件中列出的全部节点上的Worker实例。启动组件的命令如下所示:
1. ./sbin/start-slaves.sh
或者动态地在某个新增节点上(注意是新增节点,如果之前已经部署过,可以参考后面对启动多个实例的进一步分析)启动一个Worker实例,此时可以在该新增的节点上执行如下启动命令。
1. ./sbin/start-slave.sh MasterURL
其中,参数MasterURL表示当前集群中Master的监听地址,启动后Worker会通过该地址动态注册到Master组件,实现为集群动态添加Worker节点的目的。
下面是Worker部署脚本的解析。
部署脚本根据单个节点以及多个节点的Worker部署,对应有两个脚本:start-slave.sh和start-slaves.sh。其中,start-slave.sh负责在脚本执行节点启动一个Worker组件。start-slaves.sh脚本则会读取配置的conf/slaves文件,逐个启动集群中各个Slave节点上的Worker组件。
1.首先分析脚本start-slaves.sh
脚本start-slaves.sh提供了批量启动集群中各个Slave节点上的Worker组件的方法,即可以在配置好Slave节点(即配置好conf/slaves文件)后,通过该脚本一次性全部启动集群中的Worker组件。
脚本的代码如下所示。
1. #在根据conf/slaves文件指定的每个节点上启动一个Slave实例,即Worker组件 2. #Starts a slave instance on each machine specified in the conf/slaves file. 3. 4. if [ -z "${SPARK_HOME}" ]; then 5. export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" 6. fi 7. 8. #根据配置信息设置是否需要同时启动Tachyon的Slave实例 9. START_TACHYON=false 10. 11. while (( "$#" )); do 12. case $1 in 13. --with-tachyon) 14. if [ ! -e "${SPARK_HOME}/sbin"/../tachyon/bin/tachyon ]; then 15. echo "Error: --with-tachyon specified, but tachyon not found." 16. exit -1 17. fi 18. START_TACHYON=true 19. ;; 20. esac 21. shift 22. done 23. 24. . "${SPARK_HOME}/sbin/spark-config.sh" 25. . "${SPARK_HOME}/bin/load-spark-env.sh" 26. 27. # Find the port number for the master 28. if [ "$SPARK_MASTER_PORT" = "" ]; then 29. SPARK_MASTER_PORT=7077 30. fi 31. 32. #这里获取Master的IP信息。需要注意的是,如果当前SPARK_MASTER_IP环境变量 33. #没有配置,会通过hostname命令来获取,这时如果不在Master组件所在节点启动本 34. #脚本,Master的IP设置就不一致了,为避免此类错误,建议在Master组件所在的节 35. #点上启动该脚本 36. if [ "$SPARK_MASTER_IP" = "" ]; then 37. SPARK_MASTER_IP="`hostname`" 38. fi 39. # 通过slaves.sh脚本启动Tachyon的Slave实例 40. if [ "$START_TACHYON" == "true" ]; then 41. "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/ sbin"/../tachyon/bin/tachyon bootstrap-conf "$SPARK_MASTER_IP" 42. # set -t so we can call sudo 43. SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -t" "${SPARK_HOME}/sbin/ slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/tachyon/bin/tachyon- start.sh" worker SudoMount \; sleep 1 44. fi 45. #通过slaves.sh脚本启动Worker实例,这里会调用Worker启动的另一个脚本start- #slave.sh 46. # Launch the slaves 47. "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin /start-slave.sh" "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"
其中,脚本slaves.sh通过ssh协议在指定的各个Slave节点上执行各种命令。
在ssh启动的start-slave.sh命令中,可以看到它的参数是"spark://$SPARK_MASTER_ IP:$SPARK_MASTER_PORT",即启动slave节点上的Worker进程时,使用的Master URL的值是通过两个环境变量(SPARK_MASTER_IP和SPARK_MASTER_PORT)拼接而成的。
2.脚本start-slave.sh分析
从前面start-slaves.sh脚本的分析中可以看到,最终是在各个Slave节点上执行start-slave.sh脚本来部署Worker组件。对应地,就可以通过该脚本,动态地为集群添加新的Worker组件。
脚本的代码如下所示:
1. 2. if [ -z "${SPARK_HOME}" ]; then 3. export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)" 4. fi 5. 6. #注:提取的类名必须和SparkSubmit的类相匹配,任何变化都需在类中反映 7. 8. # Worker组件对应的类 9. CLASS="org.apache.spark.deploy.worker.Worker" 10. 11. #脚本的用法,其中master参数是必选的,Worker需要与集群的Master通信 12. #这里的master 对应 Master URL信息 13. if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then 14. echo "Usage: ./sbin/start-slave.sh [options] <master>" 15. pattern="Usage:" 16. pattern+="\|Using Spark's default log4j profile:" 17. pattern+="\|Registered signal handlers for" 18. 19. "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2 20. exit 1 21. fi 22. 23. . "${SPARK_HOME}/sbin/spark-config.sh" 24. 25. . "${SPARK_HOME}/bin/load-spark-env.sh" 26. 27. #第一个参数应该是master,先保存它,因为我们可能需要在它和其他参数之间插入参数 28. 29. MASTER=$1 30. shift 31. 32. # Worker 的Web UI的端口号设置 33. 34. if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then 35. SPARK_WORKER_WEBUI_PORT=8081 36. fi 37. 38. #在节点上启动指定序号的Worker实例 39. # 40. #快速启动Worker的本地功能 41. function start_instance { 42. #指定的Worker实例的序号,一个节点上可以部署多个Worker组件,对应有多个实例 43. WORKER_NUM=$1 44. shift 45. 46. if [ "$SPARK_WORKER_PORT" = "" ]; then 47. PORT_FLAG= 48. PORT_NUM= 49. else 50. PORT_FLAG="--port" 51. PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 )) 52. fi 53. WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 )) 54. 55. #和Master组件一样,Worker组件也是使用启动守护进程的spark-daemon.sh脚本来启动 #一个Worker实例的 56. 57. "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \ 58. --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@" 59. } 60. 61. #一个节点上部署几个Worker组件是由SPARK_WORKER_INSTANCES环境变量控制#的,默 #认情况下只部署一个实例,start_instance方法的第一个参数为实例的序号 62. if [ "$SPARK_WORKER_INSTANCES" = "" ]; then 63. start_instance 1 "$@" 64. else 65. for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do 66. start_instance $(( 1 + $i )) "$@" 67. done 68. fi
手动启动Worker实例时,如果需要在一个节点上部署多个Worker组件,则需要配置SPARK_WORKER_INSTANCES环境变量,否则多次启动脚本部署Worker组件时会报错,其原因在于spark-daemon.sh脚本的执行控制,这里给出关键代码的简单分析。
首先,脚本中带了实例是否已经运行的判断,代码如下所示。
1. run_command() { 2. mode="$1" 3. shift 4. 5. mkdir -p "$SPARK_PID_DIR" 6. 7. #检查记录对应实例的PID的文件,如果对应进程已经运行,则会报错 8. if [ -f "$pid" ]; then 9. TARGET_ID="$(cat "$pid")" 10. if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then 11. echo "$command running as process $TARGET_ID. Stop it first." 12. exit 1 13. fi 14. fi
其中,记录对应实例的PID的文件相关的代码如下所示。
1. #这是PID文件所在的目录,如果没有设置,默认为/tmp 2. #如果使用了默认目录,可能会出现停止组件失败的信息 3. #原因在于该/tmp下的文件可能会被系统自动删除 4. if [ "$SPARK_PID_DIR" = "" ]; then 5. SPARK_PID_DIR=/tmp 6. fi 7. 8. #这是指定实例编号对应的pid文件的路径 9. #其中$instance代表实例编号,因此如果编号相同,对应的就是同一个文件 10. pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
从上面的分析可以看出,如果不是通过设置SPARK_WORKER_INSTANCES,然后一次性启动多个Worker实例,而是手动一个个地启动,对应的在脚本中每次启动时的实例编号都是1,在后台守护进程的spark-daemon.sh脚本中生成的pid就是同一个文件。因此,第二次启动时,pid文件已经存在,此时就会报错(对应停止时也是通过读取pid文件获取进程ID的,因此自动停止多个实例,也需要设置SPARK_WORKER_INSTANCES)。
5.2.2 Worker启动的源码详解
首先查看Worker伴生对象中的main方法,代码如下。
1. private[deploy] object Worker extends Logging { 2. val SYSTEM_NAME = "sparkWorker" 3. val ENDPOINT_NAME = "Worker" 4. 5. def main(argStrings: Array[String]) { 6. Utils.initDaemon(log) 7. val conf = new SparkConf 8. //构建解析参数的实例 9. val args = new WorkerArguments(argStrings, conf) 10. //启动RPC通信环境以及Worker的RPC通信终端 11. val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args. webUiPort, args.cores, 12. args.memory, args.masters, args.workDir, conf = conf) 13. rpcEnv.awaitTermination() 14. }
可以看到,Worker伴生对象中的main方法、格式和Master基本一致。通过参数的类型WorkerArguments来解析命令行参数。具体的代码解析可以参考Master节点部署时的MasterArguments的代码解析。
另外,MasterArguments中的printUsageAndExit方法,对应的就是命令行中的帮助信息。
解析完Worker的参数后,调用startRpcEnvAndEndpoint方法启动RPC通信环境以及Worker的RPC通信终端。该方法的代码解析可以参考Master节点部署时使用的同名方法的代码解析。
最终会实例化一个Worker对象。Worker也继承ThreadSafeRpcEndpoint,对应的也是一个RPC的通信终端,实例化该对象后会调用onStart方法,该方法的代码如下所示。
Worker.scala的源码如下。
1. override def onStart() { 2. //刚启动时Worker肯定是未注册的状态 3. assert(!registered) 4. logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( 5. host, port, cores, Utils.megabytesToString(memory))) 6. logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") 7. logInfo("Spark home: " + sparkHome) 8. //构建工作目录 9. createWorkDir() 10. //启动Shuffle服务 11. shuffleService.startIfEnabled() 12. //启动一个Web UI 13. webUi = new WorkerWebUI(this, workDir, webUiPort) 14. webUi.bind() 15. 16. workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}" 17. //每个Slave节点上启动Worker组件时,都需要向集群中的Master注册 18. 19. registerWithMaster() 20. 21. metricsSystem.registerSource(workerSource) 22. metricsSystem.start() 23. //度量系统启动后,将Worker 度量的Servlet处理程序附加到Web用户界面 24. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) 25. }
其中,createWorkDir()方法对应构建了该Worker节点上的工作目录,后续在该节点上执行的Application相关信息都会存放在该目录下。
Worker.scala的createWorkDir的源码如下。
1. private def createWorkDir() { 2. workDir = Option(workDirPath).map(new File(_)).getOrElse(new File (sparkHome, "work")) 3. try { 4. //这偶尔会失败,不知道原因 ... !workDir.exists() && !workDir.mkdirs() 5. //因此,尝试创建并检查目录是否创建 6. workDir.mkdirs() 7. if ( !workDir.exists() || !workDir.isDirectory) { 8. logError("Failed to create work directory " + workDir) 9. System.exit(1) 10. } 11. assert (workDir.isDirectory) 12. } catch { 13. case e: Exception => 14. logError("Failed to create work directory " + workDir, e) 15. System.exit(1) 16. } 17. }
可以看到,如果workDirPath没有设置,默认使用的是sparkHome目录下的work子目录。对应的workDirPath在Worker实例化时传入,反推代码可以查到该变量在WorkerArguments中设置。相关代码有两处:一处在WorkerArguments的主构造体中,代码如下所示。
WorkerArguments.scala的源码如下。
1. if (System.getenv("SPARK_WORKER_DIR") != null) { 2. workDir = System.getenv("SPARK_WORKER_DIR") 3. }
即workDirPath由环境变量SPARK_WORKER_DIR设置。
另外一处在命令行选项解析时设置,代码如下所示。
WorkerArguments.scala的源码如下。
1. private def parse(args: List[String]): Unit = args match { 2. ...... 3. case ("--work-dir" | "-d") :: value :: tail => 4. workDir = value 5. parse(tail) 6. ......
即workDirPath由启动Worker实例时传入的可选项--work-dir设置。属性配置:通常由命令可选项来动态设置启动时的配置属性,此时配置的优先级高于默认的属性文件以及环境变量中设置的属性。
启动Worker后一个关键的步骤就是注册到Master,对应的方法registerWithMaster()的代码如下所示。
Worker.scala的源码如下。
1. private def registerWithMaster() { 2. ...... 3. registerMasterFutures = tryRegisterAllMasters() 4. ...... 5.
继续查看tryRegisterAllMasters方法,代码如下所示。
Spark 2.1.1版本的Worker.scala的源码如下。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. registerWithMaster(masterEndpoint) 4. ...... 5.
Spark 2.2.0版本的Worker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行registerWithMaster方法调整为sendRegisterMessageToMaster方法。
1. private def tryRegisterAllMasters(): Array[JFuture[_]] = { 2. ...... 3. sendRegisterMessageToMaster(masterEndpoint) 4. ......
其中,registerWithMaster(masterEndpoint)向特定Master的RPC通信终端发送消息RegisterWorker。
Worker接收到反馈消息后,进一步调用handleRegisterResponse方法进行处理。对应的处理代码如下所示。
Worker.scala的源码如下。
1. private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized { 2. msg match { 3. //成功注册该Worker节点,设置registered,修改当前的Master 4. case RegisteredWorker(masterRef, masterWebUiUrl) => 5. logInfo("Successfully registered with master " + masterRef .address.toSparkURL) 6. registered = true 7. changeMaster(masterRef, masterWebUiUrl) 8. //启动周期性心跳发送调度器,在Worker生命周期中定期向Worker发送自己的心跳信息 9. forwordMessageScheduler.scheduleAtFixedRate(new Runnable { 10. override def run(): Unit = Utils.tryLogNonFatalError { 11. self.send(SendHeartbeat) 12. } 13. }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) 14. //启动工作目录的定期清理调度器,默认情况下,该配置的属性为False,需要手动设置,对 //应属性名为spark.worker.cleanup.enabled 15. if (CLEANUP_ENABLED) { 16. logInfo( 17. s"Worker cleanup enabled; old application directories will be deleted in: $workDir") 18. forwordMessageScheduler.scheduleAtFixedRate(new Runnable { 19. override def run(): Unit = Utils.tryLogNonFatalError { 20. self.send(WorkDirCleanup) 21. } 22. }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) 23. } 24. 25. val execs = executors.values.map { e => 26. new ExecutorDescription(e.appId, e.execId, e.cores, e.state) 27. } 28. masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq)) 29. //注册失败,则退出 30. case RegisterWorkerFailed(message) => 31. if (!registered) { 32. logError("Worker registration failed: " + message) 33. System.exit(1) 34. } 35. //注册的Master处于Standby状态 36. case MasterInStandby => 37. //Ignore. Master not yet ready. 38. } 39. }
分析到这一步,已经明确了注册以及对注册的反馈信息的处理细节。下面进一步分析注册重试定时器的相关处理。注册重试定时器会定期向Worker本身发送ReregisterWithMaster消息,因此可以在receive方法中查看该消息的处理,具体代码如下所示。
1. override def receive: PartialFunction[Any, Unit] = synchronized { 2. ...... 3. case ReregisterWithMaster => 4. reregisterWithMaster() 5. ......