天天看點

Spark shuffle詳細過程

有許多場景下,我們需要進行跨伺服器的資料整合,比如兩個表之間,通過Id進行join操作,你必須確定所有具有相同id的資料整合到相同的塊檔案中。那麼我們先說一下mapreduce的shuffle過程。

Mapreduce的shuffle的計算過程是在executor中劃分mapper與reducer。Spark的Shuffling中有兩個重要的壓縮參數。spark.shuffle.compress true---是否将會将shuffle中outputs的過程進行壓縮。将spark.io.compression.codec編碼器設定為壓縮資料,預設是true.同時,通過spark.shuffle.manager 來設定shuffle時的排序算法,有hash,sort,tungsten-sort。(用hash會快一點,我不需要排序啊~)

Hash Shuffle

使用hash散列有很多缺點,主要是因為每個Map task都會為每個reduce生成一份檔案,是以最後就會有M * R個檔案數量。那麼如果在比較多的Map和Reduce的情況下就會出問題,輸出緩沖區的大小,系統中打開檔案的數量,建立和删除所有這些檔案的速度都會受到影響。如下圖:

Spark shuffle詳細過程

這裡有一個優化的參數spark.shuffle.consolidateFiles,預設為false,當設定成true時,會對mapper output時的檔案進行合并。如果你叢集有E個executors(“-num-excutors”)以及C個cores("-executor-cores”),以及每個task又T個CPUs(“spark.task.cpus”),那麼總共的execution的slot在叢集上的個數就是E * C / T(也就是executor個數×CORE的數量/CPU個數)個,那麼shuffle過程中所建立的檔案就為E * C / T * R(也就是executor個數 × core的個數/CPU個數×reduce個數)個。外文文獻寫的太公式化,那麼我用通俗易懂的形式闡述下。就好比總共的并行度是20(5個executor,4個task)  Map階段會将資料寫入磁盤,當它完成時,他将會以reduce的個數來生成檔案數。那麼每個executor就隻會計算core的數量/cpu個數的tasks.如果task數量大于總共叢集并行度,那麼将開啟下一輪,輪詢執行。

Spark shuffle詳細過程

速度較快,因為沒有再對中間結果進行排序,減少了reduce打開檔案時的性能消耗。

當然,當資料是經過序列化以及壓縮的。當重新讀取檔案,資料将進行解壓縮與反序列化,這裡reduce端資料的拉取有個參數spark.reducer.maxSizeInFlight(預設為48MB),它将決定每次資料從遠端的executors中拉取大小。這個拉取過程是由5個并行的request,從不同的executor中拉取過來,進而提升了fetch的效率。 如果你加大了這個參數,那麼reducers将會請求更多的文資料進來,它将提高性能,但是也會增加reduce時的記憶體開銷。

Sort Shuffle

Sort Shuffle如同hash shuffle map寫入磁盤,reduce拉取資料的一個性質,當在進行sort shuffle時,總共的reducers要小于spark.shuffle.sort.bypassMergeThrshold(預設為200),将會執行回退計劃,使用hash将資料寫入單獨的檔案中,然後将這些小檔案聚集到一個檔案中,進而加快了效率。(實作自BypassMergeSortShuffleWriter中)

那麼它的實作邏輯是在reducer端合并mappers的輸出結果。Spark在reduce端的排序是用了TimSort,它就是在reduce前,提前用算法進行了排序。  那麼用算法的思想來說,合并的M N個元素進行排序,那麼其複雜度為O(MNlogM) 具體算法不講了~要慢慢看~

随之,當你沒有足夠的記憶體儲存map的輸出結果時,在溢出前,會将它們disk到磁盤,那麼緩存到記憶體的大小便是 spark.shuffle.memoryFraction * spark.shuffle.safyFraction.預設的情況下是”JVM Heap Size * 0.2 * 0.8 = JVM Heap Size * 0.16”。需要注意的是,當你多個線程同時在一個executor中運作時(spark.executor.cores/spark.task.cpus 大于1的情況下),那麼map output的每個task将會擁有 “JVM Heap Size * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus。運作原理如下圖:

Spark shuffle詳細過程

使用此種模式,會比使用hashing要慢一點,可通過bypassMergeThreshold找到叢集的最快平衡點。

Tungsten Sort

使用此種排序方法的優點在于,操作的二進制資料不需要進行反序列化。它使用 sun.misc.Unsafe模式進行直接資料的複制,因為沒有反序列化,是以直接是個位元組數組。同時,它使用特殊的高效緩存器ShuffleExtemalSorter壓記錄與指針以及排序的分區id.隻用了8 Bytes的空間的排序數組。這将會比使用CPU緩存要效率。

Spark shuffle詳細過程

每個spill的資料、指針進行排序,輸出到一個索引檔案中。随後将這些partitions再次合并到一個輸出檔案中。

本文翻譯自一位國外大神的部落格:https://0x0fff.com/spark-memory-management/