天天看點

Apache Spark源碼走讀(十二)Sort-based Shuffle的設計與實作

spark 1.1中對spark core的一個重大改進就是引入了sort-based shuffle處理機制,本文就該處理機制的實作進行初步的分析。

通過一個小的實驗來直覺的感受一下sort-based shuffle算法會産生哪些中間檔案,具體實驗步驟如下所述。

步驟1: 修改conf/spark-default.conf, 加入如下内容

步驟2: 運作spark-shell

 步驟3: 執行wordcount

 步驟4: 檢視生成的中間檔案

檔案查找結果如下所示

可以看到生成了兩人種字尾的檔案,分别為data和index類型,這兩者的用途在後續分析中會詳細講述。

如果我們做一下對比實驗,将shuffle模式改為hash,再來觀看生成的檔案,就會找到差別。将原先配置檔案中的sort改為hash,重新啟動spark-shell,執行相同的wordcount之後,在tmp目錄下找到的檔案清單如下。

兩者生成的檔案數量差異非常大,具體數值計算如下

在hash模式下,每一次shuffle會生成m*r的數量的檔案,如上述wordcount例子中,整個job有一次shuffle過程,由于輸入檔案預設分片為2,故m個數為2,而spark.default.parallelism配置的值為4,故r為4,是以總共生成1*2*4=8個檔案。shuffle_0_1_2解讀為shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1個maptask生成的檔案,該檔案内容會被第2個reduce task消費

在sort模式下,一個map task隻生成一個檔案,而不管生成的檔案要被多少的reduce消費,故檔案個數是m的數量,由于wordcount中的預設分片為2,故隻生成兩個data檔案

剛才的示例中隻有一次shuffle過程,我們可以通過小小的改動來達到兩次shuffle,代碼如下

上述代碼将reducebykey的結果通過map進行反轉,即将原來的(w, count)轉換為(count,w),然後根據出現次數進行歸類。 groupbykey會再次導緻資料shuffle過程。

在hash模式下産生的檔案如下所示

引入一次新的shuffle,産生了大量的中間檔案

如果是使用sort,效果如何呢?隻會增加m個檔案,由于在新的shuffle過程中,map task數目為4,是以總共的檔案是2+4=6。

值得指出的是shuffle_0和shuffle_1的執行次序問題,數字越大越先執行,由于spark job送出的時候是從後往前倒推的,故0是最後将執行,而前面的先執行。

sort-based shuffle的總體指導思想是一個map task最終隻生成一個shuffle檔案,那麼後續的reduce task是如何從這一個shuffle檔案中得到自己的partition呢,這個時候就需要引入一個新的檔案類型即index檔案。

其具體實作步驟如下:

map task在讀取自己輸入的partition之後,将計算結果寫入到externalsorter

externalsorter會使用一個map來存儲新的計算結果,新的計算結果根據partiton分類,如果是有combine操作,則需要将新的值與原有的值進行合并

如果externalsorter中的map占用的記憶體已經超越了使用的閥值,則将map中的内容spill到磁盤中,每一次spill産生一個不同的檔案

當輸入partition中的所有資料都已經處理完畢之後,這時有可能一部分計算結果在記憶體中,另一部分計算結果在spill的一到多個檔案之中,這時通過merge操作将記憶體和spill檔案中的内容合并整到一個檔案裡

最後将每一個partition的在data檔案中的起始位置和結束位置寫入到index檔案

相應的源檔案

sortshufflemanager.scala

sortshufflewriter.scala

externalsorter.scala

indexshuffleblockmanager.scala

幾個重要的函數

sortshufflewriter.write

externalsorter.insertall

writepartitionedfile将記憶體中的資料和spill檔案中内容一起合并到一個檔案當中

而資料讀取過程中則需要使用indexshuffleblockmanager來擷取partiton的具體位置

<a href="http://jerryshao.me/architecture/2014/01/04/spark-shuffle-detail-investigation/" target="_blank">詳細探究spark的shuffle 實作</a>

<a href="https://issues.apache.org/jira/browse/spark-2045" target="_blank">spark-2045 sort-based shuffle implementation</a>

繼續閱讀