Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

5.2 Worker启动原理和源码详解

本节讲解Worker启动原理和源码。对于Worker的部署启动,我们以Worker的脚本为入口点进行分析。

5.2.1 Worker启动的原理流程

Spark中各个组件是通过脚本启动部署的。Worker的部署以脚本为入口点开始分析。每个组件对应提供了启动的脚本,同时也会提供停止的脚本,停止脚本比较简单,在此仅分析启动的脚本。

部署Worker组件时,最简单的方式是通过配置Spark部署目录下的conf/slaves文件,然后以批量的方式启动集群中在该文件中列出的全部节点上的Worker实例。启动组件的命令如下所示:

1.  ./sbin/start-slaves.sh

或者动态地在某个新增节点上(注意是新增节点,如果之前已经部署过,可以参考后面对启动多个实例的进一步分析)启动一个Worker实例,此时可以在该新增的节点上执行如下启动命令。

1.  ./sbin/start-slave.sh  MasterURL

其中,参数MasterURL表示当前集群中Master的监听地址,启动后Worker会通过该地址动态注册到Master组件,实现为集群动态添加Worker节点的目的。

下面是Worker部署脚本的解析。

部署脚本根据单个节点以及多个节点的Worker部署,对应有两个脚本:start-slave.sh和start-slaves.sh。其中,start-slave.sh负责在脚本执行节点启动一个Worker组件。start-slaves.sh脚本则会读取配置的conf/slaves文件,逐个启动集群中各个Slave节点上的Worker组件。

1.首先分析脚本start-slaves.sh

脚本start-slaves.sh提供了批量启动集群中各个Slave节点上的Worker组件的方法,即可以在配置好Slave节点(即配置好conf/slaves文件)后,通过该脚本一次性全部启动集群中的Worker组件。

脚本的代码如下所示。

1.  #在根据conf/slaves文件指定的每个节点上启动一个Slave实例,即Worker组件
2.  #Starts a slave instance on each machine specified in the conf/slaves file.
3.
4.  if [ -z "${SPARK_HOME}" ]; then
5.    export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
6.  fi
7.
8.  #根据配置信息设置是否需要同时启动Tachyon的Slave实例
9.  START_TACHYON=false
10.
11. while (( "$#" )); do
12. case $1 in
13.     --with-tachyon)
14.       if [ ! -e "${SPARK_HOME}/sbin"/../tachyon/bin/tachyon ]; then
15.         echo "Error: --with-tachyon specified, but tachyon not found."
16.         exit -1
17.       fi
18.       START_TACHYON=true
19.       ;;
20. esac
21. shift
22. done
23.
24. . "${SPARK_HOME}/sbin/spark-config.sh"
25. . "${SPARK_HOME}/bin/load-spark-env.sh"
26.
27. # Find the port number for the master
28. if [ "$SPARK_MASTER_PORT" = "" ]; then
29.   SPARK_MASTER_PORT=7077
30. fi
31.
32. #这里获取Master的IP信息。需要注意的是,如果当前SPARK_MASTER_IP环境变量
33. #没有配置,会通过hostname命令来获取,这时如果不在Master组件所在节点启动本
34. #脚本,Master的IP设置就不一致了,为避免此类错误,建议在Master组件所在的节
35. #点上启动该脚本
36. if [ "$SPARK_MASTER_IP" = "" ]; then
37.   SPARK_MASTER_IP="`hostname`"
38. fi
39. # 通过slaves.sh脚本启动Tachyon的Slave实例
40. if [ "$START_TACHYON" == "true" ]; then
41.   "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/
      sbin"/../tachyon/bin/tachyon bootstrap-conf "$SPARK_MASTER_IP"
42.   # set -t so we can call sudo
43.   SPARK_SSH_OPTS="-o StrictHostKeyChecking=no -t" "${SPARK_HOME}/sbin/
      slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/tachyon/bin/tachyon-
      start.sh" worker SudoMount \; sleep 1
44. fi
45. #通过slaves.sh脚本启动Worker实例,这里会调用Worker启动的另一个脚本start-
    #slave.sh
46. # Launch the slaves
47. "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin
    /start-slave.sh" "spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT"

