
Spark Checkpoint: Usage, Contents, and the Read/Write Process

Contents

1. Using checkpoint

2. What a checkpoint contains

3. The checkpoint write path

4. Streaming checkpoint

5. The checkpoint read path

I. Using checkpoint

Using checkpoint is straightforward:

1. For a regular (batch) application, first set the checkpoint directory with SparkContext.setCheckpointDir, then call checkpoint() on the RDD you want to checkpoint.

2. For a streaming application (Structured Streaming), a checkpoint location must be set when writing out the stream; otherwise the query fails with an exception saying the checkpoint directory has not been set. Set it with .option("checkpointLocation", "your checkpoint-pathname").
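
A minimal sketch of both cases (the paths and application names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-demo").getOrCreate()
val sc = spark.sparkContext

// 1. Batch: set the checkpoint directory once, then mark individual RDDs.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()   // only marks the RDD; nothing is written yet
rdd.count()        // the first action runs the job and then writes the checkpoint

// 2. Structured Streaming: the sink requires a checkpointLocation.
val query = spark.readStream
  .format("rate").load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "hdfs:///tmp/stream-checkpoints")
  .start()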

II. What a checkpoint contains

The class below holds all the information related to checkpointing; each instance of the class is associated with one RDD. It manages the checkpointing process of that RDD, and it also manages the state of the RDD after checkpointing by providing the updated partitions, iterator, and preferred locations.

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  // The checkpoint state of the associated RDD.
  protected var cpState = Initialized

  // The RDD that contains our checkpointed data
  private var cpRDD: Option[CheckpointRDD[T]] = None

  // ...
}

From this class we can see that it records whether the RDD has been checkpointed (cpState), as well as the checkpointed data itself (cpRDD).

What actually gets written (see the sketch after this list):

1. The RDD's contents.

2. The RDD's Partitioner, if it has one.
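
Where these files end up can be seen with RDD.getCheckpointFile. A hedged sketch: each checkpointed RDD gets its own rdd-N subdirectory under the random-UUID directory that setCheckpointDir creates (paths below are illustrative):

sc.setCheckpointDir("/tmp/checkpoints")   // illustrative path

val counts = sc.parallelize(1 to 4, 2).map(i => (i % 2, 1)).reduceByKey(_ + _)
counts.checkpoint()
counts.count()

// Prints something like Some(/tmp/checkpoints/0a1b.../rdd-7): that directory
// holds one part-NNNNN file per partition, plus a _partitioner file because
// reduceByKey gave this RDD a HashPartitioner.
println(counts.getCheckpointFile)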

III. The checkpoint write path (the reliable variant)

RDD.doCheckpoint()-->
RDDCheckpointData.checkpoint()-->
RDDCheckpointData.doCheckpoint()-->
ReliableRDDCheckpointData.doCheckpoint()-->
ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)-->
sc.runJob(originalRDD, writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)-->
ReliableCheckpointRDD.writePartitionToCheckpointFile
           

1. Checkpointing an RDD means saving the RDD. RDD.doCheckpoint() is called after the job that uses this RDD has completed (so the RDD has already been materialized and possibly stored in memory), and it is invoked recursively on all of the RDD's ancestors.

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  /**
   * Checkpointing means saving the RDD. This function is called after a job using this RDD has
   * completed (therefore the RDD has been materialized and potentially stored in memory), and it
   * is called recursively on all of this RDD's ancestors.
   */
  private[spark] def doCheckpoint(): Unit = {
    RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (checkpointData.isDefined) {
          if (checkpointAllMarkedAncestors) {
            // TODO We can collect all the RDDs that needs to be checkpointed, and then checkpoint
            // them in parallel.
            // Checkpoint parents first, because our own lineage will be truncated after we checkpoint ourselves
            dependencies.foreach(_.rdd.doCheckpoint())
          }
          checkpointData.get.checkpoint()
        } else {
          dependencies.foreach(_.rdd.doCheckpoint())
        }
      }
    }
  }
           

2. After a checkpoint has been requested on an RDD, this method is invoked right after the first action on the RDD completes, and it persists the RDD's contents. Internally, checkpoint() calls doCheckpoint() to do the actual work.

private[spark] abstract class RDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends Serializable {

  final def checkpoint(): Unit = {
    // Guard against multiple threads checkpointing the same RDD by
    // atomically flipping the state of this RDDCheckpointData
    RDDCheckpointData.synchronized {
      if (cpState == Initialized) {
        cpState = CheckpointingInProgress
      } else {
        return
      }
    }

    // The concrete behavior is implemented in the subclasses of RDDCheckpointData
    val newRDD = doCheckpoint()

    // Update our state and truncate the lineage
    RDDCheckpointData.synchronized {
      cpRDD = Some(newRDD)
      cpState = Checkpointed
      // Once done, mark this RDD as checkpointed and clear its lineage
      rdd.markCheckpointed()
    }
  }
}

3. Using Ctrl+Alt+B (Go to Implementation) you can jump to the implementations of doCheckpoint() in the subclasses. There are two: a reliable one (ReliableRDDCheckpointData) and a local one (LocalRDDCheckpointData). The reliable variant saves the RDD to a reliable, fault-tolerant storage system such as HDFS, which lets the driver restart from the last computed state after a failure. The local variant simply stores the RDD's blocks temporarily on the executors; it exists to avoid the high cost of writing the RDD to reliable storage, and it is mainly useful when an RDD builds up a long lineage that needs to be truncated frequently, for example in GraphX-based machine learning (a short usage sketch contrasting the two follows the code below).

private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private val rdd: RDD[T])
  extends RDDCheckpointData[T](rdd) with Logging {
  /**
   * In the reliable variant this is the method that actually materializes the RDD, writing its contents to a reliable DFS.
   */
  protected override def doCheckpoint(): CheckpointRDD[T] = {
    // Note that it delegates to ReliableCheckpointRDD.writeRDDToCheckpointDirectory
    val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)

    // Optionally clean our checkpoint files if the reference is out of scope
    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
      rdd.context.cleaner.foreach { cleaner =>
        cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
      }
    }

    logInfo(s"Done checkpointing RDD ${rdd.id} to $cpDir, new parent is RDD ${newRDD.id}")
    newRDD
  }

}
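
For comparison, the local variant is requested from user code with RDD.localCheckpoint(); it needs no checkpoint directory, but the data only survives as long as the executors holding the blocks. A minimal sketch of the two entry points:

// Reliable checkpoint: written to the fault-tolerant checkpoint directory.
sc.setCheckpointDir("hdfs:///tmp/checkpoints")
val reliable = sc.parallelize(1 to 1000).map(_ + 1)
reliable.checkpoint()
reliable.count()

// Local checkpoint: blocks are kept on the executors only. Cheaper, but if an
// executor holding the blocks is lost, the data cannot be recomputed because
// the lineage has already been truncated.
val local = sc.parallelize(1 to 1000).map(_ + 1)
local.localCheckpoint()
local.count()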
           

4. ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir) writes the RDD out and returns a ReliableCheckpointRDD.

def writeRDDToCheckpointDirectory[T: ClassTag](
      originalRDD: RDD[T],
      checkpointDir: String,
      blockSize: Int = -1): ReliableCheckpointRDD[T] = {
    val checkpointStartTimeNs = System.nanoTime()

    val sc = originalRDD.sparkContext

    // Create the output directory for the checkpoint files
    val checkpointDirPath = new Path(checkpointDir)
    val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
    if (!fs.mkdirs(checkpointDirPath)) {
      throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
    }

    // Save the RDD to files, to be loaded back later as an RDD
    val broadcastedConf = sc.broadcast(
      new SerializableConfiguration(sc.hadoopConfiguration))
    // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    // As the TODO stresses, checkpointing is an expensive operation: the cost is not in how the
    // write job is scheduled, but in recomputing the RDD along its whole lineage just to write it out
    sc.runJob(originalRDD,
      writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)

    if (originalRDD.partitioner.nonEmpty) {
      writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
    }

    val checkpointDurationMs =
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - checkpointStartTimeNs)
    logInfo(s"Checkpointing took $checkpointDurationMs ms.")

    val newRDD = new ReliableCheckpointRDD[T](
      sc, checkpointDirPath.toString, originalRDD.partitioner)
    if (newRDD.partitions.length != originalRDD.partitions.length) {
      throw new SparkException(
        "Checkpoint RDD has a different number of partitions from original RDD. Original " +
          s"RDD [ID: ${originalRDD.id}, num of partitions: ${originalRDD.partitions.length}]; " +
          s"Checkpoint RDD [ID: ${newRDD.id}, num of partitions: " +
          s"${newRDD.partitions.length}].")
    }
    newRDD
  }
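
As the TODO above notes (SPARK-8582), writeRDDToCheckpointDirectory runs a separate job over the original RDD, so the RDD is computed a second time just to be written out. The usual way to avoid recomputing the full lineage is to persist the RDD before checkpointing it, which is also what the RDD.checkpoint() scaladoc recommends. A minimal sketch (the input path, parse function and isValid field are illustrative):

import org.apache.spark.storage.StorageLevel

val expensive = sc.textFile("hdfs:///data/events")   // illustrative input
  .map(parse)                                        // assumes a parse function defined elsewhere
  .filter(_.isValid)

expensive.persist(StorageLevel.MEMORY_AND_DISK)  // keep the computed partitions around
expensive.checkpoint()                           // mark for reliable checkpointing
expensive.count()                                // the first pass computes and caches the RDD; the
                                                 // checkpoint job then reads the cached blocks
                                                 // instead of recomputing the whole lineage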
           

IV. Spark Streaming checkpoint

1. When checkpointing is needed

  • When stateful transformations are used, e.g. updateStateByKey or reduceByKeyAndWindow
  • When recovering from a driver failure, which requires the checkpointed metadata

2. Enabling checkpointing

streamingContext.checkpoint(checkpointDirectory)
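
A minimal DStream sketch covering the points above: updateStateByKey carries state across batches, so a checkpoint directory must be set (host, port, and paths are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-checkpoint-demo")
val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")   // required for stateful operations

val lines = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" "))
  .map(word => (word, 1))
  .updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
    Some(newValues.sum + state.getOrElse(0))
  }

counts.print()
ssc.start()
ssc.awaitTermination()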
           

3. When checkpoints are triggered

  • When the jobs for a batch are generated and checkpointing is performed


private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    // Checkpointing is event-driven: post a DoCheckpoint event to the event loop
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
           
  • When the current batch's jobs have completed and the DStream metadata is cleared


private def clearMetadata(time: Time) {
    ssc.graph.clearMetadata(time)

    // If checkpointing is enabled, then checkpoint,
    // else mark batch to be fully processed
    if (shouldCheckpoint) {
      // Post a DoCheckpoint event here as well, this time asking to clear the checkpointed data afterwards
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
    } else {
      // If checkpointing is not enabled, then delete metadata information about
      // received blocks (block data not saved in any case). Otherwise, wait for
      // checkpointing of this batch to complete.
      val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
      jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
      jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
      markBatchFullyProcessed(time)
    }
  }
           

Now let's look at the function that handles these events:

private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      // The DoCheckpoint event is handled here by calling doCheckpoint(time, clearCheckpointDataLater)
      case DoCheckpoint(time, clearCheckpointDataLater) =>
        doCheckpoint(time, clearCheckpointDataLater)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
           

The checkpoint is then executed by JobGenerator.doCheckpoint(time: Time, clearCheckpointDataLater: Boolean):

private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
    if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
      logInfo("Checkpointing graph for time " + time)
      ssc.graph.updateCheckpointData(time)
      // CheckpointWriter.write performs the actual write; note that a Checkpoint object is what gets written
      checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
    } else if (clearCheckpointDataLater) {
      markBatchFullyProcessed(time)
    }
  }
           

4. What gets checkpointed

  • Metadata

    Metadata is the information that defines the streaming application. It is used to recover the application when the node running the driver goes down, and it includes:

    1. The application's configuration -> master, appName, jars, checkpointDir, sparkConfPairs

    2. The set of DStream operations that define the application -> held in ssc.graph

    3. Incomplete batches whose jobs have been queued but have not finished -> ssc.scheduler.getPendingTimes().toArray

  • RDD data: the RDDs generated by stateful transformations.

    Such stateful transformations span multiple batches, so each generated RDD depends on RDDs from previous batches; in this case checkpointing is mandatory, otherwise the lineage chain grows without bound.

Looking at Checkpoint.scala, we can see that what actually gets checkpointed is a Checkpoint object, which CheckpointWriter (defined in the same file) serializes and writes out:

private[streaming]
class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
  extends Logging with Serializable {

  // The main contents of the Checkpoint object
  val master = ssc.sc.master
  val framework = ssc.sc.appName
  val jars = ssc.sc.jars
  val graph = ssc.graph
  val checkpointDir = ssc.checkpointDir
  val checkpointDuration = ssc.checkpointDuration
  val pendingTimes = ssc.scheduler.getPendingTimes().toArray
  val sparkConfPairs = ssc.conf.getAll
  // ...
}

// CheckpointWriter.write (defined in the same file) performs the actual write
def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
  try {
    // Serialize the Checkpoint object
    val bytes = Checkpoint.serialize(checkpoint, conf)
    executor.execute(new CheckpointWriteHandler(
      checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
    logInfo(s"Submitted checkpoint of time ${checkpoint.checkpointTime} to writer queue")
  } catch {
    case rej: RejectedExecutionException =>
      logError("Could not submit checkpoint task to the thread pool executor", rej)
  }
}
           

5. The Streaming checkpoint write path

JobGenerator.doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) --> 
CheckpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater) -->
CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte], clearCheckpointDataLater: Boolean)
           

V. The checkpoint read path

1. Reading an RDD checkpoint

-> Access the RDD

RDD.iterator
           

-> Compute the RDD, or read it from the checkpoint files

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
  {
    if (isCheckpointedAndMaterialized) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }
           

-> ReliableCheckpointRDD.compute (taking the reliable case, reading from reliable storage, as opposed to local)

override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
  }
           

-> ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context) returns an iterator over the partition's data

def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = {
      val fileStream = fs.open(path, bufferSize)
      if (env.conf.get(CHECKPOINT_COMPRESS)) {
        CompressionCodec.createCodec(env.conf).compressedInputStream(fileStream)
      } else {
        fileStream
      }
    }
    val serializer = env.serializer.newInstance()
    val deserializeStream = serializer.deserializeStream(fileInputStream)

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener[Unit](context => deserializeStream.close())

    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }
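
From user code, the effect of this read path can be observed through the lineage: once the checkpoint has been written, the RDD's only parent is the ReliableCheckpointRDD that reads these files. A minimal sketch (the exact toDebugString output varies by Spark version):

sc.setCheckpointDir("hdfs:///tmp/checkpoints")

val rdd = sc.parallelize(1 to 100).map(_ * 2).filter(_ % 3 == 0)
println(rdd.toDebugString)   // full lineage back to the ParallelCollectionRDD

rdd.checkpoint()
rdd.count()                  // materializes the RDD and writes the checkpoint files

println(rdd.toDebugString)   // lineage is truncated; the parent is now a ReliableCheckpointRDD
rdd.collect()                // this action goes through computeOrReadCheckpoint and reads the
                             // checkpoint files instead of recomputing the original lineage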
           

2. Reading a Streaming checkpoint

// Recreate the JavaStreamingContext from checkpoint data, or create a new one if no checkpoint exists
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
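
The Scala equivalent uses StreamingContext.getOrCreate; the factory function below is illustrative and must build the full DStream graph and set the checkpoint directory itself:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDirectory = "hdfs:///tmp/streaming-checkpoints"   // illustrative path

// Called only when no usable checkpoint exists in checkpointDirectory.
def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("recoverable-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  // ... define the DStream operations here ...
  ssc.checkpoint(checkpointDirectory)
  ssc
}

// Recovers the StreamingContext from the checkpoint if one exists,
// otherwise builds a fresh one with createContext.
val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext _)
ssc.start()
ssc.awaitTermination()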
           

This calls the CheckpointReader.read method:

def this(path: String, hadoopConf: Configuration) =
    this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).orNull, null)

  def this(path: String, sparkContext: SparkContext) = {
    this(
      sparkContext,
      CheckpointReader.read(path, sparkContext.conf, sparkContext.hadoopConfiguration).orNull,
      null)
  }
           

The read method itself: if checkpoint files exist, it returns the deserialized Checkpoint object as an Option[Checkpoint].

def read(
      checkpointDir: String,
      conf: SparkConf,
      hadoopConf: Configuration,
      ignoreReadError: Boolean = false): Option[Checkpoint] = {
    val checkpointPath = new Path(checkpointDir)

    val fs = checkpointPath.getFileSystem(hadoopConf)

    // Try to find the checkpoint files
    val checkpointFiles = Checkpoint.getCheckpointFiles(checkpointDir, Some(fs)).reverse
    if (checkpointFiles.isEmpty) {
      return None
    }

    // Try to read the checkpoint files in the order
    logInfo(s"Checkpoint files found: ${checkpointFiles.mkString(",")}")
    var readError: Exception = null
    checkpointFiles.foreach { file =>
      logInfo(s"Attempting to load checkpoint from file $file")
      try {
        val fis = fs.open(file)
        val cp = Checkpoint.deserialize(fis, conf)
        logInfo(s"Checkpoint successfully loaded from file $file")
        logInfo(s"Checkpoint was generated at time ${cp.checkpointTime}")
        return Some(cp)
      } catch {
        case e: Exception =>
          readError = e
          logWarning(s"Error reading checkpoint from file $file", e)
      }
    }

    // If none of checkpoint files could be read, then throw exception
    if (!ignoreReadError) {
      throw new SparkException(
        s"Failed to read checkpoint from directory $checkpointPath", readError)
    }
    None
  }
           
