天天看點

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

4.      RDD的依賴關系

6.1      RDD的依賴

RDD和它依賴的父RDD的關系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

6.2      窄依賴

窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用

總結:窄依賴我們形象的比喻為獨生子女。窄依賴不會産生shuffle,比如說:flatMap/map/filter....

6.3      寬依賴

寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition

總結:寬依賴我們形象的比喻為超生。寬依賴會産生shuffle,比如說:reduceByKey/groupByKey...

6.4      Lineage(血統)

RDD隻支援粗粒度轉換,即隻記錄單個塊上執行的單個操作。将建立RDD的一系列Lineage(即血統)記錄下來,以便恢複丢失的分區。RDD的Lineage會記錄RDD的中繼資料資訊和轉換行為,當該RDD的部分分區資料丢失時,它可以根據這些資訊來重新運算和恢複丢失的資料分區。

7.  RDD的緩存

Spark速度非常快的原因之一,就是在不同操作中可以在記憶體中持久化或者緩存資料集。當持久化某個RDD後,每一個節點都将把計算分區結果儲存在記憶體中,對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark建構疊代式算法和快速互動式查詢的關鍵。

7.1 RDD緩存方式

RDD通過persist方法或cache方法可以将前面的計算結果緩存,但是并不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD将會被緩存在計算節點的記憶體中,并供後面重用。

rdd1.cache

rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

cache和persist差別:

cache:預設是把資料緩存在記憶體中,其本質是調用了persist方法

eg. rdd1.cache

persist:它可以把資料緩存在磁盤中,它可以有很多豐富的緩存級别,這些緩存級别都被封裝在一個object StorageLevel

eg.  rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

通過檢視源碼發現cache最終也是調用了persist方法,預設的存儲級别都是僅在記憶體存儲一份,Spark的存儲級别還有好多種,存儲級别在object StorageLevel中定義的。

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

緩存有可能丢失,或者存儲于記憶體的資料由于記憶體不足而被删除,RDD的緩存容錯機制保證了即使緩存丢失也能保證計算的正确執行。通過基于RDD的一系列轉換,丢失的資料會被重算,由于RDD的各個Partition是相對獨立的,是以隻需要計算丢失的部分即可,并不需要重算全部Partition。 

清除緩存資料:

(1)自動清除

整個應用程式結束之後,緩存中的所有資料自動清除

(2)手動清除

手動調用rdd的unpersist(true)  //參數true,表示阻塞整個程式,知道所有緩存都清除後,才執行後面的邏輯;false,表示邊清除緩存邊執行後面的邏輯。

什麼時候設定緩存:

(1)某個rdd後期被使用了多次

val rdd2=rdd1.flatMap(_.split(" "))

val rdd3=rdd1.map((_,1))

上面rdd1被使用了多次,後期可以對rdd1的結果資料進行緩存,緩存之後後面用到了它,可以直接從緩存中擷取得到。避免重新計算,浪費時間。

(2)一個rdd的結果資料計算邏輯比較複雜或者是計算時間比較長-------> 總之 它的資料來之不易

val rdd1=sc.textFile("/words.txt").flatMap(_.split(" ")).xxxx .xxxxx..............

8.  DAG的生成

8.1 什麼是DAG

DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就形成了DAG,根據RDD之間依賴關系的不同将DAG劃分成不同的Stage(排程階段)。對于窄依賴,partition的轉換處理在一個Stage中完成計算。對于寬依賴,由于有Shuffle的存在,隻能在parent RDD處理完成後,才能開始接下來的計算,是以寬依賴是劃分Stage的依據。

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

9.  Spark任務排程

9.1 任務排程流程圖

sparkRDD:第4節 RDD的依賴關系;第5節 RDD的緩存機制;第6節 DAG的生成

各個RDD之間存在着依賴關系,這些依賴關系就形成有向無環圖DAG,DAGScheduler對這些依賴關系形成的DAG進行Stage劃分,劃分的規則很簡單,從後往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。完成了Stage的劃分。DAGScheduler基于每個Stage生成TaskSet,并将TaskSet送出給TaskScheduler。TaskScheduler 負責具體的task排程,最後在Worker節點上啟動task。

任務排程的步驟詳細說明:

(1)Driver會運作用戶端main方法中的代碼,代碼就會建構SparkContext對象,在建構SparkContext對象中,會建立DAGScheduler和TaskScheduler,然後按照rdd一系列的操作生成DAG有向無環圖。最後把DAG有向無環圖送出給DAGScheduler。

(2)DAGScheduler拿到DAG有向無環圖後,按照寬依賴進行stage的劃分,這個時候會産生很多個stage,每一個stage中都有很多可以并行運作的task,把每一個stage中這些task封裝在一個taskSet集合中,最後送出給TaskScheduler。

(3)TaskScheduler拿到taskSet集合後,依次周遊每一個task,最後送出給worker節點的exectuor程序中。task就以線程的方式運作在worker節點的executor程序中。

9.2 DAGScheduler

(1)DAGScheduler對DAG有向無環圖進行Stage劃分。

(2)記錄哪個RDD或者 Stage 輸出被物化(緩存),通常在一個複雜的shuffle之後,通常物化一下(cache、persist),友善之後的計算。

(3)重新送出shuffle輸出丢失的stage(stage内部計算出錯)給TaskScheduler

(4)将 Taskset 傳給底層排程器

a)– spark-cluster TaskScheduler

b)– yarn-cluster YarnClusterScheduler

c)– yarn-client YarnClientClusterScheduler

9.3 TaskScheduler

(1)為每一個TaskSet建構一個TaskSetManager 執行個體管理這個TaskSet 的生命周期

(2)資料本地性決定每個Task最佳位置

(3)送出 taskset( 一組task) 到叢集運作并監控

(4)推測執行,碰到計算緩慢任務需要放到别的節點上重試

(5)重新送出Shuffle輸出丢失的Stage給DAGScheduler

轉載于:https://www.cnblogs.com/mediocreWorld/p/11432298.html

繼續閱讀