天天看點

【Spark】Spark的Shuffle機制

在mapreduce架構中,shuffle是連接配接map和reduce之間的橋梁,map的輸出要用到reduce中必須經過shuffle這個環節,shuffle的性能高低直接影響了整個程式的性能和吞吐量。

shuffle是mapreduce架構中的一個特定的phase,介于map phase和reduce phase之間,當map的輸出結果要被reduce使用時。輸出結果須要按key哈希。而且分發到每個reducer上去。這個過程就是shuffle。因為shuffle涉及到了磁盤的讀寫和網絡的傳輸,是以shuffle性能的高低直接影響到了整個程式的執行效率。

下圖描寫叙述了mapreduce算法的整個流程,當中shuffle phase是介于map phase和reduce phase之間:

【Spark】Spark的Shuffle機制

在hadoop, 在mapper端每次當memory buffer中的資料快滿的時候, 先将memory中的資料, 按partition進行劃分, 然後各自存成小檔案, 這樣當buffer不斷的spill的時候, 就會産生大量的小檔案。

是以hadoop後面直到reduce之前做的全部的事情事實上就是不斷的merge, 基于檔案的多路并歸排序,在map端的将同樣partition的merge到一起, 在reduce端, 把從mapper端copy來的資料檔案進行merge, 以用于終于的reduce

多路歸并排序, 達到兩個目的。

merge, 把同樣key的value都放到一個arraylist裡面;sort, 終于的結果是按key排序的。

這個方法擴充性非常好, 面對大資料也沒有問題, 當然問題在效率, 畢竟須要多次進行基于檔案的多路歸并排序,多輪的和磁盤進行資料讀寫。

【Spark】Spark的Shuffle機制

spark中的shuffle是把一組無規則的資料盡量轉換成一組具有一定規則的資料。

spark計算模型是在分布式的環境下計算的。這就不可能在單程序空間中容納全部的計算資料來進行計算。這樣資料就依照key進行分區。配置設定成一塊一塊的小分區,打散分布在叢集的各個程序的記憶體空間中,并非全部計算算子都滿足于依照一種方式分區進行計算。

當須要對資料進行排序存儲時。就有了又一次依照一定的規則對資料又一次分區的必要。shuffle就是包裹在各種須要重分區的算子之下的一個對資料進行又一次組合的過程。

在邏輯上還能夠這樣了解:因為又一次分區須要知道分區規則。而分區規則依照資料的key通過映射函數(hash或者range等)進行劃分,由資料确定出key的過程就是map過程,同一時候map過程也能夠做資料處理。比如,在join算法中有一個非常經典的算法叫map side join,就是确定資料該放到哪個分區的邏輯定義階段。shuffle将資料進行收集配置設定到指定reduce分區,reduce階段依據函數對對應的分區做reduce所需的函數處理。

【Spark】Spark的Shuffle機制

* 首先每個mapper會依據reducer的數量建立出對應的bucket,bucket的數量是m×r,當中m是map的個數,r是reduce的個數。

* 其次mapper産生的結果會依據設定的partition算法填充到每個bucket中去。

這裡的partition算法是能夠自己定義的,當然預設的算法是依據key哈希到不同的bucket中去。

* 當reducer啟動時,它會依據自己task的id和所依賴的mapper的id從遠端或是本地的block manager中取得對應的bucket作為reducer的輸入進行處理。

這裡的bucket是一個抽象概念,在實作中每個bucket能夠對應一個檔案。能夠對應檔案的一部分或是其它等。

繼續閱讀