天天看點

Spark之Shuffle

Spark有三種shuffle,分别是hash shuffle、sort shuffle、Tungsten Shuffle。

1、HashShuffle

适合小資料的場景,對小規模資料的處理效率比排序的shuffle高。

Spark之Shuffle

1)     每一個Mapper建立出和Reducer數目相同的bucket,bucket實際上是一個buffer,其大小為spark.shuffle.file.buffer.kb(預設32KB)。

2)     Mapper産生的結果會根據設定的partition算法填充到每個bucket中去,然後再寫入到磁盤檔案。

3)     Reducer從遠端或是本地的block manager中找到相應的檔案讀取資料。

針對上述Shuffle過程産生的檔案過多問題,Spark有另外一種改進的Shuffle過程:consolidation Shuffle,以期顯著減少Shuffle檔案的數量。在consolidation Shuffle中每個bucket并非對應一個檔案,而是對應檔案中的一個segment部分。Job的map在某個節點上第一次執行,為每個reduce建立bucket對應的輸出檔案,把這些檔案組織成ShuffleFileGroup,當這次map執行完之後,這個ShuffleFileGroup可以釋放為下次循環利用;當又有map在這個節點上執行時,不需要建立新的bucket檔案,而是在上次的ShuffleFileGroup中取得已經建立的檔案繼續追加寫一個segment;目前次map還沒執行完,ShuffleFileGroup還沒有釋放,這時如果有新的map在這個節點上執行,無法循環利用這個ShuffleFileGroup,而是隻能建立新的bucket檔案組成新的ShuffleFileGroup來寫輸出。

Spark之Shuffle

優點

1)     快-不需要排序,也不需要維持hash表

2)     不需要額外空間用作排序

3)     不需要額外IO---資料寫入磁盤隻需一次,讀取也隻需一次

缺點

1)     當partitions大時,輸出大量的檔案(cores * R),性能開始降低 //cores是CPU個數

2)     大量的檔案寫入,使檔案系統開始變為随機寫,性能比順序寫要降低100倍

3)     緩存空間占用比較大

Reduce去拖Map的輸出資料,Spark提供了兩套不同的拉取資料架構:通過socket連接配接去取資料;使用netty架構去取資料。

Reduce拖過來資料之後以什麼方式存儲呢?Reduce拖過來的資料會放在一個HashMap中,HashMap中存儲的也是<key, value>對,key是Map輸出的key,Map輸出對應這個key的所有value組成HashMap的value。Spark将Shuffle取過來的每一個<key, value>對插入或者更新到HashMap中,來一個處理一個。HashMap全部放在記憶體中。

Shuffle取過來的資料全部存放在記憶體中,對于資料量比較小或者已經在Map端做過合并處理的Shuffle資料,占用記憶體空間不會太大,但是對于比如group by key這樣的操作,Reduce需要得到key對應的所有value,并将這些value組成一個數組放在記憶體中,這樣當資料量較大時,就需要較多記憶體。

當記憶體不夠時,要不就失敗,要不就用老辦法把記憶體中的資料移到磁盤上放着。Spark意識到在處理資料規模遠遠大于記憶體空間時所帶來的不足,引入了一個具有外部排序的方案。Shuffle過來的資料先放在記憶體中,當記憶體中存儲的<key, value>對超過1000并且記憶體使用超過70%時,判斷節點上可用記憶體如果還足夠,則把記憶體緩沖區大小翻倍,如果可用記憶體不再夠了,則把記憶體中的<key, value>對排序然後寫到磁盤檔案中。最後把記憶體緩沖區中的資料排序之後和那些磁盤檔案組成一個最小堆,每次從最小堆中讀取最小的資料,這個和MapReduce中的merge過程類似。

2、SortShuffle:

SortShuffleManager 的運作機制主要分成兩種,一種是普通運作機制,另一種是bypass 運作機制。當 shuffle reduce task 的數量小于等于 spark.shuffle.sort.bypassMergeThreshold 參數的值時(預設為 200),就會啟用 bypass 機制。

1)普通運作機制

在該模式下,資料會先寫入一個記憶體資料結構中,此時根據不同的 shuffle 算子, 可能選用不同的資料結構。如果是 reduceByKey 這種聚合類的 shuffle 算子,那麼會選用 Map 資料結構, 一邊通過 Map 進行聚合,一邊寫入記憶體; 如果是 join 這種普通的 shuffle 算子,那麼會選用 Array 資料結構,直接寫入記憶體。接着, 每寫一條資料進入記憶體資料結構之後,就會判斷一下, 是否達到了某個臨界門檻值。如果達到臨界門檻值的話,那麼就會嘗試将記憶體資料結構中的資料溢寫到磁盤, 然後清空記憶體資料結構。

在溢寫到磁盤檔案之前,會先根據 key 對記憶體資料結構中已有的資料進行排序。排序過後,會分批将資料寫入磁盤檔案。預設的 batch 數量是 10000 條,也就是說, 排序好的資料,會以每批 1 萬條資料的形式分批寫入磁盤檔案。寫入磁盤檔案是通過 Java 的 BufferedOutputStream 實作的。BufferedOutputStream 是 Java 的緩沖輸出流,首先會将資料緩沖在記憶體中,當記憶體緩沖滿溢之後再一次寫入磁盤檔案中,這 樣可以減少磁盤 IO 次數, 提升性能。

