天天看點

Spark之shuffle機制及原理

一 概述

Shuffle就是對資料進行重組,由于分布式計算的特性和要求,在實作細節上更加繁瑣和複雜

在MapReduce架構,Shuffle是連接配接Map和Reduce之間的橋梁,Map階段通過shuffle讀取資料并輸出到對應的Reduce;而Reduce階段負責從Map端拉取資料并進行計算。在整個shuffle過程中,往往伴随着大量的磁盤和網絡I/O。是以shuffle性能的高低也直接決定了整個程式的性能高低。Spark也會有自己的shuffle實作過程

Spark之shuffle機制及原理
Spark之shuffle機制及原理

在DAG排程的過程中,Stage階段的劃分是根據是否有shuffle過程,也就是存在ShuffleDependency寬依賴的時候,需要進行shuffle,這時候會将作業job劃分成多個Stage;并且在劃分Stage的時候,建構ShuffleDependency的時候進行shuffle注冊,擷取後續資料讀取所需要的ShuffleHandle,最終每一個job送出後都會生成一個ResultStage和若幹個ShuffleMapStage,其中ResultStage表示生成作業的最終結果所在的Stage. ResultStage與ShuffleMapStage中的task分别對應着ResultTask與ShuffleMapTask。一個作業,除了最終的ResultStage外,其他若幹ShuffleMapStage中各個ShuffleMapTask都需要将最終的資料根據相應的Partitioner對資料進行分組,然後持久化分區的資料。

一 HashShuffle機制

1.1 HashShuffle概述

在spark-1.6版本之前,一直使用HashShuffle,在spark-1.6版本之後使用Sort-Base Shuffle,因為HashShuffle存在的不足是以就替換了HashShuffle.

我們知道,Spark的運作主要分為2部分:一部分是驅動程式,其核心是SparkContext;另一部分是Worker節點上Task,它是運作實際任務的。程式運作的時候,Driver和Executor程序互相互動:運作什麼任務,即Driver會配置設定Task到Executor,Driver 跟 Executor 進行網絡傳輸; 任務資料從哪兒擷取,即Task要從 Driver 抓取其他上遊的 Task 的資料結果,是以有這個過程中就不斷的産生網絡結果。其中,下一個 Stage 向上一個 Stage 要資料這個過程,我們就稱之為 Shuffle。

1.2 沒有優化之前的HashShuffle機制

Spark之shuffle機制及原理

在HashShuffle沒有優化之前,每一個ShufflleMapTask會為每一個ReduceTask建立一個bucket緩存,并且會為每一個bucket建立一個檔案。這個bucket存放的資料就是經過Partitioner操作(預設是HashPartitioner)之後找到對應的bucket然後放進去,最後将資料

重新整理bucket緩存的資料到磁盤上,即對應的block file.

然後ShuffleMapTask将輸出作為MapStatus發送到DAGScheduler的MapOutputTrackerMaster,每一個MapStatus包含了每一個ResultTask要拉取的資料的位置和大小

ResultTask然後去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster擷取MapStatus,看哪一份資料是屬于自己的,然後底層通過BlockManager将資料拉取過來

拉取過來的資料會組成一個内部的ShuffleRDD,優先放入記憶體,記憶體不夠用則放入磁盤,然後ResulTask開始進行聚合,最後生成我們希望擷取的那個MapPartitionRDD

缺點:

如上圖所示:在這裡有1個worker,2個executor,每一個executor運作2個ShuffleMapTask,有三個ReduceTask,是以總共就有4 * 3=12個bucket和12個block file。

# 如果資料量較大,将會生成M*R個小檔案,比如ShuffleMapTask有100個,ResultTask有100個,這就會産生100*100=10000個小檔案

# bucket緩存很重要,需要将ShuffleMapTask所有資料都寫入bucket,才會刷到磁盤,那麼如果Map端資料過多,這就很容易造成記憶體溢出,盡管後面有優化,bucket寫入的資料達到重新整理到磁盤的閥值之後,就會将資料一點一點的重新整理到磁盤,但是這樣磁盤I/O就多了

1.3 優化後的HashShuffle

Spark之shuffle機制及原理

每一個Executor程序根據核數,決定Task的并發數量,比如executor核數是2,就是可以并發運作兩個task,如果是一個則隻能運作一個task

假設executor核數是1,ShuffleMapTask數量是M,那麼它依然會根據ResultTask的數量R,建立R個bucket緩存,然後對key進行hash,資料進入不同的bucket中,每一個bucket對應着一個block file,用于重新整理bucket緩存裡的資料

