天天看點

Spark Tungsten-sort Based Shuffle 分析

tungsten 中文是鎢絲的意思。 tungsten project 是 databricks 公司提出的對spark優化記憶體和cpu使用的計劃,該計劃初期似乎對spark sql優化的最多。不過部分rdd api 還有shuffle也是以受益。

簡述

tungsten-sort優化點主要在三個方面:

直接在serialized binary data上sort而不是java objects,減少了memory的開銷和gc的overhead。

提供cache-efficient sorter,使用一個8bytes的指針,把排序轉化成了一個指針數組的排序。

spill的merge過程也無需反序列化即可完成

這些優化的實作導緻引入了一個新的記憶體管理模型,類似os的page,對應的實際資料結構為memoryblock,支援off-heap 以及 in-heap 兩種模式。為了能夠對record 在這些memoryblock進行定位,引入了pointer(指針)的概念。

如果你還記得sort based shuffle裡存儲資料的對象partitionedappendonlymap,這是一個放在jvm heap裡普通對象,在tungsten-sort中,他被替換成了類似作業系統記憶體頁的對象。如果你無法申請到新的page,這個時候就要執行spill操作,也就是寫入到磁盤的操作。具體觸發條件,和sort based shuffle 也是類似的。

spark 預設開啟的是sort based shuffle,想要打開tungsten-sort ,請設定

對應的實作類是:

名字的來源是因為使用了大量jdk sun unsafe api。

當且僅當下面條件都滿足時,才會使用新的shuffle方式:

shuffle dependency 不能帶有aggregation 或者輸出需要排序

shuffle 的序列化器需要是 kryoserializer 或者 spark sql's 自定義的一些序列化方式.

shuffle 檔案的數量不能大于 16777216

序列化時,單條記錄不能大于 128 mb

可以看到,能使用的條件還是挺苛刻的。

參看如下代碼,page的大小:

這就保證了頁大小不超過packedrecordpointer.maximum_page_size_bytes 的值,該值就被定義成了128m。

而産生這個限制的具體設計原因,我們還要仔細分析下tungsten的記憶體模型:

Spark Tungsten-sort Based Shuffle 分析

這張圖其實畫的是 on-heap 的記憶體邏輯圖,其中 #page 部分為13bit, offset 為51bit,你會發現 2^51 >>128m的。但是在shuffle的過程中,對51bit 做了壓縮,使用了27bit,具體如下:

這裡預留出的24bit給了partition number,為了後面的排序用。上面的好幾個限制其實都是因為這個指針引起的:

一個是partition 的限制,前面的數字 16777216 就是來源于partition number 使用24bit 表示的。

第二個是page number

第三個是偏移量,最大能表示到2^27=128m。那一個task 能管理到的記憶體是受限于這個指針的,最多是 2^13 * 128m 也就是1tb左右。

有了這個指針,我們就可以定位和管理到off-heap 或者 on-heap裡的記憶體了。這個模型還是很漂亮的,記憶體管理也非常高效,記得之前的預估partitionedappendonlymap的記憶體是非常困難的,但是通過現在的記憶體管理機制,是非常快速并且精确的。

對于第一個限制,那是因為後續shuffle write的sort 部分,隻對前面24bit的partiton number 進行排序,key的值沒有被編碼到這個指針,是以沒辦法進行ordering

同時,因為整個過程是追求不反序列化的,是以不能做aggregation。

核心類:

資料會通過 unsafeshuffleexternalsorter.insertrecordintosorter 一條一條寫入到 seroutputstream 序列化輸出流。

這裡消耗記憶體的地方是

預設是1m,類似于sort based shuffle 中的externalsorter,在tungsten sort 對應的為unsafeshuffleexternalsorter,記錄序列化後就通過sorter.insertrecord方法放到sorter裡去了。

這裡sorter 負責申請page,釋放page,判斷是否要進行spill都這個類裡完成。代碼的架子其實和sort based 是一樣的。

Spark Tungsten-sort Based Shuffle 分析

(另外,值得注意的是,這張圖裡進行spill操作的同時檢查記憶體可用而導緻的exeception 的bug 已經在1.5.1版本被修複了,忽略那條路徑)

記憶體是否充足的條件依然shufflememorymanager 來決定,也就是所有task shuffle 申請的page記憶體總和不能大于下面的值:

上面的數字可通過下面兩個配置來更改:

unsafeshuffleexternalsorter 負責申請記憶體,并且會生成該條記錄最後的邏輯位址,也就前面提到的 pointer。

接着record 會繼續流轉到unsafeshuffleinmemorysorter中,這個對象維護了一個指針數組:

數組的初始大小為 4096,後續如果不夠了,則按每次兩倍大小進行擴充。

假設100萬條記錄,那麼該數組大約是8m 左右,是以其實還是很小的。一旦spill後該unsafeshuffleinmemorysorter就會被賦為null,被回收掉。

我們回過頭來看spill,其實邏輯上也異常簡單了,unsafeshuffleinmemorysorter 會傳回一個疊代器,該疊代器粒度每個元素就是一個指針,然後到根據該指針可以拿到真實的record,然後寫入到磁盤,因為這些record 在一開始進入unsafeshuffleexternalsorter 就已經被序列化了,是以在這裡就純粹變成寫位元組數組了。形成的結構依然和sort based shuffle 一緻,一個檔案裡不同的partiton的資料用filesegment來表示,對應的資訊存在一個index檔案裡。

另外寫檔案的時候也需要一個 buffer :

另外從記憶體裡拿到資料放到diskwriter,這中間還要有個中轉,是通過

來完成的,都是記憶體,是以很快。

task結束前,我們要做一次mergespills操作,然後形成一個shuffle 檔案。這裡面其實也挺複雜的,如果開啟了

并且沒有開啟 

或者壓縮方式為:

則可以非常高效的進行合并,叫做transferto。不過無論是什麼合并,都不需要進行反序列化。

shuffle read 完全複用hashshufflereader,具體參看 sort-based shuffle。

我個人感覺,tungsten-sort 實作了記憶體的自主管理,管理方式模拟了作業系統的方式,通過page可以使得大量的record被順序存儲在記憶體,整個shuffle write 排序的過程隻需要對指針進行運算(二進制排序),并且無需反序列化,整個過程非常高效,對于減少gc,提高記憶體通路效率,提高cpu使用效率确實帶來了明顯的提升。