9.2 Spark中checkpoint原理和源码详解
本节对Spark中checkpoint原理及Spark中checkpoint源码进行详解。
9.2.1 Spark中checkpoint原理详解
checkpoint到底是什么?
(1)Spark在生产环境下经常会面临Tranformations的RDD非常多(例如,一个Job中包含10 000个RDD)或者具体Tranformation产生的RDD本身计算特别复杂和耗时(例如,计算时常超过1h),此时我们必须考虑对计算结果数据的持久化。
(2)Spark擅长多步骤迭代,同时擅长基于Job的复用,这时如果能够对曾经计算的过程产生的数据进行复用,就可以极大地提升效率。
(3)如果采用persist把数据放在内存中,虽然是最快速的,但是也是最不可靠的。如果放在磁盘上,也不是完全可靠的。例如,磁盘会损坏,管理员可能清空磁盘等。
(4)checkpoint的产生就是为了相对更加可靠地持久化数据,checkpoint可以指定把数据放在本地并且是多副本的方式,但是在正常的生产情况下是放在HDFS,这就自然地借助HDFS高容错、高可靠的特征完成了最大化的、可靠的持久化数据的方式。
(5)为确保RDD复用计算的可靠性,checkpoint把数据持久化到HDFS中,保证数据最大程度的安全性。
(6)checkpoint就是针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)开始基于HDFS等的数据持久化复用策略,通过对RDD启动checkpoint机制来实现容错和高可用。
9.2.2 Spark中checkpoint源码详解
1.checkpoint的运行原理和源码实现彻底详解
RDD进行计算前须先看一下是否有checkpoint,如果有checkpoint,就不需要再进行计算了。
RDD.scala的iterator方法的源码如下。
1. final def iterator(split: Partition, context: TaskContext): Iterator[T] = { 2. if (storageLevel != StorageLevel.NONE) { 3. getOrCompute(split, context) 4. } else { 5. computeOrReadCheckpoint(split, context) 6. } 7. }
进入RDD.scala的getOrCompute方法,源码如下。
1. private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = { 2. val blockId = RDDBlockId(id, partition.index) 3. var readCachedBlock = true 4. //这种方法被Executors调用,所以我们需要调用SparkEnv.get代替sc.env 5. SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => { 6. readCachedBlock = false 7. computeOrReadCheckpoint(partition, context) 8. }) match {
getOrCompute方法的getOrElseUpdate方法传入的第四个参数是匿名函数,调用computeOrReadCheckpoint(partition, context)检查checkpoint中是否有数据。
RDD.scala的computeOrReadCheckpoint的源码如下。
1. private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = 2. { 3. if (isCheckpointedAndMaterialized) { 4. firstParent[T].iterator(split, context) 5. } else { 6. compute(split, context) 7. } 8. }
computeOrReadCheckpoint方法中的isCheckpointedAndMaterialized是一个布尔值,判断这个RDD是否checkpointed和被物化,Spark 2.0 checkpoint中有两种方式:reliably或者locally。computeOrReadCheckpoint作为isCheckpointed语义的别名返回值。
isCheckpointedAndMaterialized方法的源码如下。
1. private[spark] def isCheckpointedAndMaterialized: Boolean = 2. checkpointData.exists(_.isCheckpointed)
回到RDD.scala的computeOrReadCheckpoint,如果已经持久化及物化isCheckpointed-AndMaterialized,就调用firstParent[T]的iterator。如果没有持久化,则进行compute。
2.checkpoint原理机制
(1)通过调用SparkContext.setCheckpointDir方法指定进行checkpoint操作的RDD把数据放在哪里,在生产集群中是放在HDFS上的,同时为了提高效率,在进行checkpoint的使用时,可以指定很多目录。
SparkContext为即将计算的RDD设置checkpoint保存的目录。如果在集群中运行,必须是HDFS的目录路径。
SparkContext.scala的setCheckpointDir的源码如下。
1. def setCheckpointDir(directory: String) { 2. 3. /** *如果在集群上运行,如目录是本地的,则记录一个警告。否则,driver可能会试图从它自己 *的本地文件系统重建RDD的checkpoint检测点,因为checkpoint检查点文件不正确。 *实际上是在Executor机器上 */ 4. if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { 5. logWarning("Spark is not running in local mode, therefore the checkpoint directory " + 6. s"must not be on the local filesystem. Directory '$directory' " + 7. "appears to be on the local filesystem.") 8. } 9. 10. checkpointDir = Option(directory).map { dir => 11. val path = new Path(dir, UUID.randomUUID().toString) 12. val fs = path.getFileSystem(hadoopConfiguration) 13. fs.mkdirs(path) 14. fs.getFileStatus(path).getPath.toString 15. } 16. }
RDD.scala的checkpoint方法标记RDD的检查点checkpoint。它将保存到SparkContext# setCheckpointDir的目录检查点内的文件中,所有引用它的父RDDs将被移除。须在任何作业之前调用此函数。建议RDD在内存中缓存,否则保存在文件中时需要重新计算。
RDD.scala的checkpoint的源码如下。
1. def checkpoint(): Unit = RDDCheckpointData.synchronized { 2. //注意:我们在这里使用全局锁,原因是下游的复杂性:子RDD分区指向正确的父分区。未 //来我们应该重新考虑这个问题 3. if (context.checkpointDir.isEmpty) { 4. throw new SparkException("Checkpoint directory has not been set in the SparkContext") 5. } else if (checkpointData.isEmpty) { 6. checkpointData = Some(new ReliableRDDCheckpointData(this)) 7. } 8. }
其中的checkpointData是RDDCheckpointData。
1. private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
RDDCheckpointData标识某个RDD要进行checkpoint。如果某个RDD要进行checkpoint,那在Spark框架内部就会生成RDDCheckpointData。
1. private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T]) 2. extends Serializable { 3. 4. import CheckpointState._ 5. 6. //相关的RDD检查状态 7. protected var cpState = Initialized 8. 9. //RDD包含检查点数据 10. private var cpRDD: Option[CheckpointRDD[T]] = None 11. 12. //待办事宜:确定需要在下面的方法中使用全局锁吗 13. 14. /** 15. *返回RDD的checkpoint数据是否已经持久化 16. */ 17. def isCheckpointed: Boolean = RDDCheckpointData.synchronized { 18. cpState == Checkpointed 19. } 20. 21. /** 22. *物化RDD和持久化其内容 23. *RDD的第一个行动完成以后立即触发调用 24. */ 25. final def checkpoint(): Unit = { 26. //防止多个线程同时对相同RDDCheckpointing,这RDDCheckpointData状态自动翻转 27. RDDCheckpointData.synchronized { 28. if (cpState == Initialized) { 29. cpState = CheckpointingInProgress 30. } else { 31. return 32. } 33. } 34. 35. val newRDD = doCheckpoint() 36. 37. //更新我们的状态和截断RDD的血统 38. RDDCheckpointData.synchronized { 39. cpRDD = Some(newRDD) 40. cpState = Checkpointed 41. rdd.markCheckpointed() 42. } 43. } 44. 45. /** 46. *物化RDD和持久化其内容 47. * 48. *子类应重写此方法,以定义自定义检查点行为 49. * @return the Checkpoint RDD 在进程中创建 50. */ 51. protected def doCheckpoint(): CheckpointRDD[T] 52. /** *返回包含我们的检查点数据。如果checkpoint的状态是Checkpointed,才定义 */ 53. 54. def checkpointRDD: Option[CheckpointRDD[T]] = RDDCheckpointData. synchronized { cpRDD } 55. /** *返回checkpoint RDD的分区,仅用于测试 */ 56. 57. def getPartitions: Array[Partition] = RDDCheckpointData.synchronized { 58. cpRDD.map(_.partitions).getOrElse { Array.empty } 59. } 60. 61. } 62. /** *同步检查点操作的全局锁 */ 63. 64. private[spark] object RDDCheckpointData
(2)在进行RDD的checkpoint的时候,其所依赖的所有的RDD都会从计算链条中清空掉。
(3)作为最佳实践,一般在进行checkpoint方法调用前都要进行persist把当前RDD的数据持久化到内存或者磁盘上,这是因为checkpoint是Lazy级别,必须有Job的执行,且在Job执行完成后,才会从后往前回溯哪个RDD进行了checkpoint标记,然后对标记过的RDD新启动一个Job执行具体的checkpoint过程。
(4)checkpoint改变了RDD的Lineage。
(5)当调用checkpoint方法要对RDD进行checkpoint操作,此时框架会自动生成RDDCheckpointData,当RDD上运行过一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint,实际上在生产时会调用ReliableRDDCheckpointData的doCheckpoint,在生产过程中会导致ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用,而在writeRDDToCheckpointDirectory方法内部,会触发runJob来执行把当前的RDD中的数据写到checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。
RDDCheckpointData.scala的checkpoint方法进行真正的checkpoint:在RDDCheckpointData. synchronized同步块中先判断cpState的状态,然后调用doCheckpoint()。
RDDCheckpointData.scala的checkpoint方法的源码如下。
1. final def checkpoint(): Unit = { 2. //防止多个线程同时对相同RDDcheckpointing,这RDDCheckpointData状态自动翻转 3. RDDCheckpointData.synchronized { 4. if (cpState == Initialized) { 5. cpState = CheckpointingInProgress 6. } else { 7. return 8. } 9. } 10. 11. val newRDD = doCheckpoint() 12. 13. //更新我们的状态和截断RDD的血统 14. RDDCheckpointData.synchronized { 15. cpRDD = Some(newRDD) 16. cpState = Checkpointed 17. rdd.markCheckpointed() 18. } 19. }
其中的doCheckpoint方法是RDDCheckpointData.scala中的方法,这里没有具体的实现。
1. protected def doCheckpoint(): CheckpointRDD[T]
RDDCheckpointData的子类包括LocalRDDCheckpointData、ReliableRDDCheckpointData。ReliableRDDCheckpointData子类中doCheckpoint方法具体的实现,在方法中进行writeRDDToCheckpointDirectory的调用。
ReliableRDDCheckpointData.scala的doCheckpoint的源码如下。
1. protected override def doCheckpoint(): CheckpointRDD[T] = { 2. val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) 3. 4. //如果引用超出范围,则可选地清理检查点文件 5. if (rdd.conf.getBoolean("spark.cleaner.referenceTracking. cleanCheckpoints", false)) { 6. rdd.context.cleaner.foreach { cleaner => 7. cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id) 8. } 9. } 10. 11. logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}") 12. newRDD 13. } 14. 15. }
writeRDDToCheckpointDirectory将RDD的数据写入到checkpoint的文件中,返回一个ReliableCheckpointRDD。
首先找到sparkContext,赋值给sc变量。
基于checkpointDir创建checkpointDirPath。
fs获取文件系统的内容。
然后是广播sc.broadcast,将路径信息广播给所有的Executor。
接下来是sc.runJob,触发runJob执行,把当前的RDD中的数据写到checkpoint的目录中。
最后返回ReliableCheckpointRDD。无论是对哪个RDD进行checkpoint,最终都会产生ReliableCheckpointRDD,以checkpointDirPath.toString中的数据为数据来源;以originalRDD.partitioner的分区器partitioner作为partitioner;这里的originalRDD就是要进行checkpoint的RDD。
writeRDDToCheckpointDirectory的源码如下。
1. def writeRDDToCheckpointDirectory[T: ClassTag]( 2. originalRDD: RDD[T], 3. checkpointDir: String, 4. blockSize: Int = -1): ReliableCheckpointRDD[T] = { 5. 6. val sc = originalRDD.sparkContext 7. 8. //为检查点创建输出路径 9. val checkpointDirPath = new Path(checkpointDir) 10. val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration) 11. if (!fs.mkdirs(checkpointDirPath)) { 12. throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath") 13. } 14. 15. //保存文件,并重新加载它作为一个RDD 16. val broadcastedConf = sc.broadcast( 17. new SerializableConfiguration(sc.hadoopConfiguration)) 18. //待办事项:这是代价昂贵的,因为它又一次计算RDD是不必要的(SPARK-8582) sc.runJob(originalRDD, 19. writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _) 20. 21. if (originalRDD.partitioner.nonEmpty) { 22. writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath) 23. } 24. 25. val newRDD = new ReliableCheckpointRDD[T]( 26. sc, checkpointDirPath.toString, originalRDD.partitioner) 27. if (newRDD.partitions.length != originalRDD.partitions.length) { 28. throw new SparkException( 29. s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " + 30. s"number of partitions from original RDD $originalRDD (${originalRDD.partitions.length})") 31. } 32. newRDD 33. }
ReliableCheckpointRDD是读取以前写入可靠存储系统检查点文件数据的RDD。其中的partitioner是构建ReliableCheckpointRDD的时候传进来的。其中的getPartitions是构建一个一个的分片。其中,getPreferredLocations获取数据本地性,fs.getFileBlockLocations获取文件的位置信息。compute方法通过ReliableCheckpointRDD.readCheckpointFile读取数据。
ReliableCheckpointRDD.scala的源码如下。
1. private[spark] class ReliableCheckpointRDD[T: ClassTag]( 2. sc: SparkContext, 3. val checkpointPath: String, 4. _partitioner: Option[Partitioner] = None 5. ) extends CheckpointRDD[T](sc) { 6. 7. @transient private val hadoopConf = sc.hadoopConfiguration 8. @transient private val cpath = new Path(checkpointPath) 9. @transient private val fs = cpath.getFileSystem(hadoopConf) 10. private val broadcastedConf = sc.broadcast(new SerializableConfiguration (hadoopConf)) 11. //如果检查点目录不存在,则快速失败 12. require(fs.exists(cpath), s"Checkpoint directory does not exist: $checkpointPath") 13. /** *返回checkpoint的路径,RDD从中读取数据 */ 14. 15. override val getCheckpointFile: Option[String] = Some(checkpointPath) 16. override val partitioner: Option[Partitioner] = { 17. _partitioner.orElse { 18. ReliableCheckpointRDD.readCheckpointedPartitionerFile(context, checkpointPath) 19. } 20. } 21. /** *返回检查点目录中的文件所描述的分区 *由于原来的RDD可能属于一个之前的应用,没办法知道之前的分区数。此方法假定在应用 *生命周期,原始集检查点文件完全保存在可靠的存储里面 */ 22. 23. protected override def getPartitions: Array[Partition] = { 24. //如果路径不存在,listStatus就抛出异常 25. val inputFiles = fs.listStatus(cpath) 26. .map(_.getPath) 27. .filter(_.getName.startsWith("part-")) 28. .sortBy(_.getName.stripPrefix("part-").toInt) 29. //如果输入文件无效,则快速失败 30. inputFiles.zipWithIndex.foreach { case (path, i) => 31. if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) { 32. throw new SparkException(s"Invalid checkpoint file: $path") 33. } 34. } 35. Array.tabulate(inputFiles.length)(i => new CheckpointRDDPartition(i)) 36. } 37. /** *返回与给定分区关联的检查点文件的位置 38. */ 39. protected override def getPreferredLocations(split: Partition): Seq[String] = { 40. val status = fs.getFileStatus( 41. new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName (split.index))) 42. val locations = fs.getFileBlockLocations(status, 0, status.getLen) 43. locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost") 44. } 45. 46. /** *读取与给定分区关联的检查点文件的内容 47. */ 48. override def compute(split: Partition, context: TaskContext): I terator[T] = { 49. val file = new Path(checkpointPath, ReliableCheckpointRDD. checkpointFileName(split.index)) 50. ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context) 51. } 52. 53. } 54. .......
下面看一下ReliableCheckpointRDD.scala中compute方法中的ReliableCheckpointRDD. readCheckpointFile。readCheckpointFile读取指定检查点文件checkpoint的内容。readCheckpointFile方法通过deserializeStream反序列化fileInputStream文件输入流,然后将deserializeStream变成一个Iterator。
Spark 2.1.1版本的ReliableCheckpointRDD.scala的readCheckpointFile的源码如下。
1. def readCheckpointFile[T]( 2. path: Path, 3. broadcastedConf: Broadcast[SerializableConfiguration], 4. context: TaskContext): Iterator[T] = { 5. val env = SparkEnv.get 6. val fs = path.getFileSystem(broadcastedConf.value.value) 7. val bufferSize = env.conf.getInt("spark.buffer.size", 65536) 8. val fileInputStream = fs.open(path, bufferSize) 9. val serializer = env.serializer.newInstance() 10. val deserializeStream = serializer.deserializeStream(fileInputStream) 11. 12. //注册一个任务完成回调以,关闭输入流 13. context.addTaskCompletionListener(context => deserializeStream.close()) 14. 15. deserializeStream.asIterator.asInstanceOf[Iterator[T]] 16. } 17. 18. }
Spark 2.2.0版本的ReliableCheckpointRDD.scala的readCheckpointFile的源码与Spark 2.1.1版本相比具有如下特点:上段代码中第8行整体替换,新增fileInputStream变量中对CHECKPOINT_COMPRESS压缩配置的判断。如果CHECKPOINT压缩配置为true,则对fileStream文件流进行压缩。
1. ...... 2. val fileInputStream = { 3. val fileStream = fs.open(path, bufferSize) 4. if (env.conf.get(CHECKPOINT_COMPRESS)) { 5. CompressionCodec.createCodec(env.conf).compressedInputStream (fileStream) 6. } else { 7. fileStream 8. } 9. } 10. ......
ReliableRDDCheckpointData.scala的cleanCheckpoint方法,清理RDD数据相关的checkpoint文件。
1. def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = { 2. checkpointPath(sc, rddId).foreach { path => 3. path.getFileSystem(sc.hadoopConfiguration).delete(path, true) 4. } 5. }
在生产环境中不使用LocalCheckpointRDD。LocalCheckpointRDD的getPartitions直接从toArray级别中调用new()函数创建CheckpointRDDPartition。LocalCheckpointRDD的compute方法直接报异常。
LocalCheckpointRDD的源码如下。
1. private[spark] class LocalCheckpointRDD[T: ClassTag]( 2. sc: SparkContext, 3. rddId: Int, 4. numPartitions: Int) 5. extends CheckpointRDD[T](sc) { 6. ...... 7. protected override def getPartitions: Array[Partition] = { 8. (0 until numPartitions).toArray.map { i => new CheckpointRDDPartition(i) } 9. } 10. ....... 11. override def compute(partition: Partition, context: TaskContext): Iterator[T] = { 12. throw new SparkException( 13. s"Checkpoint block ${RDDBlockId(rddId, partition.index)} not found! Either the executor " + 14. s"that originally checkpointed this partition is no longer alive, or the original RDD is " + 15. s"unpersisted. If this problem persists, you may consider using 'rdd.checkpoint()' " + 16. s"instead, which is slower than local checkpointing but more fault- tolerant.") 17. } 18. 19. }
checkpoint运行流程图如图9-2所示。
图9-2 Checkpoint运行流程图
通过SparkContext设置Checkpoint数据保存的目录,RDD调用checkpoint方法,生产RDDCheckpointData,当RDD上运行一个Job后,就会立即触发RDDCheckpointData中的checkpoint方法,在其内部会调用doCheckpoint;然后调用ReliableRDDCheckpointData的doCheckpoint;ReliableCheckpointRDD的writeRDDToCheckpointDirectory的调用;在writeRDDToCheckpointDirectory方法内部会触发runJob,来执行把当前的RDD中的数据写到Checkpoint的目录中,同时会产生ReliableCheckpointRDD实例。
checkpoint保存在HDFS中,具有多个副本;persist保存在内存中或者磁盘中。在Job作业调度的时候,checkpoint沿着finalRDD的“血统”关系lineage从后往前回溯向上查找,查找哪些RDD曾标记为要进行checkpoint,标记为checkpointInProgress;一旦进行checkpoint,RDD所有父RDD就被清空。