然後下一個task運作的時候,那麼不會再建立新的bucket和block file,而是複用之前的task已經建立好的bucket和block file。即所謂同一個Executor程序裡所有Task都會把相同的key放入相同的bucket緩沖區中

這樣的話,生成檔案的數量就是(本地worker的executor數量*executor的cores*ResultTask數量)如上圖所示,即2 * 1* 3 = 6個檔案,每一個Executor的shuffleMapTask數量100,ReduceTask數量為100,那麼

未優化的HashShuffle的檔案數是2 *1* 100*100 =20000,優化之後的數量是2*1*100 = 200檔案,相當于少了100倍

缺點:如果 Reducer 端的并行任務或者是資料分片過多的話則 Core * Reducer Task 依舊過大,也會産生很多小檔案。

二 Sort-Based Shuffle

2.1 Sort-Based Shuffle概述

HashShuffle回顧

HashShuffle寫資料的時候,記憶體有一個bucket緩沖區,同時在本地磁盤有對應的本地檔案,如果本地有檔案,那麼在記憶體應該也有檔案句柄也是需要耗費記憶體的。也就是說,從記憶體的角度考慮,即有一部分存儲資料,一部分管理檔案句柄。如果Mapper分片數量為1000,Reduce分片數量為1000,那麼總共就需要1000000個小檔案。是以就會有很多記憶體消耗,頻繁IO以及GC頻繁或者出現記憶體溢出。

而且Reducer端讀取Map端資料時,Mapper有這麼多小檔案,就需要打開很多網絡通道讀取,很容易造成Reducer(下一個stage)通過driver去拉取上一個stage資料的時候,說檔案找不到,其實不是檔案找不到而是程式不響應,因為正在GC.

2.2 Sorted-Based Shuffle介紹

為了緩解Shuffle過程産生檔案數過多和Writer緩存開銷過大的問題,spark引入了類似于hadoop Map-Reduce的shuffle機制。該機制每一個ShuffleMapTask不會為後續的任務建立單獨的檔案,而是會将所有的Task結果寫入同一個檔案,并且對應生成一個索引檔案。以前的資料是放在記憶體緩存中,等到資料完了再刷到磁盤,現在為了減少記憶體的使用,在記憶體不夠用的時候,可以将輸出溢寫到磁盤,結束的時候,再将這些不同的檔案聯合記憶體的資料一起進行歸并,進而減少記憶體的使用量。一方面檔案數量顯著減少,另一方面減少Writer緩存所占用的記憶體大小,而且同時避免GC的風險和頻率。

Spark之shuffle機制及原理

Sort-Based Shuffle有幾種不同的政策:BypassMergeSortShuffleWriter、SortShuffleWriter和UnasfeSortShuffleWriter。

對于BypassMergeSortShuffleWriter,使用這個模式特點:

# 主要用于處理不需要排序和聚合的Shuffle操作,是以資料是直接寫入檔案,資料量較大的時候,網絡I/O和記憶體負擔較重

# 主要适合處理Reducer任務數量比較少的情況下

# 将每一個分區寫入一個單獨的檔案,最後将這些檔案合并,減少檔案數量;但是這種方式需要并發打開多個檔案,對記憶體消耗比較大

因為BypassMergeSortShuffleWriter這種方式比SortShuffleWriter更快,是以如果在Reducer數量不大,又不需要在map端聚合和排序,而且

Reducer的數目 <  spark.shuffle.sort.bypassMergeThrshold指定的閥值,就是用的是這種方式。

對于SortShuffleWriter,使用這個模式特點:

# 比較适合資料量很大的場景或者叢集規模很大

# 引入了外部外部排序器,可以支援在Map端進行本地聚合或者不聚合

# 如果外部排序器enable了spill功能,如果記憶體不夠,可以先将輸出溢寫到本地磁盤,最後将記憶體結果和本地磁盤的溢寫檔案進行合并

對于UnsafeShuffleWriter由于需要謹慎使用,我們暫不做分析。

另外這個Sort-Based Shuffle跟Executor核數沒有關系,即跟并發度沒有關系,它是每一個ShuffleMapTask都會産生一個data檔案和index檔案,所謂合并也隻是将該ShuffleMapTask的各個partition對應的分區檔案合并到data檔案而已。是以這個就需要個Hash-BasedShuffle的consolidation機制差別開來。

繼續閱讀