一個 task 将所有資料寫入記憶體資料結構的過程中, 會發生多次磁盤溢寫操作, 也就會産生多個臨時檔案。最後會将之前所有的臨時磁盤檔案都進行合并, 這就是merge 過程, 此時會将之前所有臨時磁盤檔案中的資料讀取出來,然後依次寫入最終的磁盤檔案之中。此外,由于一個 task 就隻對應一個磁盤檔案,也就意味着該 task為下遊 stage 的 task 準備的資料都在這一個檔案中,是以還會單獨寫一份索引檔案,其中辨別了下遊各個 task 的資料在檔案中的 start offset 與 end offset。

SortShuffleManager 由于有一個磁盤檔案 merge 的過程,是以大大減少了檔案數量。比如第一個 stage 有 50 個 task,總共有 10 個 Executor,每個 Executor 執行 5 個 task,而第二個 stage 有 100 個 task。由于每個 task 最終隻有一個磁盤檔案,是以此時每個 Executor 上隻有 5 個磁盤檔案, 所有 Executor 隻有 50 個磁盤檔案。

普通運作機制的 SortShuffleManager 工作原理如圖 1-9 所示:

Spark之Shuffle

2)  bypass 運作機制

bypass運作機制的觸發條件如下:

①shuffle map task 數量小于 spark.shuffle.sort.bypassMergeThreshold 參數的值。

②不是聚合類的 shuffle 算子。

此時,每個 Map Task 會為每個下遊 Reduce Task 都建立一個臨時磁盤檔案,并将資料按 key 進行 hash然後根據 key 的 hash 值,将資料寫入對應的磁盤檔案之中。當然,寫入磁盤檔案時也是先寫入記憶體緩沖,緩沖寫滿之後再溢寫到磁盤檔案的。最後,同樣會将所有臨時磁盤檔案都合并成一個磁盤檔案,并建立一個單獨的索引檔案。

該過程的磁盤寫機制其實跟未經優化的 HashShuffleManager 是一模一樣的, 因為都要建立數量驚人的磁盤檔案,隻是在最後會做一個磁盤檔案的合并而已。是以少量的最終磁盤檔案,也讓該機制相對未經優化的 HashShuffleManager 來說,shuffle read 的性能會更好。

而該機制與普通 SortShuffleManager 運作機制的不同在于: 第一, 磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于, shuffle write 過程中,不需要進行資料的排序操作,也就節省掉了這部分的性能開銷。

byPass運作機制的 SortShuffleManager 工作原理如圖 1-10 所示:

Spark之Shuffle

與hash shuffle相比,sort shuffle中每個Mapper隻産生一個資料檔案和一個索引檔案,資料檔案中的資料按照Reducer排序,但屬于同一個Reducer的資料不排序。Mapper産生的資料先放到AppendOnlyMap這個資料結構中,如果記憶體不夠,資料則會spill到磁盤,最後合并成一個檔案。

與Hash shuffle相比,shuffle檔案數量減少,記憶體使用更加可控。但排序會影響速度。

優點

1)     map建立檔案量較少

2)     少量的IO随機操作,大部分是順序讀寫

缺點

1)     要比Hash Shuffle要慢,需要自己通過spark.shuffle.sort.bypassMergeThreshold來設定合适的值。

2)     如果使用SSD盤存儲shuffle資料,那麼Hash Shuffle可能更合适。

3、Tungsten Shuffle:

Tungsten-sort 算不得一個全新的shuffle 方案,它在特定場景下基于類似現有的Sort Based Shuffle處理流程,對記憶體/CPU/Cache使用做了非常大的優化。帶來高效的同時,也就限定了自己的使用場景。如果Tungsten-sort 發現自己無法處理,則會自動使用 Sort Based Shuffle進行處理。

Tungsten-sort優化點主要在三個方面:

1)直接在序列化後的二進制資料上排序而不是java 對象,減少了memory的開銷和GC的overhead。

2)提供cache-efficient sorter,使用一個8bytes的指針,把排序轉化成了一個指針數組的排序。

3)spill的merge過程也無需反序列化即可完成。

這些優化的實作導緻引入了一個新的記憶體管理模型,類似OS的Page,對應的實際資料結構為MemoryBlock,支援off-heap 以及 in-heap 兩種模式。為了能夠對Record 在這些MemoryBlock進行定位,引入了Pointer(指針)的概念。

Spark 預設開啟的是Sort Based Shuffle,想要打開Tungsten-sort ,請設定

spark.shuffle.manager=tungsten-sort

對應的實作類是:

org.apache.spark.shuffle.unsafe.UnsafeShuffleManager

名字的來源是因為使用了大量JDK Sun Unsafe API。

當且僅當下面條件都滿足時,才會使用新的Shuffle方式:

1)     Shuffle dependency 不能帶有aggregation 或者輸出需要排序

2)     Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定義的一些序列化方式.

