天天看點

Spark容錯機制,Lineage,CheckpointLineage機制Checkpoint機制

Spark容錯機制,Lineage,Checkpoint

  • Lineage機制
  • Checkpoint機制

一般來說,分布式資料集的容錯性有兩種方式:資料檢查點和記錄資料的更新。

面向大樹據處理檢查點機制的代價更高,需要通過資料中心的網絡連接配接在不同的機器之間複制資料,而網絡的帶寬往往比記憶體的帶寬低的多,并且需要消耗大量的存儲資源。是以spark選擇了記錄資料的更新,但是記錄的太細也會消耗大量的資源。是以,RDD隻支援粗粒度的轉換,即隻記錄單個塊上的單個操作,然後将建立RDD的一系列變換序列(每個RDD都包含了他是如何由其他RDD變換過來的以及如何重建某一塊資料的資訊。是以RDD的容錯機制又稱“血統(Lineage)”)記錄下來,以便恢複丢失的分區。

Lineage本質上很類似于資料庫中的重做日志(Redo Log),隻不過這個重做日志粒度很大,是對全局資料做同樣的重做進而恢複資料。

Lineage機制

相比較其它系統的細粒度的記憶體資料更新級别的備份或是LOG機制,RDD的Lineage記錄的是粗顆粒度的特定資料Transformation操作(filter,map,join等)當這個RDD的部分分區資料丢失,可以通過Lineage擷取資訊來重新計算和恢複丢失的資料。因為這種粗粒度的容錯機制也限制了spark的運用場合,但相比細粒度的容錯機制也大大提高了運作的性能。

RDD在Lineage依賴方面分為兩種:窄依賴(Narrow Dependencies)與寬依賴(Wide Dependencies,源碼中稱為Shuffle Dependencies),用來解決資料容錯的高效性。對于區分兩種依賴可參考文章Spark stage劃分和寬窄依賴

窄依賴和寬依賴的概念主要用在兩個地方:一個是容錯中相當于Redo日志的功能;另一個是在排程中建構DAG作為不同Stage的劃分點。

對于寬依賴,Stage計算的輸入和輸出在不同的節點上,lineage方法對與輸入節點完好,而輸出節點當機時,通過重新計算,這種情況下,這種方法容錯是有效的,否則無效,因為無法重試,需要向上其祖先追溯看是否可以重試(這就是lineage,血統的意思),Narrow Dependencies對于資料的重算開銷要遠小于Wide Dependencies的資料重算開銷。

容錯原理

在容錯機制中,如果一個節點當機了,而且運算窄依賴,則隻要把丢失的父RDD分區重算即可,不依賴于其他節點。而寬依賴需要父RDD的所有分區都存在,重算就很昂貴了。可以這樣了解開銷的經濟與否:在窄依賴中,在子RDD的分區丢失、重算父RDD分區時,父RDD相應分區的所有資料都是子RDD分區的資料,并不存在備援計算。在寬依賴情況下,丢失一個子RDD分區重算的每個父RDD的每個分區的所有資料并不是都給丢失的子RDD分區用的,會有一部分資料相當于對應的是未丢失的子RDD分區中需要的資料,這樣就會産生備援計算開銷,這也是寬依賴開銷更大的原因。是以如果使用Checkpoint算子來做檢查點,不僅要考慮Lineage是否足夠長,也要考慮是否有寬依賴,對寬依賴加Checkpoint是最物有所值的。

Checkpoint機制

通過上述分析可以看出在以下兩種情況下,RDD需要加檢查點。

  1. DAG中的Lineage過長,如果重算,則開銷太大(如在PageRank中)。
  2. 在寬依賴上做Checkpoint獲得的收益更大。

由于RDD是隻讀的,是以Spark中計算中資料一緻性不是主要關心的,記憶體相對容易管理,這也是作者牛B的地方,減少了架構的複雜性,提升了性能和可擴充性,為上層架構的提供了強有力的基礎。在RDD計算中通過Checkpoint機制進行容錯,對于傳統Checkpoint一般有兩種方式,備援資料和日志記錄更新操作。在RDD的doCheckpoint相當于備援資料來緩存資料,而上面介紹的Lineage相當于粗粒度的記錄更新操作來實作容錯。

