Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

3.3 RDD依赖关系

RDD依赖关系为成两种:窄依赖(Narrow Dependency)、宽依赖(Shuffle Dependency)。窄依赖表示每个父RDD中的Partition最多被子RDD的一个Partition所使用;宽依赖表示一个父RDD的Partition都会被多个子RDD的Partition所使用。

3.3.1 窄依赖解析

RDD的窄依赖(Narrow Dependency)是RDD中最常见的依赖关系,用来表示每一个父RDD中的Partition最多被子RDD的一个Partition所使用,如图3-1窄依赖关系图所示,父RDD有2~3个Partition,每一个分区都只对应子RDD的一个Partition(join with inputs co-partitioned:对数据进行基于相同Key的数值相加)。

图3-1 窄依赖关系图

窄依赖分为两类:第一类是一对一的依赖关系,在Spark中用OneToOneDependency来表示父RDD与子RDD的依赖关系是一对一的依赖关系,如map、filter、join with inputs co-partitioned;第二类是范围依赖关系,在Spark中用RangeDependency表示,表示父RDD与子RDD的一对一的范围内依赖关系,如union。

OneToOneDependency依赖关系的Dependency.scala的源码如下。

1.   class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T]
     (rdd) {
2.    override def getParents(partitionId: Int): List[Int] = List(partitionId)
3.  }

OneToOneDependency的getParents重写方法引入了参数partitionId,而在具体的方法中也使用了这个参数,这表明子RDD在使用getParents方法的时候,查询的是相同partitionId的内容。也就是说,子RDD仅仅依赖父RDD中相同partitionID的Partition。

Spark窄依赖中第二种依赖关系是RangeDependency。Dependency.scala的RangeDependency的源码如下。

1.   class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
2.    extends NarrowDependency[T](rdd) {
3.
4.    override def getParents(partitionId: Int): List[Int] = {
5.      if (partitionId >= outStart && partitionId < outStart + length) {
6.        List(partitionId - outStart + inStart)
7.      } else {
8.        Nil
9.      }
10.   }
11. }

RangeDependency和OneToOneDependency最大的区别是实现方法中出现了outStart、length、instart,子RDD在通过getParents方法查询对应的Partition时,会根据这个partitionId减去插入时的开始ID,再加上它在父RDD中的位置ID,换而言之,就是将父RDD中的Partition,根据partitionId的顺序依次插入到子RDD中。

分析完Spark中的源码,下边通过两个例子来讲解从实例角度去看RDD窄依赖输出的结果。

对于OneToOneDependency,采用map操作进行实验,实验代码和结果如下所示。

1.  def main (args: Array[String]) {
2.  val num1 = Array(100,80,70)
3.  val rddnum1 = sc.parallelize(num1)
4.  val mapRdd = rddnum1.map(_*2)
5.  mapRdd.collect().foreach(println)
6.  }

结果为200 160 140。

对于RangeDependency,采用union操作进行实验,实验代码和结果如下所示。

1.  def main (args: Array[String]) {
2.   //创建数组1
3.  val data1= Array("spark","scala","hadoop")
4.    //创建数组2
5.  val data2=Array("SPARK","SCALA","HADOOP")
6.   //将数组1的数据形成RDD1
7.  val rdd1 = sc.parallelize(data1)
8.    //将数组2的数据形成RDD2
9.  val rdd2=sc.parallelize(data2)
10.   //把RDD1与RDD2联合
11. val unionRdd = rdd1.union(rdd2)
12.   //将结果收集并输出
13. unionRdd.collect().foreach(println)
14. }

结果为spark scala hadoop SPARK SCALA HADOOP。

3.3.2 宽依赖解析

RDD的宽依赖(Shuffle Dependency)是一种会导致计算时产生Shuffle操作的RDD操作,用来表示一个父RDD的Partition都会被多个子RDD的Partition使用,如图3-2宽依赖关系图中groupByKey算子操作所示,父RDD有3个Partition,每个Partition中的数据会被子RDD中的两个Partition使用。

图3-2 宽依赖关系图

宽依赖的源码位于Dependency.scala文件的ShuffleDependency方法中,newShuffleId()产生了新的shuffleId,表明宽依赖过程需要涉及shuffle操作,后续的代码表示宽依赖进行时的shuffle操作需要向shuffleManager注册信息。

Dependency.scala的ShuffleDependency的源码如下。

1.   @DeveloperApi
2.  class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
3.      @transient private val _rdd: RDD[_ <: Product2[K, V]],
4.      val partitioner: Partitioner,
5.      val serializer: Serializer = SparkEnv.get.serializer,
6.      val keyOrdering: Option[Ordering[K]] = None,
7.      val aggregator: Option[Aggregator[K, V, C]] = None,
8.      val mapSideCombine: Boolean = false)
9.    extends Dependency[Product2[K, V]] {
10.
11.   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2
      [K, V]]]
12.
13.   private[spark] val keyClassName: String = reflect.classTag[K].
      runtimeClass.getName
14.   private[spark] val valueClassName: String = reflect.classTag[V].
      runtimeClass.getName
15.   //如果在PairRDDFunctions方法中使用combineBykeyWithClassTag, combiner
16.   //类标签可能是空的
17.   private[spark] val combinerClassName: Option[String] =
18.     Option(reflect.classTag[C]).map(_.runtimeClass.getName)
19.
20.   val shuffleId: Int = _rdd.context.newShuffleId()
21.
22.   val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.
      registerShuffle(
23.     shuffleId, _rdd.partitions.length, this)
24.
25.   _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
26. }

Spark中宽依赖关系非常常见,其中较经典的操作为GroupByKey(将输入的key-value类型的数据进行分组,对相同key的value值进行合并,生成一个tuple2,如图3-3所示),具体代码和操作结果如下所示。输入5个tuple2类型的数据,通过运行产生3个tuple2数据。

1.  def main (args: Array[String]) {
2.    //设置输入的Tuple2数组
3.  val data = Array(Tuple2("spark",100),Tuple2("spark",95),
4.  Tuple2("hadoop",99),Tuple2("hadoop",80),Tuple2("scala",75))
5.    //将数组内容转化为RDD
6.  val rdd = sc.parallelize(data)
7.    //对RDD进行groupByKey操作
8.  val rddGrouped=rdd.groupByKey()
9.    //输出结果
10. rddGrouped.collect.foreach(println)
11. }

操作结果如图3-3所示。

图3-3 GroupByKey结果