天天看點

spark-RDD源碼分析

RDD的核心方法:

spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析

首先看一下getPartitions方法的源碼:

spark-RDD源碼分析

getPartitions傳回的是一系列partitions的集合,即一個Partition類型的數組

我們就想進入HadoopRDD實作:

spark-RDD源碼分析

1、getJobConf():用來擷取job Configuration,擷取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,預設是禁止的,非clone方式可以從cache中擷取,如cache中沒有那就建立一個新的,然後再放到cache中

2、進入 getInputFormcat(jobConf)方法:

spark-RDD源碼分析

3、進入inputFormat.getSplits(jobConf, minPartitions)方法:

進入FileInputFormcat類的getSplits方法:

spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析

5、進入HadoopPartition:

spark-RDD源碼分析

而getDependencies表達式RDD之間的依賴關系,如下所示:

spark-RDD源碼分析

getDependencies傳回的是依賴關系的一個Seq集合,裡面的Dependency數組中的下劃線是類型的PlaceHolder

我們進入ShuffledRDD類中的getDependencies方法:

spark-RDD源碼分析

我們進入ShuffleDependency類:

spark-RDD源碼分析

每個RDD都會具有計算的函數,如下所示:

spark-RDD源碼分析

我們進入HadoopMapPartitionsWithSplitRDD的 compute方法:

spark-RDD源碼分析

Compute方法是針對RDD的每個Partition進行計算的,其TaskContext參數的源碼如下:

spark-RDD源碼分析

getPreferredLocations是尋找Partition的首選位置:

spark-RDD源碼分析

我們進入NewHadoopRDD的getPreferredLocations:

spark-RDD源碼分析

其實RDD還有一個可選的分區政策:

Partitioner的源碼如下:

spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析
spark-RDD源碼分析

可以看出預設使用的是HashPartitioner,要注意key為Array的情況;

spark.default.parallelism必須要設定,否則會根據partitions資料來傳輸RDD,這樣也會很容易出現OOM

歡迎關注微信公衆号:大資料從業者

繼續閱讀