天天看點

Spark-Caching /Checkpointing

cacheing和checkpointing這2種操作是都是用來防止rdd(彈性分布式資料集)每次被引用時被重複計算帶來的時間和空間上不必要的損失。

Caching

cache 機制保證了需要通路重複資料的應用(如疊代型算法和互動式應用)可以運作的更快。有多種級别的持久化政策讓開發者選擇,使開發者能夠對空間和計算成本進行權衡,同時能指定out of memory時對rdd的操作(緩存在記憶體或者磁盤,并且可以指定在記憶體不夠的情況下按照FIFO的政策選取一部分block交換到磁盤來産生空餘空間)。是以Spark不但可以對rdd重複計算還能在節點發生故障時重新計算丢失的分區。最後,被緩存的rdd存在于一個running的應用的生命周期内,如果這個應用終止了,那麼緩存的rdd也會同時被删除。

Checkpointing

checkpointing把rdd存儲到一個可靠的存儲系統(例如HDFS,S3)。checkpoint一個rdd有點類似于Hadoop中把中間計算結果存儲到磁盤,損失部分執行性能來獲得更好的從運作過程中出現failures時recover的能力。因為rdd是checkpoint在外部的存儲系統(磁盤,HDFS,S3等),是以checkpoint過的rdd能夠被其他的應用重用。

由rdd的計算路徑來了解caching和checkpointing的互相作用。 Spark engine的核心是DAGScheduler。它把一個spark job分解成由若幹個stages組成的DAG。每一個shuffle或者result stage再分解成一個個在RDD的分區中獨立運作的task。一個RDD的iterator方法是一個task通路基礎資料分區的入口:

如果設定了存儲級别,表明rdd可能被緩存,它首先嘗試調用getOrCompute方法從block manager中得到分區。

=computeOrReadCheckpoint這個方法會從checkpoint中尋找對應的資料,如果rdd沒有被checkpoint,那麼就從目前計算的分區開始計算。

cache 機制是每計算出一個要 cache 的 partition 就直接将其 cache 到記憶體了。但是checkpoint 沒有使用這種第一次計算得到就存儲的方法,而是等到 job 結束後另外啟動專門的 job 去完成 checkpoint 。也就是說需要 checkpoint 的 RDD 會被計算兩次。

是以,在使用 rdd.checkpoint() 的時候,建議加上 rdd.cache(),這樣第二次運作的 job 就不用再去計算該 rdd 了,直接讀取 cache 寫磁盤。其實 Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 這樣的方法,相當于 cache 到磁盤上,這樣可以做到 rdd 第一次被計算得到時就存儲到磁盤上,但這個 persist 和 checkpoint 有很多不同。前者雖然可以将 RDD 的 partition 持久化到磁盤,但該 partition 由 blockManager 管理。

一旦 driver program 執行結束,也就是 executor 所在程序 CoarseGrainedExecutorBackend stop,blockManager 也會 stop,被 cache 到磁盤上的 RDD 也會被清空(整個 blockManager 使用的 local 檔案夾被删除)。

而 checkpoint 将 RDD 持久化到 HDFS 或本地檔案夾,如果不被手動 remove 掉,是一直存在的,也就是說可以被下一個 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。

使用checkpoint*會消耗更多的時間在rdd的讀寫*上(因為要使用外部存儲系統HDFS,S3,或者磁盤),但是Spark worker的一些failures不一定導緻重新計算。

另一方面,caching的rdd 不會永久占用存儲空間,但是重新計算在Spark worker出現一些failures的時候是必要的。

綜上,這2個都是取決于開發者自己的角度結合業務場景來使用,一般情況下,綜合計算任務的性能來進行2者的選擇(大部分情況用cache就夠了,如果感覺 job 可能會出錯可以手動去 checkpoint 一些 critical 的 RDD)。

繼續閱讀