Spark大数据商业实战三部曲:内核解密|商业案例|性能调优
上QQ阅读APP看书,第一时间看更新

2.2 Spark 2.2 Core

本节讲解第二代Tungsten引擎、SparkSession和累加器API的使用。

2.2.1 第二代Tungsten引擎

Spark备受瞩目的原因之一在于它的高性能,Spark开发者为了保持这个优势,一直在不断地进行各种层次的优化,其中最令人兴奋的莫过于钨丝计划(Project Tungsten),因为钨丝计划的提出给Spark带来了极大的性能提升,并且在一定程度上引导了Spark的发展方向。

Spark是使用Scala和Java语言开发的,不可避免地运行于JVM之上。当然,内存管理也是依赖于JVM的内存管理机制,而对于大数据量的基于内存的处理,JVM对象模型对内存的额外开销,以及频繁的GC和Full GC都是非常致命的问题。另外,随着网络带宽和磁盘I/O的不断提升,内存和CPU又重新作为性能瓶颈受到关注,JVM对象的序列化、反序列化带来的性能损耗亟待解决。Spark 1.5版本加入的钨丝计划从3大方面着手解决这些问题:

(1)统一内存管理模型和二进制处理(Binary Processing)。统一内存管理模型代替之前基于JVM的静态内存管理,引入Page来管理堆内存和堆外内存(on-heap和off-heap),并且直接操作内存中的二进制数据,而不是Java对象,很大程度上摆脱了JVM内存管理的限制。

(2)基于缓存感知的计算(Cache-aware Computation)。Spark内存读取操作也会带来一部分性能损耗,钨丝计划便设计了缓存友好的算法和数据结构来提高缓存命中率,充分利用L1/L2/L3三级缓存,大幅提高了内存读取速度,进而缩短了内存中整个计算过程的时间。

(3)代码生成(Code Generation)。在JVM中,所有代码的执行由解释器一步步地解释执行,CodeGeneration这一功能则在Spark运行时动态生成用于部分算子求值的bytecode,减少了对基础数据类型的封装,并且缓解了调用虚函数的额外开销。

Spark 2.0升级了第二代Tungsten引擎。其中最重要的一点是把CodeGeneration作用于全阶段的SparkSQL和DataFrame之上(即全阶段代码生成Whole Stage Code Generation),为常见的算子带来10倍左右的性能提升!

2.2.2 SparkSession

加入SparkSession,取代原来的SQLContext和HiveContext,为了兼容两者,仍然保留。SparkSession使用方法如下:

1.  SparkSession.builder()
2.      .master("local")
3.      .appName("Word Count")
4.      .config("spark.some.config.option", "some-value")
5.      .getOrCreate()

首先获得SparkSession的Builder,然后使用Builder为SparkSession设置参数,最后使用getOrCreate方法检测当前线程是否有一个已经存在的Thread-local级别的SparkSession,如果有,则返回它;如果没有,则检测是否有全局级别的SparkSession,有,则返回,没有,则创建新的SparkSession。

在程序中如果要使用SparkContext,调用sparkSession.sparkContext即可。在程序的最后我们需要调用sparkContext.stop方法,这个方法会调用sparkContext.stop来关闭sparkContext。

从Spark 2.0开始,DataFrame和DataSet既可以容纳静态、有限的数据,也可以容纳无限的流数据,所以用户也可以使用SparkSession像创建静态数据集一样来创建流式数据集,并且可以使用相同的操作算子。这样,整合了实时流处理和离线处理的框架,结合其他容错、扩展等特性就形成了完整的Lambda架构。

2.2.3 累加器API

Spark 2.0引入了一个更加简单和更高性能的累加器API,如在1.X版本中可以这样使用累加器:

1.  //定义累加器,这里直接使用  SparkContext    内置的累加器,设置初始值为  0,名字为"My
    //Accumulator"