其中,脚本slaves.sh通过ssh协议在指定的各个Slave节点上执行各种命令。

在ssh启动的start-slave.sh命令中,可以看到它的参数是"spark://$SPARK_MASTER_ IP:$SPARK_MASTER_PORT",即启动slave节点上的Worker进程时,使用的Master URL的值是通过两个环境变量(SPARK_MASTER_IP和SPARK_MASTER_PORT)拼接而成的。

2.脚本start-slave.sh分析

从前面start-slaves.sh脚本的分析中可以看到,最终是在各个Slave节点上执行start-slave.sh脚本来部署Worker组件。对应地,就可以通过该脚本,动态地为集群添加新的Worker组件。

脚本的代码如下所示:

1.
2.  if [ -z "${SPARK_HOME}" ]; then
3.    export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
4.  fi
5.
6.  #注:提取的类名必须和SparkSubmit的类相匹配,任何变化都需在类中反映
7.
8.  # Worker组件对应的类
9.  CLASS="org.apache.spark.deploy.worker.Worker"
10.
11. #脚本的用法,其中master参数是必选的,Worker需要与集群的Master通信
12. #这里的master 对应 Master URL信息
13. if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
14.   echo "Usage: ./sbin/start-slave.sh [options] <master>"
15.   pattern="Usage:"
16.   pattern+="\|Using Spark's default log4j profile:"
17.   pattern+="\|Registered signal handlers for"
18.
19.   "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern"
      1>&2
20.   exit 1
21. fi
22.
23. . "${SPARK_HOME}/sbin/spark-config.sh"
24.
25. . "${SPARK_HOME}/bin/load-spark-env.sh"
26.
27. #第一个参数应该是master,先保存它,因为我们可能需要在它和其他参数之间插入参数
28.
29. MASTER=$1
30. shift
31.
32. # Worker 的Web UI的端口号设置
33.
34. if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
35.   SPARK_WORKER_WEBUI_PORT=8081
36. fi
37.
38. #在节点上启动指定序号的Worker实例
39. #
40. #快速启动Worker的本地功能
41. function start_instance {
42. #指定的Worker实例的序号,一个节点上可以部署多个Worker组件,对应有多个实例
43.   WORKER_NUM=$1
44.   shift
45.
46.   if [ "$SPARK_WORKER_PORT" = "" ]; then
47.     PORT_FLAG=
48.     PORT_NUM=
49.   else
50.     PORT_FLAG="--port"
51.     PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
52.   fi
53.   WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))
54.
55. #和Master组件一样,Worker组件也是使用启动守护进程的spark-daemon.sh脚本来启动
    #一个Worker实例的
56.
57.   "${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
58.      --webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
59. }
60.
61. #一个节点上部署几个Worker组件是由SPARK_WORKER_INSTANCES环境变量控制#的,默
    #认情况下只部署一个实例,start_instance方法的第一个参数为实例的序号
62. if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
63.   start_instance 1 "$@"
64. else
65.   for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
66.     start_instance $(( 1 + $i )) "$@"
67.   done
68. fi

手动启动Worker实例时,如果需要在一个节点上部署多个Worker组件,则需要配置SPARK_WORKER_INSTANCES环境变量,否则多次启动脚本部署Worker组件时会报错,其原因在于spark-daemon.sh脚本的执行控制,这里给出关键代码的简单分析。

首先,脚本中带了实例是否已经运行的判断,代码如下所示。

