1.3.2 Spark的分布式执行
读到这里,你一定已经知道了 Spark 是一个分布式数据处理引擎,其各种组件在一个集群上协同工作。接下来的几章会探讨如何使用 Spark 进行编程,在此之前,你需要先了解 Spark 分布式架构中的各组件是如何一起工作并相互通信的,以及 Spark 都支持哪些部署模式。
我们先一一介绍图 1-4 中出现的组件,以及这些组件在架构中发挥的作用。从整体架构上看,Spark 应用有一个驱动器程序,该程序负责控制 Spark 集群内的并行计算。驱动器会通过 SparkSession
对象访问集群内的分布式组件(一系列 Spark 执行器)和集群管理器。
图 1-4:Spark 的组件和架构
Spark 驱动器
作为 Spark 应用中负责初始化
SparkSession
的部分,Spark 驱动器扮演着多个角色:它与集群管理器打交道;它向集群管理器申请 Spark 执行器(JVM)所需要的资源(CPU、内存等);它还会将所有的 Spark 操作转换为 DAG 运算,并负责调度,还要将这些计算分成任务分发到 Spark 执行器上。一旦资源分配完成,创建好执行器后,驱动器就会直接与执行器通信。SparkSession`
在 Spark 2.0 中,
SparkSession
是所有 Spark 操作和数据的统一入口。它不仅封装了 Spark 程序之前的各种入口(如SparkContext
、SQLContext
、HiveContext
、SparkConf
,以及StreamingContext
等),还让 Spark 变得更加简单、好用。在 Spark 2.x 中,虽然
SparkSession
对象已经包含了其他所有的上下文对象,但你仍然可以访问那些上下文对象及其方法。通过这种方式,社区保持着后向的兼容性。也就是说,使用SparkContext
或SQLContext
的基于 1.x 版本的旧代码也可以在 2.x 上运行。通过这个入口,可以创建 JVM 运行时参数、定义 DataFrame 或 Dataset、从数据源读取数据、访问数据库元数据,并发起 Spark SQL 查询。
SparkSession
为所有的 Spark 功能提供了统一的入口。在独立的 Spark 应用中,你可以用自己所选择的编程语言的高级 API 创建
SparkSession
对象。在 Spark shell 中(第 2 章将进一步介绍),SparkSession
对象会被自动创建,你只需要使用全局变量spark
或sc
3 即可访问。在 Spark 1.x 中,需要创建多种上下文对象(分别用于流处理、SQL 等),这会让代码显得很烦琐。在 Spark 2.x 中,应用只需要为每个 JVM 创建一个
SparkSession
对象,然后就可以用其执行各种 Spark 操作。我们直接看代码示例吧。
// Scala代码 import org.apache.spark.sql.SparkSession // 构建SparkSession val spark = SparkSession .builder .appName("LearnSpark") .config("spark.sql.shuffle.partitions", 6) .getOrCreate() ... // 用session对象读取JSON val people = spark.read.json("...") ... // 用session对象发起SQL查询 val resultsDF = spark.sql("SELECT city, pop, state, zip FROM table_name")
集群管理器
集群管理器负责管理和分配集群内各节点的资源,以用于 Spark 应用的执行。目前,Spark 支持 4 种集群管理器:Spark 自带的独立集群管理器、Apache Hadoop YARN、Apache Mesos,以及 Kubernetes。
Spark 执行器
Spark 执行器在集群内各工作节点上运行。执行器与驱动器程序通信,并负责在工作节点上执行任务。在大多数部署模式中,每个工作节点上只有一个执行器。
部署模式
支持多种部署模式是 Spark 的一大优势,这让 Spark 可以在不同的配置和环境中运行。因为集群管理器不需要知道它实际在哪里运行(只要能管理 Spark 的执行器,并满足资源请求就行),所以 Spark 可以部署在 Apache Hadoop YARN 和 Kubernetes 等一些常见环境中,并且以不同的模式运行。表 1-1 总结了可供选择的部署模式。
表 1-1:Spark部署模式一览表
分布式数据与分区
实际的物理数据是以分区的形式分布在 HDFS 或者云存储上的,如图 1-5 所示。数据分区遍布整个物理集群,而 Spark 将每个分区在逻辑上抽象为内存中的一个 DataFrame4。出于数据本地性要求,在分配任务时,根据要读取的数据分区与各 Spark 执行器在网络上的远近,最好将任务分配到最近的 Spark 执行器上。
图 1-5:物理机器间的数据分布
分区可以实现高效的并行执行。将数据切割为数据块或分区的分布式结构可以让 Spark 执行器只处理靠近自己的数据,从而最小化网络带宽使用率。也就是说,执行器的每个核心都能分到自己要处理的数据分区,如图 1-6 所示。
图 1-6:执行器的每个核心都获得了数据的一个分区
举个例子,以下代码片段将存储在集群中的物理数据分入 8 个分区。这样一来,每个执行器都可以分到一个或多个分区,然后加载到执行器的内存中。
# Python代码 log_df = spark.read.text("path_to_large_text_file").repartition(8) print(log_df.rdd.getNumPartitions())
以下代码会在内存中创建一个包含 8 个分区、共 10 000 个整型数的 DataFrame。
# Python代码 df = spark.range(0, 10000, 1, 8) print(df.rdd.getNumPartitions())
这两段代码都会打印出
8
。第 3 章和第 7 章会讨论如何根据所有执行器的核心数来修改和调整分区方式,以实现最大的并发度。
3全局变量 sc
为 SparkSession
内的 SparkContext
对象。——译者注
4此 DataFrame 与后续会介绍的 Spark DataFrame 不是同一个概念。——译者注