天天看點

spark streaming源碼分析5 checkpoint

部落格位址: http://blog.csdn.net/yueqian_zhu/

這一節介紹checkpoint相關的内容

spark streaming是一個7*24小時工作的實時處理系統,是以必須保證從故障中恢複的能力。由于streaming 實際上是以小batch資料周期性的執行類似spark core中RDD的計算,是以,其worker節點的容錯也就天然的繼承了。但是,spark core的資料來源一般存在于hdfs中,是以并沒有做driver這一層的容錯保證,出錯時隻要重跑就可以了。而spark streaming的資料來源一般為網絡、kafka等,在driver異常時,需要有機制保證從哪裡繼續讀取計算,未完成的jobs如何重新計算等,進而保證資料的正确性。那麼,這正是通過spark streaming的checkpoint機制來完成的。在spark core部分,每當送出一個job計算完成之後,就會自動調用rdd.doCheckpoint方法将rdd儲存起來。streaming底層依賴spark core的計算邏輯,是以在rdd這一層,自然的會将每個最終runjob的RDD做checkpoint。

checkpoint寫:

在一個jobset運作完成後,就自動調用doCheckpoint方法,将在這個jobset内所有job都完成rdd這一層級的checkpoint檔案都記錄在記憶體中(currentCheckpointFiles),同時将這個batch time的所有資訊封裝成checkpoint對象(主要包含DStreamGraph,pendingtimes等資訊),并序列化儲存到檔案中,也就是DStream層的checkpoint。

我們在對DStream作checkpoint的目的就是希望恢複當時的RDD狀況,是以根據currentCheckpointFiles就可以知道所有運作完成的job,而根據pendingtimes就可以知道已經生成jobset,但尚未執行或者未全部執行的jobs。這樣,當從checkpoint檔案中恢複出來後,就可以恢複當時的狀況。注意的是,由于pendingtimes記錄的是jobset級别的,對于已經執行了jobset中一部分job的情況,就隻能保證at-least once邏輯。另外,前面的章節講到過,在一個jobset結束後,會處理ClearMetaData消息。其中有一項功能就是清理checkpoint資料,清理一些過期的checkpoint檔案以及記憶體資訊。

WAL 預寫日志

分為driver端和executor端。為了防止receiver接受資料後宕掉,進而有一部分資料因未來得及處理而丢失,可以通過預寫日志(WAL)将接收到的資料儲存到容錯的檔案系統中,保證資料的零丢失。同理,我們在接收到資料後會将block的元資訊上報driver,用于記錄實際接收到的資料的具體位置。在driver的WAL中,也會将這部分資訊寫入日志中,進而在driver異常重新開機後,可以從日志中恢複這些資訊,保證接收到的資料的可用性。

checkpoint讀:

1、ssc的getOrCreate方法,如果是從checkpoint檔案中讀取的,則反序列化成checkpoint對象,再初始化ssc

2、從DStream這一層的checkpoint中,根據checkpoint files擷取generateRdds

3、在初始化ReceivedBlockTracker時,如果開啟了WAL,則從日志中讀取恢複。再根據pendingtimes中的時間去重新送出jobs

以上如有錯誤,請回報給我,謝謝。。。

繼續閱讀