3)     Shuffle 檔案的數量不能大于 16777216(2的24次方)

4)     序列化時,單條記錄不能大于 128 MB

可以看到,能使用的條件還是挺苛刻的。

這些限制來源于哪裡

參看如下代碼,page的大小:

this.pageSizeBytes = (int) Math.min( PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, shuffleMemoryManager.pageSizeBytes());

 這就保證了頁大小不超過PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,該值就被定義成了128M。

而産生這個限制的具體設計原因,我們還要仔細分析下Tungsten的記憶體模型:

Spark之Shuffle

這張圖其實畫的是 on-heap 的記憶體邏輯圖,其中 #Page 部分為13bit, Offset 為51bit,你會發現 2^51 >>128M的。但是在Shuffle的過程中,對51bit 做了壓縮,使用了27bit,具體如下:

 [24 bit partition number][13 bit memory page number][27 bit offset in page]

這裡預留出的24bit給了partition number,為了後面的排序用。上面的好幾個限制其實都是因為這個指針引起的:

一個是partition 的限制,前面的數字 16777216 就是來源于partition number 使用24bit 表示的。

第二個是page number

第三個是偏移量,最大能表示到2^27=128M。那一個task 能管理到的記憶體是受限于這個指針的,最多是 2^13 * 128M 也就是1TB左右。

有了這個指針,我們就可以定位和管理到off-heap 或者 on-heap裡的記憶體了。這個模型還是很漂亮的,記憶體管理也非常高效,記得之前的預估PartitionedAppendOnlyMap的記憶體是非常困難的,但是通過現在的記憶體管理機制,是非常快速并且精确的。

對于第一個限制,那是因為後續Shuffle Write的sort 部分,隻對前面24bit的partiton number 進行排序,key的值沒有被編碼到這個指針,是以沒辦法進行ordering同時,因為整個過程是追求不反序列化的,是以不能做aggregation(聚合)。

Shuffle Write

核心類:

org.apache.spark.shuffle.unsafe.UnsafeShuffleWriter

資料會通過 UnsafeShuffleExternalSorter.insertRecordIntoSorter 一條一條寫入到 serOutputStream 序列化輸出流。

這裡消耗記憶體的地方是serBuffer = new MyByteArrayOutputStream(1024 * 1024),預設是1M,類似于Sort Based Shuffle 中的ExternalSorter,在Tungsten Sort 對應的為UnsafeShuffleExternalSorter,記錄序列化後就通過sorter.insertRecord方法放到sorter裡去了。

這裡sorter 負責申請Page,釋放Page,判斷是否要進行spill都這個類裡完成。代碼的架子其實和Sort Based 是一樣的。

Spark之Shuffle

 (另外,值得注意的是,這張圖裡進行spill操作的同時檢查記憶體可用而導緻的Exeception 的bug 已經在1.5.1版本被修複了,忽略那條路徑)

記憶體是否充足的條件依然shuffleMemoryManager 來決定,也就是所有task shuffle 申請的Page記憶體總和不能大于下面的值: ExecutorHeapMemeory * 0.2 * 0.8

上面的數字可通過下面兩個配置來更改:

spark.shuffle.memoryFraction=0.2

spark.shuffle.safetyFraction=0.8

UnsafeShuffleExternalSorter 負責申請記憶體,并且會生成該條記錄最後的邏輯位址,也就前面提到的 Pointer。

接着Record 會繼續流轉到UnsafeShuffleInMemorySorter中,這個對象維護了一個指針數組:

private long[] pointerArray;

數組的初始大小為 4096,後續如果不夠了,則按每次兩倍大小進行擴充。

假設100萬條記錄,那麼該數組大約是8M 左右,是以其實還是很小的。一旦spill後該UnsafeShuffleInMemorySorter就會被賦為null,被回收掉。

我們回過頭來看spill,其實邏輯上也異常簡單了,UnsafeShuffleInMemorySorter 會傳回一個疊代器,該疊代器粒度每個元素就是一個指針,然後到根據該指針可以拿到真實的record,然後寫入到磁盤,因為這些record 在一開始進入UnsafeShuffleExternalSorter 就已經被序列化了,是以在這裡就純粹變成寫位元組數組了。形成的結構依然和Sort Based Shuffle 一緻,一個檔案裡不同的partiton的資料用fileSegment來表示,對應的資訊存在一個index檔案裡。

另外寫檔案的時候也需要一個 buffer : spark.shuffle.file.buffer = 32k

另外從記憶體裡拿到資料放到DiskWriter,這中間還要有個中轉,是通過

 final byte[] writeBuffer = new byte[DISK_WRITE_BUFFER_SIZE=1024 * 1024];

來完成的,都是記憶體,是以很快。

Task結束前,我們要做一次mergeSpills操作,然後形成一個shuffle 檔案。這裡面其實也挺複雜的,

如果開啟了`spark.shuffle.unsafe.fastMergeEnabled=true`并且沒有開啟`spark.shuffle.compress=true`

或者壓縮方式為:LZFCompressionCodec,則可以非常高效的進行合并,叫做transferTo。不過無論是什麼合并,都不需要進行反序列化。

繼續閱讀