天天看點

Spark技術内幕:Sort Based Shuffle實作解析

在spark 1.2.0中,spark core的一個重要的更新就是将預設的hash based shuffle換成了sort based shuffle,即spark.shuffle.manager 從hash換成了sort,對應的實作類分别是org.apache.spark.shuffle.hash.hashshufflemanager和org.apache.spark.shuffle.sort.sortshufflemanager。

這個方式的選擇是在org.apache.spark.sparkenv完成的:

那麼sort basedshuffle“取代”hash basedshuffle作為預設選項的原因是什麼?

正如前面提到的,hashbased shuffle的每個mapper都需要為每個reducer寫一個檔案,供reducer讀取,即需要産生m*r個數量的檔案,如果mapper和reducer的數量比較大,産生的檔案數會非常多。hash based shuffle設計的目标之一就是避免不需要的排序(hadoop map reduce被人诟病的地方,很多不需要sort的地方的sort導緻了不必要的開銷)。但是它在處理超大規模資料集的時候,産生了大量的diskio和記憶體的消耗,這無疑很影響性能。hash based shuffle也在不斷的優化中,正如前面講到的spark 0.8.1引入的file consolidation在一定程度上解決了這個問題。為了更好的解決這個問題,spark 1.1 引入了sort based shuffle。首先,每個shuffle map task不會為每個reducer生成一個單獨的檔案;相反,它會将所有的結果寫到一個檔案裡,同時會生成一個index檔案,reducer可以通過這個index檔案取得它需要處理的資料。避免産生大量的檔案的直接收益就是節省了記憶體的使用和順序disk io帶來的低延時。節省記憶體的使用可以減少gc的風險和頻率。而減少檔案的數量可以避免同時寫多個檔案對系統帶來的壓力。

并且從作者reynoldxin的幾乎所有的測試來看,sortbased shuffle在速度和記憶體使用方面優于hashbased shuffle:“sort-basedshuffle has lower memory usage and seems to outperformhash-based in almost allof our testing.”

性能資料:from:https://issues.apache.org/jira/browse/spark-3280

Spark技術内幕:Sort Based Shuffle實作解析
Spark技術内幕:Sort Based Shuffle實作解析

shuffle map task會按照key相對應的partition id進行sort,其中屬于同一個partition的key不會sort。因為對于不需要sort的操作來說,這個sort是負收益的;要知道之前spark剛開始使用hash based的shuffle而不是sort based就是為了避免hadoop map reduce對于所有計算都會sort的性能損耗。對于那些需要sort的運算,比如sortbykey,這個sort在spark 1.2.0裡還是由reducer完成的。

如果這個過程記憶體不夠用了,那麼這些已經sort的内容會被spill到外部存儲。然後在結束的時候将這些不同的檔案進行merge sort。

為了便于下遊的taskfetch到其需要的partition,這裡會生成一個index檔案,去記錄不同的partition的位置資訊。當然了org.apache.spark.storage.blockmanager需要也有響應的實作以實作這種新的尋址方式。

Spark技術内幕:Sort Based Shuffle實作解析

核心實作的邏輯都在類org.apache.spark.shuffle.sort.sortshufflewriter。下面簡要分析一下它的實作:

1)          對于每個partition,建立一個scala.array存儲它所包含的key,value對。每個待處理的key,value對都會插入相應的scala.array。

2)          如果scala.array的大小超過門檻值,那麼需要将這個in memory的資料spill到外部存儲。這個檔案的開始部分會記錄這個partition的id,這個檔案儲存了多少個pair等資訊。

3)          最後需要将所有spill到外部存儲的檔案進行mergesort。同時打開的檔案不能過多,過多的話會消耗大量的記憶體,增加oom或者gc的風險;也不能過少,過少的話就會影響性能,增大計算的延時。一般的話推薦每次同時打開10 – 100個檔案。

4)          在生成最後的資料檔案時,需要同時生成index索引檔案。正如前面提到的,這個索引檔案将記錄不同partition的range。

當然了,你可能還有個疑問,就是hash based shuffle說白了就是根據key需要寫入的org.apache.spark.hashpartitioner,為每個reducer寫入單獨的partition。隻不過對于同一個core啟動的shuffle map task,如果選擇spark.shuffle.consolidatefiles的話,第二個shuffle map task會把結果append到上一個檔案中去。那麼sort的邏輯是完全可以整合到hash based shuffle中去,為什麼又要重新實作一種shuffle writer呢?我認為有以下幾點:

shuffle機制是所有類似計算子產品的核心機制之一,要進行大的優化的風險非常高;比如一個看似簡單的consolidation機制,在0.8.1就引入了,但是到1.2.0還是沒有作為預設選項。 hash based shuffle如果修改為sort的邏輯,所謂的改進可能會影響原來已經穩定的spark應用。比如一個應用在使用hash based shuffle性能是完全符合預期的,那麼遷移到spark 1.2.0後,隻需要将配置檔案修改以下就可以完成這個無縫的遷移。 作為一個通用的計算平台,你的測試的case永遠cover不了所有的場景。那麼,還是留給使用者去選擇吧。 sort的機制還處理不斷完善的階段。比如很有的優化或者功能的改進會不斷的完善。是以,期待sort在以後的版本中更加完善吧。

如果您喜歡 本文,那麼請動一下手指支援以下部落格之星的評比吧。非常感謝您的投票。每天可以一票哦。

點我投票