天天看点

Spark容错机制,Lineage,CheckpointLineage机制Checkpoint机制

Spark容错机制,Lineage,Checkpoint

  • Lineage机制
  • Checkpoint机制

一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。

面向大树据处理检查点机制的代价更高,需要通过数据中心的网络连接在不同的机器之间复制数据,而网络的带宽往往比内存的带宽低的多,并且需要消耗大量的存储资源。因此spark选择了记录数据的更新,但是记录的太细也会消耗大量的资源。因此,RDD只支持粗粒度的转换,即只记录单个块上的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”)记录下来,以便恢复丢失的分区。

Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

Lineage机制

相比较其它系统的细粒度的内存数据更新级别的备份或是LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(filter,map,join等)当这个RDD的部分分区数据丢失,可以通过Lineage获取信息来重新计算和恢复丢失的数据。因为这种粗粒度的容错机制也限制了spark的运用场合,但相比细粒度的容错机制也大大提高了运行的性能。

RDD在Lineage依赖方面分为两种:窄依赖(Narrow Dependencies)与宽依赖(Wide Dependencies,源码中称为Shuffle Dependencies),用来解决数据容错的高效性。对于区分两种依赖可参考文章Spark stage划分和宽窄依赖

窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点。

对于宽依赖,Stage计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开销要远小于Wide Dependencies的数据重算开销。

容错原理

在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。

Checkpoint机制

通过上述分析可以看出在以下两种情况下,RDD需要加检查点。

  1. DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
  2. 在宽依赖上做Checkpoint获得的收益更大。

由于RDD是只读的,所以Spark中计算中数据一致性不是主要关心的,内存相对容易管理,这也是作者牛B的地方,减少了框架的复杂性,提升了性能和可扩展性,为上层框架的提供了强有力的基础。在RDD计算中通过Checkpoint机制进行容错,对于传统Checkpoint一般有两种方式,冗余数据和日志记录更新操作。在RDD的doCheckpoint相当于冗余数据来缓存数据,而上面介绍的Lineage相当于粗粒度的记录更新操作来实现容错。

检查点(本质是将RDD写入磁盘做检查点) 是为了Lineage做容做的辅助,Lineage过长会导致容做成本过高,可以在中间做检查点,如果之后有节点出问题而丢失数据,从检查点开始做Lineage,减少开销。

下面是RDD源码中Checkpoint的方法,里面建议在执行Checkpoint()方法之前先对rdd进行persisted操作,并且需要在SparkContext中设置Checkpoint的路径。

/**
   * Mark this RDD for Checkpointing. It will be saved to a file inside the Checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
   def Checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.CheckpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (CheckpointData.isEmpty) {
      CheckpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }
           

Checkpoint 写流程

checkpoint这个函数调用中,所依赖的RDD都会被删除,函数必须在job运行之前调用执行,强烈建议RDD缓存在内存中,否则保存到文件的时候需要从头计算。初始化RDD的CheckpointData变量为ReliableRDDCheckpointData。这时候标记为Initialized状态。

在所有job action的时候,runJob方法中都会调用rdd.doCheckpoint,这个会向前递归调用所有的依赖的RDD,看看需不需要Checkpoint。需要需要Checkpoint,然后调用CheckpointData.get.Checkpoint(),里面标记状态为CheckpointingInProgress,里面调用具体实现类的ReliableRDDCheckpointData的doCheckpoint方法。

doCheckpoint->writeRDDToCheckpointDirectory,注意这里会把job再运行一次,如果已经cache了,就可以直接使用缓存中的RDD了,就不需要重头计算一遍了,这时候直接把RDD,输出到hdfs,每个分区一个文件,会先写到一个临时文件,如果全部输出完,进行rename,如果输出失败,就回滚delete。

标记状态为Checkpointed,markCheckpointed方法中清除所有的依赖,怎么清除依赖的呢,就是把RDD变量的强引用设置为null,垃圾回收了,会触发ContextCleaner里面监听清除实际BlockManager缓存中的数据。

Checkpoint 读流程

Checkpoint将RDD持久化到HDFS或本地文件夹,如果不被手动remove掉,是一直存在的,也就是说可以被下一个driver program使用。比如spark streaming挂掉了,重启后就可以使用之前Checkpoint的数据进行recover,当然在同一个driver program也可以使用。我们讲下在同一个driver program中是怎么使用 Checkpoint 数据的。

如果一个RDD被Checkpoint了,如果这个RDD上有action操作时候,或者回溯的这个RDD的时候,这个RDD进行计算的时候,里面判断如果已经Checkpoint过,对分区和依赖的处理都是使用的RDD内部的CheckpointRDD变量。

具体细节如下,

如果一个RDD被Checkpoint了,那么这个RDD中对分区和依赖的处理都是使用的RDD内部的CheckpointRDD变量,具体实现是ReliableCheckpointRDD类型。这个是在Checkpoint写流程中创建的。依赖和获取分区方法中先判断是否已经Checkpoint,如果已经Checkpoint了,就使用ReliableCheckpointRDD,来处理依赖和获取分区。

如果没有,才向前回溯依赖。

参考文章:

https://blog.csdn.net/u012137473/article/details/85007483

https://www.cnblogs.com/duanxz/p/6329675.html

https://blog.csdn.net/wt346326775/article/details/72870518?utm_source=blogxgwz6

继续阅读