整理記錄一下Spark Shuffle的實作,大部分摘自Spark Shuffle原理及相關調優,可直接移步前往學習。
文章目錄
-
- 概述
- Spark Shuffle曆史節點
- Spark Shuffle的實作
-
- Hash Shuffle v1
- Hash Shuffle v2
- Sort Shuffle v1
- Unsafe Shuffle
- Sort Shuffle v2
- Spark Shuffle相關調優
- 參考
概述
Spark是基于MapReduce思想實作的計算架構。
在MapReduce中,通過shuffle來連接配接map和reduce,将map輸出作為reduce的輸入。
Spark作為MR的實作,其中也存在shuffle流程。
Spark根據RDD的寬依賴劃分stage,stage中又包含了task。每個stage中的task依賴上遊stage中task的輸出,上遊task落盤稱為shuffle寫,下遊task讀稱為shuffle讀,上遊task相當于MR的map階段,下遊task相當于MR的reduce階段。不同stage間task的讀寫構成了spark的shuffle流程。
下遊task(reduce端)會去上遊task(map端)所在節點讀取自己需要的分區資料。整個過程涉及到序列化、磁盤IO等操作。
Spark Shuffle曆史節點
- Spark 0.8及以前 Hash Based Shuffle
- Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
- Spark 0.9 引入ExternalAppendOnlyMap
- Spark 1.1 引入Sort Based Shuffle,但預設仍為Hash Based Shuffle
- Spark 1.2 預設的Shuffle方式改為Sort Based Shuffle
- Spark 1.4 引入Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-sort并入Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle退出曆史舞台
Spark Shuffle的實作
Spark現在(2.x +)預設shuffle寫的實作方式是Sort Shuffle。源碼中Hash Shuffle的相關代碼已經沒有了。
這裡簡述一下spark有史以來shuffle寫的實作。
Hash Shuffle v1

在map階段(shuffle寫),每個map都會為下遊stage的每個partition寫一個臨時檔案。上遊有M個task,下遊有N個partition,就會在executor上生成
M*N
個檔案。另一方面,如果一個executor上有K個core,那麼executor同時可運作K個task,這樣一來,就會同時申請
K*N
個檔案描述符,一旦partition數較多,勢必會耗盡executor上的檔案描述符,同時生成
K*N
個write handler也會帶來大量記憶體的消耗。
在reduce階段(shuffle read),每個reduce task都會拉取所有map對應的那部分partition資料,那麼executor會打開所有臨時檔案準備網絡傳輸,這裡又涉及到大量檔案描述符,另外,如果reduce階段有combiner操作,那麼它會把網絡中拉到的資料儲存在一個
HashMap
中進行合并操作,如果資料量較大,很容易引發OOM操作。
缺點:建立太多的檔案;打開太多的FD;容易引發OOM。
Hash Shuffle v2
為了解決v1生成過多檔案的問題,v2将一個executor中的所有map task的相同分區檔案合并,每個executor上最多隻生成N個分區檔案。
缺點:隻解決了v1的檔案個數問題,其他問題都還在
Sort Shuffle v1
針對Hash Shuffle的弊端,在spark 1.1.0版本中引入Sort Shuffle,它參考了Hadoop MapReduce中的shuffle實作,對記錄進行排序來做shuffle,如下圖所示:
在map階段(shuffle write),會按照partition id以及key對記錄進行排序,将所有partition的資料寫在同一個檔案中,該檔案中的記錄首先是按照partition id排序一個一個分區的順序排列,每個partition内部是按照key進行排序存放,map task運作期間會順序寫每個partition的資料,并通過一個索引檔案記錄每個partition的大小和偏移量。這樣一來,每個map task一次隻開兩個檔案描述符,一個寫資料,一個寫索引,大大減輕了Hash Shuffle大量檔案描述符的問題,即使一個executor有K個core,那麼最多一次性開K*2個檔案描述符。
在reduce階段(shuffle read),reduce task拉取資料做combine時不再是采用
HashMap
,而是采用
ExternalAppendOnlyMap
,該資料結構在做combine時,如果記憶體不足,會刷寫磁盤,很大程度的保證了魯棒性,避免大資料情況下的OOM。
特點:總體上看來Sort Shuffle解決了Hash Shuffle的所有弊端,但是因為需要其shuffle過程需要對記錄進行排序,是以在性能上有所損失。
Unsafe Shuffle
從spark 1.5.0開始,spark開始了鎢絲計劃(Tungsten),目的是優化記憶體和CPU的使用,進一步提升spark的性能。為此,引入Unsafe Shuffle,它的做法是将資料記錄用二進制的方式存儲,直接在序列化的二進制資料上sort而不是在java 對象上,這樣一方面可以減少memory的使用和GC的開銷,另一方面避免shuffle過程中頻繁的序列化以及反序列化。在排序過程中,它提供cache-efficient sorter,使用一個8 bytes的指針,把排序轉化成了一個指針數組的排序,極大的優化了排序性能。
特點:使用Unsafe Shuffle有幾個限制,shuffle階段不能有aggregate操作,分區數不能超過一定大小(224−1224−1,這是可編碼的最大partition id),是以像reduceByKey這類有aggregate操作的算子是不能使用Unsafe Shuffle,它會退化采用Sort Shuffle。
Sort Shuffle v2
從spark-1.6.0開始,把Sort Shuffle和Unsafe Shuffle全部統一到Sort Shuffle中,如果檢測到滿足Unsafe Shuffle條件會自動采用Unsafe Shuffle,否則采用Sort Shuffle。從spark-2.0.0開始,spark把Hash Shuffle移除,可以說目前spark-2.0中隻有一種Shuffle,即為Sort Shuffle。
Spark Shuffle相關調優
- 減少或避免shuffle
- 調整shuffle的并行讀
- repartition、coalesce
- spark.sql.shuffle.partitions
- 更多的shuffle參數配置:https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior
參考
http://sharkdtu.com/posts/spark-shuffle.html
https://spark.apache.org/docs/latest/configuration.html#shuffle-behavior