天天看點

Spark源碼--Dependency

注:本文基于spark-2.4.4版本源碼進行分析

首先從整體上看一下Dependency相關類的繼承關系,有一個直覺的印象:

Spark源碼--Dependency

然後從Dependency源碼開始看。如下圖:

Spark源碼--Dependency

Dependency是一個抽象類,類中的rdd成員是父RDD。

接着看NarrowDependency類:

Spark源碼--Dependency

NarrowDependency是一個抽象類。NarrowDependency覆寫了Dependency的rdd變量。其值等于NarrowDependency的主構造函數傳進來的RDD。也就是說,我們需要傳一個RDD作為主構造函數的參數,這個傳進來的RDD其實就是父RDD。

同時NarrowDependency還定義了一個

getParents(partitionId: Int): Seq[Int]

方法。此方法是用來獲得子RDD分區的所有父RDD分區。 此方法參數是子RDD的一個分區ID,傳回值是子RDD的這個分區依賴的所有父RDD分區的ID。是以傳回值是一個Seq[Int]。

接着看NarrowDependency的第一個子類:OneToOneDependency

Spark源碼--Dependency

OneToOneDependency顧名思義就是一對一依賴。也是比較簡單,像map,filter,flatMap這種類型的算子中間産生的就是OneToOneDependency。

OneToOneDependency重寫了

getParents

方法。因為子RDD分區和父RDD分區是一一對應的,是以分區id是相同的,直接傳回List(partitionId)即可。

繼續看NarrowDependency的第二個子類:RangeDependency

Spark源碼--Dependency

RangeDependency是範圍依賴,用于UnionRDD中。為了了解RangeDependency重寫的getParents方法的邏輯,我們先進入到UnionRDD源碼中看一下,看UnionRDD的

getDependencies

方法。

Spark源碼--Dependency

UnionRDD的

getDependencies

方法實作邏輯也很簡單。我們直接看96-99行對應的源碼。通過周遊傳入UnionRDD的rdds,對每一個rdd建立一個RangeDependency。參數inStart是父rdd的分區id起始值,都是從0開始。參數outStart是子rdd的分區id起始值,通過pos變量不斷累加記錄。

這樣的話,再回到RangeDependency的getParents方法的邏輯中。if語句裡面的兩個判斷語句是為了驗證傳入的分區id是否合理。如果合理,就傳回對應的父rdd分區id。舉個栗子:

Spark源碼--Dependency

假如我們想要得到子RDD3中分區id為5的分區在父RDD中的分區id。經過getParents方法可知最後傳回List(partitionId - outStart + inStart)。

partitionId等于5,outStart等于3,inStart等于0。故傳回List(2)。正是對應父RDD2中的2号分區。

最後我們來看ShuffleDependency

Spark源碼--Dependency

ShuffleDependency的構造函數傳入:父RDD,分區器,序列化器,key的排序規則,aggregator以及是否進行map-side combine。而且我們看到ShuffleDependency是直接繼承的Dependency,是以并沒有getParent方法,因為寬依賴中,子RDD的一個partition可能依賴于父RDD的多個partition,是以沒有提供getParent方法。

為了更好地了解ShuffleDependency,我們從reduceByKey算子開始看。這個算子在PairRDDFunctions類中。

Spark源碼--Dependency

這裡面其實還涉及到scala隐式轉換的知識點。當我們這樣使用時:rdd.reduceByKey。因為RDD類中沒有定義reduceByKey這種[K,V]類型的算子,是以scala會自動進行隐式轉換,從RDD的伴生對象中找到rddToPairRDDFunctions這個方法,把rdd轉換為PairRDDFunctions類型。然後調用PairRDDFunctions中的reduceByKey。

Spark源碼--Dependency

reduceByKey這個算子内部會調用combineByKeyWithClassTag這個算子。

Spark源碼--Dependency

self就是調用reduceByKey算子的rdd。withScope用來做DAG可視化的。在withScope代碼塊裡的建立的RDD,同屬于一個scope。接着跟進combineByKeyWithClassTag這個算子。

Spark源碼--Dependency

看最後的else裡面的邏輯,new了一個ShuffledRDD。跟進ShuffledRDD源碼的getDependencies源碼:

Spark源碼--Dependency

最後是傳回了

List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))

直到這個時候才出現ShuffleDependency。參數prev就是調用shuffle算子的rdd。

至此,我們大概把Dependency的源碼以及建立Dependency的邏輯了解了一下。

繼續閱讀