天天看點

Spark Shuffle内幕

一、到底什麼是shuffle(洗牌)

需要Shuffle的關鍵性原因是某種具有共同特征的資料需要最終彙聚到一個計算節點上進行計算。shuffle會将中間結果存儲在本地,然後下個stage需要資料再去向這個節點要資料(此時如果是不同節點會涉及到網絡傳輸)。

二、Shuffle可能面臨的問題?運作task的時候才會産生shuffle(shuffle已經融化在spark的算子中了)

1.資料量非常大;(幾千台機器進行shuffle,網絡傳輸比較大)

2.資料如何分類,即如何Partition:hash,sort,鎢絲計算(tungsten);

3.負載均衡(資料傾斜);(不同方式進行shuffle)

4.網絡傳輸效率,需要在壓縮和解壓縮(CPU)之間做出權衡,序列化和反序列化也是要考慮的問題;

說明:具體的task進行計算的時候盡一切最大可能使得資料具備process locality的特性(記憶體中計算);退而求其次是增加資料分片,減少每個task處理的資料量(任務批次會多,即使任務更多,排隊也會快)。

讀磁盤比讀記憶體風險大,記憶體中直接算可能比從磁盤讀中間結果要快,不持久化中間結果,資料丢失,重新計算資料依賴的RDD,一個Stage内部,鼓勵這麼做,但如果産生shuffle,網絡通信,需要持久化,容錯角度,shuffle容錯需要持久化,需要父stage所有算完,才能進行下一步,如果出錯,需要重新計算父stage,代價比較大,是以Stage計算,遇見shuffle,一定要持久化,local filesystem,techoyon中,

三、 Hash shuffle

1.key不能是array

2.hash shuffle不需要排序,此時從理論上講就節省了hadoop mapreduce中進行shuffle需要排序時候的時間浪費,因為實際環境有大量的不需要排序的shuffle類型;

思考:不需要排序的hash shuffle是否一定比需要排序的sorted shuffle速度更快?

不一定!如果資料規模比較小的情況下,hash shuffle會比sorted shuffle速度快(很多)!但是如果資料量大(hash shuffle适合椎小規模的資料,因為spark之前版本使用hash shuffle的,後來出現sorted shuffle,适合處理大規模資料,hash處理不了,hash方式:key,句柄,小檔案,磁盤、記憶體會成瓶頸),此時sorted shuffle(節省記憶體,節省磁盤通路,利于更大規模的資料)一般都會比hash shuffle快(很多)。

3.每個ShuffleMapTask會根據key的hash值計算出目前的key需要寫入的partition,然後把決定後的結果寫入單獨的檔案,此時會導緻每個task會産生R(Reducer的個數,指下一個stage的并行度)個檔案,如果目前的stage中有M個shufflemaptask,則會M*R個檔案!!!

上個Stage的資訊會注冊給driver,下個Stage會從driver中(通過網絡)讀取資料,但是注意:shuffle操作絕大多數情況下都要通過網絡,如果mapper和reducer在同一台機器上,此時隻需要讀取本地磁盤即可。

除了最後一個Stage外的Stage,Task的類型是ShuffleMapTask

Task在執行過程中,假設下一個Stage有三個并行,如果有兩個shufflemaptask,就會産生六個檔案,但是數量越大,就越大

讀資料需要檔案句柄讀資料

Spark Shuffle内幕

Hash shuffle的兩大死穴:第一:shuffle前會産生海量的小檔案于硬碟之上,此時會産生大量耗時低效IO操作;第二:記憶體不夠用!!!由于記憶體中需要儲存海量的檔案操作句柄和臨時緩存資訊,如果資料處理規模比較龐大的話,記憶體不可承受,出現oom等問題。

三:Sorted Shuffle:

為了改善上述問題(同時打開過多檔案導緻writer handler記憶體使用過大以及産生過度檔案),spark後來推出了Consalidate機制,來把小檔案合并,此時shuffle檔案産生的數量為cores*R,對于shufflemaptask的數量明顯多于同時可用的并行cores的數量的情況下,shuffle産生的檔案會大幅度減少,會極大降低oom的可能;

為此spark推出shuffle pluggable開發架構,友善系統更新的時候定制shuffle功能子產品,也友善第三方系統改造人員根據實際業務場景來開放具體最佳的shuffle子產品;核心接口shuffleManager,具體預設實作有HashShuffleManager, SortedShuffleManager等,Spark 1.6.0中具體的配置如下:

val shortShuffleMgrNames = Map(

“hash”-> “org.apache.spark.shuffle.hash.HashShuffleManager”,

“sort”-> “org.apache.spark.shuffle.sort.SortShuffleManager”,

“tungsten-sort”-> “org.apache.spark.shuffle.sort.SortShuffleManager”)

預設是sort

首先shuffle map不會,所有結果寫到一個檔案中,還有索引,避免産生大量檔案,磁盤檔案變小,索引,節省記憶體,減少gc分享頻率,減少同時寫多個檔案給系統帶來的壓力,歸并排序

繼續閱讀