1.  run_command() {
2.    mode="$1"
3.    shift
4.
5.    mkdir -p "$SPARK_PID_DIR"
6.
7.  #检查记录对应实例的PID的文件,如果对应进程已经运行,则会报错
8.    if [ -f "$pid" ]; then
9.      TARGET_ID="$(cat "$pid")"
10.     if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
11.       echo "$command running as process $TARGET_ID.  Stop it first."
12.       exit 1
13.     fi
14.   fi

其中,记录对应实例的PID的文件相关的代码如下所示。

1.  #这是PID文件所在的目录,如果没有设置,默认为/tmp
2.  #如果使用了默认目录,可能会出现停止组件失败的信息
3.  #原因在于该/tmp下的文件可能会被系统自动删除
4.  if [ "$SPARK_PID_DIR" = "" ]; then
5.    SPARK_PID_DIR=/tmp
6.  fi
7.
8.  #这是指定实例编号对应的pid文件的路径
9.  #其中$instance代表实例编号,因此如果编号相同,对应的就是同一个文件
10. pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"

从上面的分析可以看出,如果不是通过设置SPARK_WORKER_INSTANCES,然后一次性启动多个Worker实例,而是手动一个个地启动,对应的在脚本中每次启动时的实例编号都是1,在后台守护进程的spark-daemon.sh脚本中生成的pid就是同一个文件。因此,第二次启动时,pid文件已经存在,此时就会报错(对应停止时也是通过读取pid文件获取进程ID的,因此自动停止多个实例,也需要设置SPARK_WORKER_INSTANCES)。

5.2.2 Worker启动的源码详解

首先查看Worker伴生对象中的main方法,代码如下。

1.   private[deploy] object Worker extends Logging {
2.    val SYSTEM_NAME = "sparkWorker"
3.    val ENDPOINT_NAME = "Worker"
4.
5.    def main(argStrings: Array[String]) {
6.      Utils.initDaemon(log)
7.      val conf = new SparkConf
8.    //构建解析参数的实例
9.      val args = new WorkerArguments(argStrings, conf)
10.   //启动RPC通信环境以及Worker的RPC通信终端
11.     val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.
        webUiPort, args.cores,
12.       args.memory, args.masters, args.workDir, conf = conf)
13.    rpcEnv.awaitTermination()
14.  }

可以看到,Worker伴生对象中的main方法、格式和Master基本一致。通过参数的类型WorkerArguments来解析命令行参数。具体的代码解析可以参考Master节点部署时的MasterArguments的代码解析。

另外,MasterArguments中的printUsageAndExit方法,对应的就是命令行中的帮助信息。

解析完Worker的参数后,调用startRpcEnvAndEndpoint方法启动RPC通信环境以及Worker的RPC通信终端。该方法的代码解析可以参考Master节点部署时使用的同名方法的代码解析。

最终会实例化一个Worker对象。Worker也继承ThreadSafeRpcEndpoint,对应的也是一个RPC的通信终端,实例化该对象后会调用onStart方法,该方法的代码如下所示。

Worker.scala的源码如下。

1.   override def onStart() {
2.   //刚启动时Worker肯定是未注册的状态
3.      assert(!registered)
4.      logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
5.        host, port, cores, Utils.megabytesToString(memory)))
6.      logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
7.      logInfo("Spark home: " + sparkHome)
8.   //构建工作目录
9.     createWorkDir()
10.  //启动Shuffle服务
11.     shuffleService.startIfEnabled()
12.  //启动一个Web UI
13.     webUi = new WorkerWebUI(this, workDir, webUiPort)
14.     webUi.bind()
15.
16.     workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
17.  //每个Slave节点上启动Worker组件时,都需要向集群中的Master注册
18.
19.    registerWithMaster()
20.
21.     metricsSystem.registerSource(workerSource)
22.     metricsSystem.start()
23.     //度量系统启动后,将Worker 度量的Servlet处理程序附加到Web用户界面
24.     metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
25.   }

其中,createWorkDir()方法对应构建了该Worker节点上的工作目录,后续在该节点上执行的Application相关信息都会存放在该目录下。

Worker.scala的createWorkDir的源码如下。

1.  private def createWorkDir() {
2.  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File
    (sparkHome, "work"))
3.  try {
4.    //这偶尔会失败,不知道原因 ... !workDir.exists() && !workDir.mkdirs()
5.    //因此,尝试创建并检查目录是否创建
6.    workDir.mkdirs()
7.    if ( !workDir.exists() || !workDir.isDirectory) {
8.      logError("Failed to create work directory " + workDir)
9.         System.exit(1)
10.      }
11.      assert (workDir.isDirectory)
12.    } catch {
13.      case e: Exception =>
14.        logError("Failed to create work directory " + workDir, e)
15.        System.exit(1)
16.    }
17.  }

可以看到,如果workDirPath没有设置,默认使用的是sparkHome目录下的work子目录。对应的workDirPath在Worker实例化时传入,反推代码可以查到该变量在WorkerArguments中设置。相关代码有两处:一处在WorkerArguments的主构造体中,代码如下所示。

