tungsten 中文是鎢絲的意思。 tungsten project 是 databricks 公司提出的對spark優化記憶體和cpu使用的計劃,該計劃初期似乎對spark sql優化的最多。不過部分rdd api 還有shuffle也是以受益。
簡述
tungsten-sort優化點主要在三個方面:
直接在serialized binary data上sort而不是java objects,減少了memory的開銷和gc的overhead。
提供cache-efficient sorter,使用一個8bytes的指針,把排序轉化成了一個指針數組的排序。
spill的merge過程也無需反序列化即可完成
這些優化的實作導緻引入了一個新的記憶體管理模型,類似os的page,對應的實際資料結構為memoryblock,支援off-heap 以及 in-heap 兩種模式。為了能夠對record 在這些memoryblock進行定位,引入了pointer(指針)的概念。
如果你還記得sort based shuffle裡存儲資料的對象partitionedappendonlymap,這是一個放在jvm heap裡普通對象,在tungsten-sort中,他被替換成了類似作業系統記憶體頁的對象。如果你無法申請到新的page,這個時候就要執行spill操作,也就是寫入到磁盤的操作。具體觸發條件,和sort based shuffle 也是類似的。
spark 預設開啟的是sort based shuffle,想要打開tungsten-sort ,請設定
對應的實作類是:
名字的來源是因為使用了大量jdk sun unsafe api。
當且僅當下面條件都滿足時,才會使用新的shuffle方式:
shuffle dependency 不能帶有aggregation 或者輸出需要排序
shuffle 的序列化器需要是 kryoserializer 或者 spark sql's 自定義的一些序列化方式.
shuffle 檔案的數量不能大于 16777216
序列化時,單條記錄不能大于 128 mb
可以看到,能使用的條件還是挺苛刻的。
參看如下代碼,page的大小:
這就保證了頁大小不超過packedrecordpointer.maximum_page_size_bytes 的值,該值就被定義成了128m。
而産生這個限制的具體設計原因,我們還要仔細分析下tungsten的記憶體模型:

這張圖其實畫的是 on-heap 的記憶體邏輯圖,其中 #page 部分為13bit, offset 為51bit,你會發現 2^51 >>128m的。但是在shuffle的過程中,對51bit 做了壓縮,使用了27bit,具體如下:
這裡預留出的24bit給了partition number,為了後面的排序用。上面的好幾個限制其實都是因為這個指針引起的:
一個是partition 的限制,前面的數字 16777216 就是來源于partition number 使用24bit 表示的。
第二個是page number
第三個是偏移量,最大能表示到2^27=128m。那一個task 能管理到的記憶體是受限于這個指針的,最多是 2^13 * 128m 也就是1tb左右。
有了這個指針,我們就可以定位和管理到off-heap 或者 on-heap裡的記憶體了。這個模型還是很漂亮的,記憶體管理也非常高效,記得之前的預估partitionedappendonlymap的記憶體是非常困難的,但是通過現在的記憶體管理機制,是非常快速并且精确的。
對于第一個限制,那是因為後續shuffle write的sort 部分,隻對前面24bit的partiton number 進行排序,key的值沒有被編碼到這個指針,是以沒辦法進行ordering
同時,因為整個過程是追求不反序列化的,是以不能做aggregation。
核心類:
資料會通過 unsafeshuffleexternalsorter.insertrecordintosorter 一條一條寫入到 seroutputstream 序列化輸出流。
這裡消耗記憶體的地方是
預設是1m,類似于sort based shuffle 中的externalsorter,在tungsten sort 對應的為unsafeshuffleexternalsorter,記錄序列化後就通過sorter.insertrecord方法放到sorter裡去了。
這裡sorter 負責申請page,釋放page,判斷是否要進行spill都這個類裡完成。代碼的架子其實和sort based 是一樣的。
(另外,值得注意的是,這張圖裡進行spill操作的同時檢查記憶體可用而導緻的exeception 的bug 已經在1.5.1版本被修複了,忽略那條路徑)
記憶體是否充足的條件依然shufflememorymanager 來決定,也就是所有task shuffle 申請的page記憶體總和不能大于下面的值:
上面的數字可通過下面兩個配置來更改:
unsafeshuffleexternalsorter 負責申請記憶體,并且會生成該條記錄最後的邏輯位址,也就前面提到的 pointer。
接着record 會繼續流轉到unsafeshuffleinmemorysorter中,這個對象維護了一個指針數組:
數組的初始大小為 4096,後續如果不夠了,則按每次兩倍大小進行擴充。
假設100萬條記錄,那麼該數組大約是8m 左右,是以其實還是很小的。一旦spill後該unsafeshuffleinmemorysorter就會被賦為null,被回收掉。
我們回過頭來看spill,其實邏輯上也異常簡單了,unsafeshuffleinmemorysorter 會傳回一個疊代器,該疊代器粒度每個元素就是一個指針,然後到根據該指針可以拿到真實的record,然後寫入到磁盤,因為這些record 在一開始進入unsafeshuffleexternalsorter 就已經被序列化了,是以在這裡就純粹變成寫位元組數組了。形成的結構依然和sort based shuffle 一緻,一個檔案裡不同的partiton的資料用filesegment來表示,對應的資訊存在一個index檔案裡。
另外寫檔案的時候也需要一個 buffer :
另外從記憶體裡拿到資料放到diskwriter,這中間還要有個中轉,是通過
來完成的,都是記憶體,是以很快。
task結束前,我們要做一次mergespills操作,然後形成一個shuffle 檔案。這裡面其實也挺複雜的,如果開啟了
并且沒有開啟
或者壓縮方式為:
則可以非常高效的進行合并,叫做transferto。不過無論是什麼合并,都不需要進行反序列化。
shuffle read 完全複用hashshufflereader,具體參看 sort-based shuffle。
我個人感覺,tungsten-sort 實作了記憶體的自主管理,管理方式模拟了作業系統的方式,通過page可以使得大量的record被順序存儲在記憶體,整個shuffle write 排序的過程隻需要對指針進行運算(二進制排序),并且無需反序列化,整個過程非常高效,對于減少gc,提高記憶體通路效率,提高cpu使用效率确實帶來了明顯的提升。