Spark快速大数据分析(第2版)
上QQ阅读APP看书,第一时间看更新

1.3.2 Spark的分布式执行

读到这里,你一定已经知道了 Spark 是一个分布式数据处理引擎,其各种组件在一个集群上协同工作。接下来的几章会探讨如何使用 Spark 进行编程,在此之前,你需要先了解 Spark 分布式架构中的各组件是如何一起工作并相互通信的,以及 Spark 都支持哪些部署模式。

我们先一一介绍图 1-4 中出现的组件,以及这些组件在架构中发挥的作用。从整体架构上看,Spark 应用有一个驱动器程序,该程序负责控制 Spark 集群内的并行计算。驱动器会通过 SparkSession 对象访问集群内的分布式组件(一系列 Spark 执行器)和集群管理器。

图 1-4:Spark 的组件和架构

  1. Spark 驱动器

    作为 Spark 应用中负责初始化 SparkSession 的部分,Spark 驱动器扮演着多个角色:它与集群管理器打交道;它向集群管理器申请 Spark 执行器(JVM)所需要的资源(CPU、内存等);它还会将所有的 Spark 操作转换为 DAG 运算,并负责调度,还要将这些计算分成任务分发到 Spark 执行器上。一旦资源分配完成,创建好执行器后,驱动器就会直接与执行器通信。

  2. SparkSession`

    在 Spark 2.0 中,SparkSession 是所有 Spark 操作和数据的统一入口。它不仅封装了 Spark 程序之前的各种入口(如 SparkContextSQLContextHiveContextSparkConf,以及 StreamingContext 等),还让 Spark 变得更加简单、好用。

    在 Spark 2.x 中,虽然 SparkSession 对象已经包含了其他所有的上下文对象,但你仍然可以访问那些上下文对象及其方法。通过这种方式,社区保持着后向的兼容性。也就是说,使用 SparkContextSQLContext 的基于 1.x 版本的旧代码也可以在 2.x 上运行。

    通过这个入口,可以创建 JVM 运行时参数、定义 DataFrame 或 Dataset、从数据源读取数据、访问数据库元数据,并发起 Spark SQL 查询。SparkSession 为所有的 Spark 功能提供了统一的入口。

    在独立的 Spark 应用中,你可以用自己所选择的编程语言的高级 API 创建 SparkSession 对象。在 Spark shell 中(第 2 章将进一步介绍),SparkSession 对象会被自动创建,你只需要使用全局变量 sparksc3 即可访问。

    在 Spark 1.x 中,需要创建多种上下文对象(分别用于流处理、SQL 等),这会让代码显得很烦琐。在 Spark 2.x 中,应用只需要为每个 JVM 创建一个 SparkSession 对象,然后就可以用其执行各种 Spark 操作。

    我们直接看代码示例吧。

    // Scala代码
    import org.apache.spark.sql.SparkSession
    
    // 构建SparkSession
    val spark = SparkSession
      .builder
      .appName("LearnSpark")
      .config("spark.sql.shuffle.partitions", 6)
      .getOrCreate()
    ...
    // 用session对象读取JSON
    val people = spark.read.json("...")
    ...
    // 用session对象发起SQL查询
    val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
  3. 集群管理器

    集群管理器负责管理和分配集群内各节点的资源,以用于 Spark 应用的执行。目前,Spark 支持 4 种集群管理器:Spark 自带的独立集群管理器、Apache Hadoop YARN、Apache Mesos,以及 Kubernetes。

  4. Spark 执行器

    Spark 执行器在集群内各工作节点上运行。执行器与驱动器程序通信,并负责在工作节点上执行任务。在大多数部署模式中,每个工作节点上只有一个执行器。

  5. 部署模式

    支持多种部署模式是 Spark 的一大优势,这让 Spark 可以在不同的配置和环境中运行。因为集群管理器不需要知道它实际在哪里运行(只要能管理 Spark 的执行器,并满足资源请求就行),所以 Spark 可以部署在 Apache Hadoop YARN 和 Kubernetes 等一些常见环境中,并且以不同的模式运行。表 1-1 总结了可供选择的部署模式。

    表 1-1:Spark部署模式一览表

  6. 分布式数据与分区

    实际的物理数据是以分区的形式分布在 HDFS 或者云存储上的,如图 1-5 所示。数据分区遍布整个物理集群,而 Spark 将每个分区在逻辑上抽象为内存中的一个 DataFrame4。出于数据本地性要求,在分配任务时,根据要读取的数据分区与各 Spark 执行器在网络上的远近,最好将任务分配到最近的 Spark 执行器上。

    图 1-5:物理机器间的数据分布

    分区可以实现高效的并行执行。将数据切割为数据块或分区的分布式结构可以让 Spark 执行器只处理靠近自己的数据,从而最小化网络带宽使用率。也就是说,执行器的每个核心都能分到自己要处理的数据分区,如图 1-6 所示。

    图 1-6:执行器的每个核心都获得了数据的一个分区

    举个例子,以下代码片段将存储在集群中的物理数据分入 8 个分区。这样一来,每个执行器都可以分到一个或多个分区,然后加载到执行器的内存中。

    # Python代码
    log_df = spark.read.text("path_to_large_text_file").repartition(8)
    print(log_df.rdd.getNumPartitions())

    以下代码会在内存中创建一个包含 8 个分区、共 10 000 个整型数的 DataFrame。

    # Python代码
    df = spark.range(0, 10000, 1, 8)
    print(df.rdd.getNumPartitions())

    这两段代码都会打印出 8

    第 3 章和第 7 章会讨论如何根据所有执行器的核心数来修改和调整分区方式,以实现最大的并发度。

3全局变量 scSparkSession 内的 SparkContext 对象。——译者注

4此 DataFrame 与后续会介绍的 Spark DataFrame 不是同一个概念。——译者注