WorkerArguments.scala的源码如下。

1.  if (System.getenv("SPARK_WORKER_DIR") != null) {
2.    workDir = System.getenv("SPARK_WORKER_DIR")
3.  }

即workDirPath由环境变量SPARK_WORKER_DIR设置。

另外一处在命令行选项解析时设置,代码如下所示。

WorkerArguments.scala的源码如下。

1.  private def parse(args: List[String]): Unit = args match {
2.  ......
3.      case ("--work-dir" | "-d") :: value :: tail =>
4.        workDir = value
5.        parse(tail)
6.  ......

即workDirPath由启动Worker实例时传入的可选项--work-dir设置。属性配置:通常由命令可选项来动态设置启动时的配置属性,此时配置的优先级高于默认的属性文件以及环境变量中设置的属性。

启动Worker后一个关键的步骤就是注册到Master,对应的方法registerWithMaster()的代码如下所示。

Worker.scala的源码如下。

1.  private def registerWithMaster() {
2.    ......
3.        registerMasterFutures = tryRegisterAllMasters()
4.     ......
5.

继续查看tryRegisterAllMasters方法,代码如下所示。

Spark 2.1.1版本的Worker.scala的源码如下。

1.      private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.              registerWithMaster(masterEndpoint)
4.  ......
5.

Spark 2.2.0版本的Worker.scala的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第3行registerWithMaster方法调整为sendRegisterMessageToMaster方法。

1.        private def tryRegisterAllMasters(): Array[JFuture[_]] = {
2.  ......
3.    sendRegisterMessageToMaster(masterEndpoint)
4.  ......

其中,registerWithMaster(masterEndpoint)向特定Master的RPC通信终端发送消息RegisterWorker。

Worker接收到反馈消息后,进一步调用handleRegisterResponse方法进行处理。对应的处理代码如下所示。

Worker.scala的源码如下。

1.      private    def   handleRegisterResponse(msg:        RegisterWorkerResponse):
        Unit = synchronized {
2.      msg match {
3.    //成功注册该Worker节点,设置registered,修改当前的Master
4.        case RegisteredWorker(masterRef, masterWebUiUrl) =>
5.          logInfo("Successfully registered with master " + masterRef
            .address.toSparkURL)
6.          registered = true
7.          changeMaster(masterRef, masterWebUiUrl)
8.   //启动周期性心跳发送调度器,在Worker生命周期中定期向Worker发送自己的心跳信息
9.          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
10.           override def run(): Unit = Utils.tryLogNonFatalError {
11.             self.send(SendHeartbeat)
12.           }
13.         }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
14.  //启动工作目录的定期清理调度器,默认情况下,该配置的属性为False,需要手动设置,对
     //应属性名为spark.worker.cleanup.enabled
15.         if (CLEANUP_ENABLED) {
16.           logInfo(
17.             s"Worker cleanup enabled; old application directories will be
                deleted in: $workDir")
18.           forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
19.             override def run(): Unit = Utils.tryLogNonFatalError {
20.               self.send(WorkDirCleanup)
21.             }
22.           }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS,
              TimeUnit.MILLISECONDS)
23.         }
24.
25.         val execs = executors.values.map { e =>
26.           new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
27.         }
28.         masterRef.send(WorkerLatestState(workerId, execs.toList,
            drivers.keys.toSeq))
29.  //注册失败,则退出
30.       case RegisterWorkerFailed(message) =>
31.         if (!registered) {
32.           logError("Worker registration failed: " + message)
33.           System.exit(1)
34.         }
35.  //注册的Master处于Standby状态
36.       case MasterInStandby =>
37.       //Ignore. Master not yet ready.
38.    }
39.  }

分析到这一步,已经明确了注册以及对注册的反馈信息的处理细节。下面进一步分析注册重试定时器的相关处理。注册重试定时器会定期向Worker本身发送ReregisterWithMaster消息,因此可以在receive方法中查看该消息的处理,具体代码如下所示。

1.    override def receive: PartialFunction[Any, Unit] = synchronized {
2.  ......
3.        case ReregisterWithMaster =>
4.        reregisterWithMaster()
5.  ......