RDD的核心方法:

首先看一下getPartitions方法的源碼:
getPartitions傳回的是一系列partitions的集合,即一個Partition類型的數組
我們就想進入HadoopRDD實作:
1、getJobConf():用來擷取job Configuration,擷取配置方式有clone和非clone方式,但是clone方式 是not thread-safe,預設是禁止的,非clone方式可以從cache中擷取,如cache中沒有那就建立一個新的,然後再放到cache中
2、進入 getInputFormcat(jobConf)方法:
3、進入inputFormat.getSplits(jobConf, minPartitions)方法:
進入FileInputFormcat類的getSplits方法:
5、進入HadoopPartition:
而getDependencies表達式RDD之間的依賴關系,如下所示:
getDependencies傳回的是依賴關系的一個Seq集合,裡面的Dependency數組中的下劃線是類型的PlaceHolder
我們進入ShuffledRDD類中的getDependencies方法:
我們進入ShuffleDependency類:
每個RDD都會具有計算的函數,如下所示:
我們進入HadoopMapPartitionsWithSplitRDD的 compute方法:
Compute方法是針對RDD的每個Partition進行計算的,其TaskContext參數的源碼如下:
getPreferredLocations是尋找Partition的首選位置:
我們進入NewHadoopRDD的getPreferredLocations:
其實RDD還有一個可選的分區政策:
Partitioner的源碼如下:
可以看出預設使用的是HashPartitioner,要注意key為Array的情況;
spark.default.parallelism必須要設定,否則會根據partitions資料來傳輸RDD,這樣也會很容易出現OOM
歡迎關注微信公衆号:大資料從業者