檢查點(本質是将RDD寫入磁盤做檢查點) 是為了Lineage做容做的輔助,Lineage過長會導緻容做成本過高,可以在中間做檢查點,如果之後有節點出問題而丢失資料,從檢查點開始做Lineage,減少開銷。

下面是RDD源碼中Checkpoint的方法,裡面建議在執行Checkpoint()方法之前先對rdd進行persisted操作,并且需要在SparkContext中設定Checkpoint的路徑。

/**
   * Mark this RDD for Checkpointing. It will be saved to a file inside the Checkpoint
   * directory set with `SparkContext#setCheckpointDir` and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it on a file will require recomputation.
   */
   def Checkpoint(): Unit = RDDCheckpointData.synchronized {
    // NOTE: we use a global lock here due to complexities downstream with ensuring
    // children RDD partitions point to the correct parent partitions. In the future
    // we should revisit this consideration.
    if (context.CheckpointDir.isEmpty) {
      throw new SparkException("Checkpoint directory has not been set in the SparkContext")
    } else if (CheckpointData.isEmpty) {
      CheckpointData = Some(new ReliableRDDCheckpointData(this))
    }
  }
           

Checkpoint 寫流程

checkpoint這個函數調用中,所依賴的RDD都會被删除,函數必須在job運作之前調用執行,強烈建議RDD緩存在記憶體中,否則儲存到檔案的時候需要從頭計算。初始化RDD的CheckpointData變量為ReliableRDDCheckpointData。這時候标記為Initialized狀态。

在所有job action的時候,runJob方法中都會調用rdd.doCheckpoint,這個會向前遞歸調用所有的依賴的RDD,看看需不需要Checkpoint。需要需要Checkpoint,然後調用CheckpointData.get.Checkpoint(),裡面标記狀态為CheckpointingInProgress,裡面調用具體實作類的ReliableRDDCheckpointData的doCheckpoint方法。

doCheckpoint->writeRDDToCheckpointDirectory,注意這裡會把job再運作一次,如果已經cache了,就可以直接使用緩存中的RDD了,就不需要重頭計算一遍了,這時候直接把RDD,輸出到hdfs,每個分區一個檔案,會先寫到一個臨時檔案,如果全部輸出完,進行rename,如果輸出失敗,就復原delete。

标記狀态為Checkpointed,markCheckpointed方法中清除所有的依賴,怎麼清除依賴的呢,就是把RDD變量的強引用設定為null,垃圾回收了,會觸發ContextCleaner裡面監聽清除實際BlockManager緩存中的資料。

Checkpoint 讀流程

Checkpoint将RDD持久化到HDFS或本地檔案夾,如果不被手動remove掉,是一直存在的,也就是說可以被下一個driver program使用。比如spark streaming挂掉了,重新開機後就可以使用之前Checkpoint的資料進行recover,當然在同一個driver program也可以使用。我們講下在同一個driver program中是怎麼使用 Checkpoint 資料的。

如果一個RDD被Checkpoint了,如果這個RDD上有action操作時候,或者回溯的這個RDD的時候,這個RDD進行計算的時候,裡面判斷如果已經Checkpoint過,對分區和依賴的處理都是使用的RDD内部的CheckpointRDD變量。

具體細節如下,

如果一個RDD被Checkpoint了,那麼這個RDD中對分區和依賴的處理都是使用的RDD内部的CheckpointRDD變量,具體實作是ReliableCheckpointRDD類型。這個是在Checkpoint寫流程中建立的。依賴和擷取分區方法中先判斷是否已經Checkpoint,如果已經Checkpoint了,就使用ReliableCheckpointRDD,來處理依賴和擷取分區。

如果沒有,才向前回溯依賴。

參考文章:

https://blog.csdn.net/u012137473/article/details/85007483

https://www.cnblogs.com/duanxz/p/6329675.html

https://blog.csdn.net/wt346326775/article/details/72870518?utm_source=blogxgwz6

繼續閱讀