天天看點

Spark源碼分析 – Shuffle

hadoop

hadoop的思路是, 在mapper端每次當memory buffer中的資料快滿的時候, 先将memory中的資料, 按partition進行劃分, 然後各自存成小檔案, 這樣當buffer不斷的spill的時候, 就會産生大量的小檔案 

是以hadoop後面直到reduce之前做的所有的事情其實就是不斷的merge, 基于檔案的多路并歸排序, 在map端的将相同partition的merge到一起, 在reduce端, 把從mapper端copy來的資料檔案進行merge, 以用于最終的reduce 

多路并歸排序, 達到兩個目的 

merge, 把相同key的value都放到一個arraylist裡面 

sort, 最終的結果是按key排序的 

這個方案擴充性很好, 面對大資料也沒有問題, 當然問題在效率, 畢竟需要多次進行基于檔案的多路并歸排序, 多輪的和磁盤進行資料讀寫……

Spark源碼分析 – Shuffle

spark

spark的優勢在于效率, 是以沒有做merge sort, 這樣省去多次磁盤讀寫

當然這樣會有擴充性問題, 很難兩全,

因為不能後面再merge, 是以在寫的時候, 需要同時打開corenum * bucketnum個檔案, 寫完才能關閉

并且在reduce的時候, 由于之前沒有做merge, 是以必須在記憶體裡面維護所有key的hashmap, 實時的merge和reduce, 詳細參考下面

如何将shuffle資料寫入block, 關鍵看shufflemaptask中的邏輯

可用看到使用shuffleblockmanager, spark從0.8開始将shuffleblockmanager從普通的blockmanager中分離出來, 便于優化

shufflemaptask

shuffleblockmanager

shuffleblockmanager的核心函數就是forshuffle, 這個函數傳回shuffleblocks對象

shuffleblocks對象的函數acquirewriters, 傳回shufflewritergroup, 其中封裝所有partition所對應的blockobjectwriter

這裡的問題是, 

由于spark的排程是基于task的, task其實對應于partition

如果有m個partitions, 而需要shuffle到n個partition上, 其實就是m個mapper task和n個reducer task

當然在spark中不可能所有的mapper task一起運作, task的并行度取決于core number

1. 如果每個mapper task都要産生n個files, 那麼最終産生的檔案數就是n*m, 檔案數過多...

在spark 0.8.1中已經優化成使用shuffle consolidation, 即多個mapper task公用一個bucket檔案, 怎麼公用?

取決于并行度, 因為并行的task是無法公用一個bucket檔案的, 是以至少會産生corenum * bucketnum個檔案, 而後面被執行的task就可以重用前面建立的bucketfile, 而不用重新建立

2. 在打開檔案寫的時候, 每個檔案的write handler預設需要100kb記憶體緩存, 是以同時需要corenum * bucketnum * 100kb大小的記憶體消耗, 這個問題還沒有得到解決

其實就是說spark在shuffle的時候碰到了擴充性問題, 這個問題為什麼hadoop沒有碰到?

因為hadoop可用容忍多次的磁盤讀寫, 多次的檔案merge, 是以它可以在每次從buffer spill的時候, 把内容寫到一個新的檔案中, 然後後面再去做檔案merge  

pairrddfunctions.combinebykey

關鍵的一點是, 在reduce端的進行中 (可以看沒有mapsidecombine的部分, 更清晰一些)

mappartitions其實是使用的mappartitionsrdd, 即對于每個item調用aggregator.combinevaluesbykey

可以看到這裡和hadoop最大的不同是, hadoop在reduce時得到的是一個key已經merge好的集合, 是以一次性reduce處理完後, 就可以直接存掉了

 shuffledrdd

shufflefetcher

從mapoutputtracker查詢到(根據shuffleid, reduceid)需要讀取的shuffle partition的位址

然後從blockmanager擷取所有這寫block的fetcher的iterator

前面有個問題沒有說清楚, 當shuffle完成後, reducer端的task怎麼知道應該從哪裡擷取目前partition所需要的所有shuffled blocks

在hadoop中是通過jobtracker, mapper會通過hb告訴jobtracker執行的狀況, reducer不斷的去詢問jobtracker, 并知道需要copy哪些hdfs檔案 

而在spark中就通過将shuffle資訊注冊到mapoutputtracker

mapoutputtracker

首先每個節點都可能需要查詢shuffle資訊, 是以需要mapoutputtrackeractor用于通信

參考sparkcontext中的邏輯, 隻有在master上才建立actor對象, 其他slaver上隻是建立actor ref

注意, 隻有master上的mapoutputtracker會有所有的最新shuffle資訊

但是對于slave, 出于效率考慮, 也會buffer從master得到的shuffle資訊, 是以getserverstatuses中會先在local的mapstatuses取資料, 如果沒有, 再取remote的master上擷取

注冊

注冊工作都是在master上的dagscheduler完成的

spark中是以shuffleid來辨別每個shuffle, 不同于hadoop, 一個job中可能有多個shuffle過程, 是以無法通過jobid

分兩步來注冊, 

1. 在new stage的時候, 需要注冊shuffleid, 由于new stage一定是由于遇到shuffledep

2. 在handle taskcompletion事件的時候, 當一個shufflemaptask完成的時候, 即mapoutput産生的時候, 就可以注冊mapstatus(blockmanagerid, compressedsizes)

通過blockmanagerid+partitionid+reduceid就可以知道blockid, 進而讀到資料

繼續閱讀