天天看点

sprak 容错机制-checkpoint

   我们知道spark具有很强的数据容错机制,为了保证RDD的完整性,RDD 通过血统(Lineage)的关系,它采用粗粒度的方式记录了RDD的演变过程,这种方式相比于细粒度的方式确实限制了spark的运用场景,但是它却提高了spark的性能。

sprak 容错机制-checkpoint

当RDD在运行的过程中,出现错误导致数据不完整,这时spark会根据血统的关系,重新从头计算RDD的方式来恢复数据,这样在RDD的迭代次数比较少时,性能不会有太大差别,但是通常在使用spark执行机器学习算法时,往往需要迭代上百次,假如一个机器学习算法需要迭代RDD100次,但是在执行第100次时,spark出现故障,为保证数据的完整性,spark需要从头开始重新计算RDD,这样会导致spark的性能下降,为了应对这种情况,spark提供了另外一个机制-Checkpoint。

checkPoint可以将中间执行的RDD缓存到磁盘,当后面的RDD在执行时出现问题,spark运行机制就不必从头重新计算RDD,只需在checkPoint点获取数据重新计算后面的RDD即可,这样对于迭代次数比较多的spark任务,可以很好的提高其运行性能。下面看一下checkPoint的spark源码。

/**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
  def checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.checkpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }
           

在源码的注释中可以看出,checkpoint的RDD将会保存到通过SparkContext设置的CheckPoint的目录下面,并且 会移除checkpoint的RDD之前所有的RDD, 还有就是checkpoint方法要在RDD执行action方法之前调用。 注释的后半句也是相当重要, 强烈建议RDD持久化到内存中在进行checkpoint操作,不然在checkpoint操作时,将会重新计算RDD ,这样会很影响性能。

温馨提示:在进行checkpoint操作时 ,请先设置checkpoint保存的目录

具体设置方式如:sc.setCheckpointDir("hdfs://data/checkpoint20180122")

否则将会如源码所写抛出checkpoint目录在SparkContext中没有设置异常。

throw new SparkException("Checkpoint directory has not been set in the SparkContext")
           

继续阅读