2.  val accum = sc.accumulator(0, "My Accumulator")
3.  //计算值
4.  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
5.  //获取累加器的值,(Executor上面只能对累加器进行累加操作,只有Driver才能读取累加
    //器的值,Driver读取值的时候会把各个Executor上存储的本地累加器的值加起来),这里
    //的结果是10
6.  accum.value

在Spark 2.X版本里使用SparkContext里内置的累加器:

1.   //与Spark 1.X不同的是,需要指定累加器的类型,目前SparkContext有Long类型和
     //Double类型的累加器可以直接使用(不需要指定初始值)
2.  val accum = sc.longAccumulator("My Accumulator")
3.  sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
4.  print(accum.value)

只使用SparkContext里内置的累加器功能肯定不能满足略微复杂的业务类型,此时我们就可以自定义累加器。在1.X版本中的做法是(下面是官网的例子):

1.   //继承AccumulatorParam[Vector],返回类型为Vector
2.  object VectorAccumulatorParam extends AccumulatorParam[Vector] {
3.  //定义“零”值,这里把传入的初始值的size作为“零”值
4.  def zero(initialValue: Vector): Vector = {
5.      Vector.zeros(initialValue.size)
6.    }
7.  //定义累加操作的计算方式
8.    def addInPlace(v1: Vector, v2: Vector): Vector = {
9.      v1 += v2
10.   }
11. }

上面的累加器元素和返回类型是相同的,在Scala中还有另外一种方式来自定义累加器,用户只需要继承Accumulable,就可以把元素和返回值定义为不同的类型,这样我们就可以完成添加操作(如往Int类型的List里添加整数,此时元素为Int类型,而返回类型为List)。

在Spark 2.X中加入一个新的抽象类——AccumulatorV2,继承这个类要实现以下几种方法:

add方法:指定元素相加操作。

copy方法:指定对自定义累加器的复制操作。

isZero方法:返回该累加器的值是否为“零”。

merge方法:合并两个相同类型的累加器。

reset方法:重置累加器。

value方法:返回累加器当前的值。

重写这几种方法之后,只需实例化自定义累加器,并连同累加器名字一起传给sparkContext.register方法。

下面简单实现一个把字符串合并为数组的累加器:

1.   //首先要继承AccumulatorV2,并指定输入为String类型,输出为ArrayBuffer[String]
2.  class MyAccumulator extends AccumulatorV2[String, ArrayBuffer[String]] {
3.  //设置累加器的结果,类型为ArrayBuffer[String]
4.    private var result = ArrayBuffer[String]()
5.
6.  //判断累加器当前值是否为“零值”,这里我们指定如果result的size为0,则累加器的当
    //前值是“零值”
7.    override def isZero: Boolean = this.result.size == 0
8.
9.  //copy方法设置为新建本累加器,并把result赋给新的累加器
10.   override def copy(): AccumulatorV2[String, ArrayBuffer[String]] = {
11.     val newAccum = new MyAccumulator
12.     newAccum.result = this.result
13.     newAccum
14.   }
15. //reset方法设置为把result设置为新的ArrayBuffer
16. override def reset(): Unit = this.result == new ArrayBuffer[String]()
17.
18. //add方法把传进来的字符串添加到result内
19. override def add(v: String): Unit = this.result += v
20.
21. //merge方法把两个累加器的result合并起来
22. override def merge(other: AccumulatorV2[String, ArrayBuffer[String]]):
    Unit = {
23.       result.++=:(other.value)
24.     }
25. //value方法返回result
26.   override def value: ArrayBuffer[String] = this.result
27. }
28. 接着在main方法里使用累加器:
29. val Myaccum = new MyAccumulator()
30.
31. //向SparkContext注册累加器
32.     sc.register(Myaccum)
33.
34. //把“a”“b”“c”“d”添加进累加器的result数组并打印出来
35.     sc.parallelize(Array("a","b","c","d")).foreach(x => Myaccum.add(x))
36.     println(Myaccum.value)

运行结果显示的ArrayBuffer里的值顺序是不固定的,取决于各个Executor的值到达Driver的顺序。