天天看點

Spark RDD詳解 —— RDD特性、lineage、緩存、checkpoint、依賴關系

RDD(Resilient Distributed Datasets)彈性的分布式資料集,又稱Spark core,它代表一個隻讀的、不可變、可分區,裡面的元素可分布式并行計算的資料集。

RDD是一個很抽象的概念,不易于了解,但是要想學好Spark,必須要掌握RDD,熟悉它的程式設計模型,這是學習Spark其他元件的基礎。筆者在這裡從名字和幾個重要的概念給大家一一解讀:

Resilient(彈性的)

提到大資料必提分布式,而在大規模的分布式叢集中,任何一台伺服器随時都有可能出現故障,如果一個task任務所在的伺服器出現故障,必然導緻這個task執行失敗。此時,RDD的"彈性的"特點可以使這個task在叢集内進行遷移,進而保證整體任務對故障伺服器的平穩過渡。對于整個任務而言,隻需重跑某些失敗的task即可,而無需完全重跑,大大提高性能

Distributed(分布式)

首先了解一下分區,即資料根據一定的切分規則切分成一個個的子集。spark中分區劃分規則預設是根據key進行哈希取模,切分後的資料子集可以獨立運作在各個task中并且在各個叢集伺服器中并行執行。當然使用者也可以自定義分區規則,這個還是很有應用場景的,比如自定義分區打散某個key特别多的資料集以避免資料傾斜(資料傾斜是大資料領域常見問題也是調優重點,後續會單獨講解)

Datasets(資料集)

初學者很容易誤解,認為RDD是存儲資料的,畢竟從名字看來它是一個"彈性的分布式資料集"。但是,筆者強調,RDD并不存儲資料,它隻記錄資料存儲的位置。内部處理邏輯是通過使用者調用不同的Spark算子,一個RDD會轉換為另一個RDD(這也展現了RDD隻讀不可變的特點,即一個RDD隻能由另一個RDD轉換而來),以transformation算子為例,RDD彼此之間會形成pipeline管道,無需等到上一個RDD所有資料處理邏輯執行完就可以立即交給下一個RDD進行處理,性能也得到了很大提升。但是RDD在進行transform時,不是每處理一條資料就交給下一個RDD,而是使用小批量的方式進行傳遞(這也是一個優化點)

lineage

既然Spark将RDD之間以pipeline的管道連接配接起來,如何避免在伺服器出現故障後,重算這些資料呢?這些失敗的RDD由哪來呢?這就牽涉到,Spark中的一個很重要的概念:Lineage即血統關系。它會記錄RDD的中繼資料資訊和依賴關系,當該RDD的部分分區資料丢失時,可以根據這些資訊來重新運算和恢複丢失的分區資料。簡單而言就是它會記錄哪些RDD是怎麼産生的、怎麼“丢失”的等,然後Spark會根據lineage記錄的資訊,恢複丢失的資料子集,這也是保證Spark RDD彈性的關鍵點之一

Spark緩存和checkpoint

緩存(cache/persist)

cache和persist其實是RDD的兩個API,并且cache底層調用的就是persist,差別之一就在于cache不能顯示指定緩存方式,隻能緩存在記憶體中,但是persist可以通過指定緩存方式,比如顯示指定緩存在記憶體中、記憶體和磁盤并且序列化等。通過RDD的緩存,後續可以對此RDD或者是基于此RDD衍生出的其他的RDD進行中重用這些緩存的資料集

容錯(checkpoint)

本質上是将RDD寫入磁盤做檢查點(通常是checkpoint到HDFS上,同時利用了hdfs的高可用、高可靠等特征)。上面提到了Spark lineage,但在實際的生産環境中,一個業務需求可能非常非常複雜,那麼就可能會調用很多算子,産生了很多RDD,那麼RDD之間的linage鍊條就會很長,一旦某個環節出現問題,容錯的成本會非常高。此時,checkpoint的作用就展現出來了。使用者可以将重要的RDD checkpoint下來,出錯後,隻需從最近的checkpoint開始重新運算即可使用方式也很簡單,指定checkpoint的位址[SparkContext.setCheckpointDir("checkpoint的位址")],然後調用RDD的checkpoint的方法即可。

checkpoint與cache/persist對比

1、都是lazy操作,隻有action算子觸發後才會真正進行緩存或checkpoint操作(懶加載操作是Spark任務很重要的一個特性,不僅适用于Spark RDD還适用于Spark sql等元件)

2、cache隻是緩存資料,但不改變lineage。通常存于記憶體,丢失資料可能性更大

3、改變原有lineage,生成新的CheckpointRDD。通常存于hdfs,高可用且更可靠

RDD的依賴關系

Spark中使用DAG(有向無環圖)來描述RDD之間的依賴關系,根據依賴關系的不同,劃分為寬依賴和窄依賴

Spark RDD詳解 —— RDD特性、lineage、緩存、checkpoint、依賴關系

通過上圖,可以很容易得出所謂寬依賴:多個子RDD的partition會依賴同一個parentRDD的partition;窄依賴:每個parentRDD的partition最多被子RDD的一個partition使用。這兩個概念很重要,像寬依賴是劃分stage的關鍵,并且一般都會伴有shuffle,而窄依賴之間其實就形成前文所述的pipeline管道進行處理資料。(圖中的map、filter等是Spark提供的算子,具體含義大家可以自行到Spark官網了解,順便感受一下scala函數式程式設計語言的強大)。

Spark任務以及stage等的具體劃分,牽涉到源碼,後續會單獨講解

最後筆者以RDD源碼中的注釋,闡述一下RDD的屬性:

1.分區清單(資料塊清單,隻儲存資料位置,不儲存具體位址)

2.計算每個分片的函數(根據父RDD計算出子RDD)

3.RDD的依賴清單

4.RDD預設是存儲于記憶體,但當記憶體不足時,會spill到

disk(可通過設定StorageLevel來控制)

5.預設hash分區,可自定義分區器

6.每一個分片的優先計算位置(preferred locations)清單,比如HDFS的block的所在位置應該是優先計算的位置