天天看點

Spark Sort Based Shuffle記憶體分析

借用和董神的一段對話說下背景:

<b>shuffle共有三種,别人讨論的是hash shuffle,這是最原始的實作,曾經有兩個版本,第一版是每個map産生r個檔案,一共産生mr個檔案,由于産生的中間檔案太大影響擴充性,社群提出了第二個優化版本,讓一個core上map共用檔案,減少檔案數目,這樣共産生corer個檔案,好多了,但中間檔案數目仍随任務數線性增加,仍難以應對大作業,但hash shuffle已經優化到頭了。為了解決hash shuffle性能差的問題,又引入sort shuffle,完全借鑒mapreduce實作,每個map産生一個檔案,徹底解決了擴充性問題</b>

目前sort based shuffle 是作為預設shuffle類型的。shuffle 是一個很複雜的過程,任何一個環節都足夠寫一篇文章。是以這裡,我嘗試換個方式,從實用的角度出發,讓讀者有兩方面的收獲:

剖析哪些環節,哪些代碼可能會讓記憶體産生問題

控制相關記憶體的參數

有時候,我們甯可程式慢點,也不要oom,至少要先跑步起來,希望這篇文章能夠讓你達成這個目标。

同時我們會提及一些類名,這些類友善你自己想更深入了解時,可以友善的找到他們,自己去探個究竟。

spark 的shuffle 分為 write,read 兩階段。我們預先建立三個概念:

write 對應的是shufflemaptask,具體的寫操作externalsorter來負責

read 階段由shufflerdd裡的hashshufflereader來完成。如果拉來的資料如果過大,需要落地,則也由externalsorter來完成的

所有write 寫完後,才會執行read。 他們被分成了兩個不同的stage階段。

也就是說,shuffle write ,shuffle read 兩階段都可能需要落磁盤,并且通過disk merge 來完成最後的sort歸并排序。

shuffle write 的入口鍊路為:

會産生記憶體瓶頸的其實就是 org.apache.spark.util.collection.externalsorter。我們看看這個複雜的externalsorter都有哪些地方在占用記憶體:

第一個地:

我們知道,資料都是先寫記憶體,記憶體不夠了,才寫磁盤。這裡的map就是那個放資料的記憶體了。

這個partitionedappendonlymap内部維持了一個數組,是這樣的:

也就是他消耗的并不是storage的記憶體,所謂storage記憶體,指的是由blockmanager管理起來的記憶體。

partitionedappendonlymap 放不下,要落地,那麼不能硬生生的寫磁盤,是以需要個buffer,然後把buffer再一次性寫入磁盤檔案。這個buffer是由參數

控制的。資料擷取的過程中,序列化反序列化,也是需要空間的,是以spark 對數量做了限制,通過如下參數控制:

 spark.shuffle.spill.batchsize=10000

假設一個executor的可使用的core為 c個,那麼對應需要的記憶體消耗為:

這麼看來,寫檔案的buffer不是問題,而序列化的batchsize也不是問題,幾萬或者十幾萬個record 而已。那c * partitionedappendonlymap  到底會有多大呢?我先給個結論:

怎麼得到上面的結論呢?核心店就是要判定partitionedappendonlymap 需要占用多少記憶體,而它到底能占用記憶體,則由觸發寫磁盤動作決定,因為一旦寫磁盤,partitionedappendonlymap所占有的記憶體就會被釋放。下面是判斷是否寫磁盤的邏輯代碼:

每放一條記錄,就會做一次記憶體的檢查,看partitionedappendonlymap 到底占用了多少記憶體。如果真是這樣,假設檢查一次記憶體1ms, 1kw 就不得了的時間了。是以肯定是不行的,是以 estimatesize其實是使用采樣算法來做的。

第二個,我們也不希望maybespill太耗時,是以 maybespill 方法裡就搞了很多東西,減少耗時。我們看看都設定了哪些防線

首先會判定要不要執行内部邏輯:

每隔32次會進行一次檢查,并且要目前partitionedappendonlymap currentmemory &gt;  mymemorythreshold 才會進一步判定是不是要spill.

其中 mymemorythreshold可通過如下配置獲得初始值

接着會向 shufflememorymanager 要 2 * currentmemory - mymemorythreshold 的記憶體,shufflememorymanager 是被executor 所有正在運作的task(core) 共享的,能夠配置設定出去的記憶體是:

上面的數字可通過下面兩個配置來更改:

如果無法擷取到足夠的記憶體,就會觸發真的spill操作了。

看到這裡,上面的結論就顯而易見了。

然而,這裡我們忽略了一個很大的問題,就是

為什麼說它是大問題,前面我們說了,estimatesize 是近似估計,是以有可能估的不準,也就是實際記憶體會遠遠超過預期。

具體的大家可以看看 org.apache.spark.util.collection.sizetracker

我這裡給出一個結論:

如果你記憶體開的比較大,其實反倒風險更高,因為estimatesize 并不是每次都去真實的算緩存。它是通過采樣來完成的,而采樣的周期不是固定的,而是指數增長的,比如第一次采樣完後,partitionedappendonlymap 要經過1.1次的update/insert操作之後才進行第二次采樣,然後經過1.1*.1.1次之後進行第三次采樣,以此遞推,假設你記憶體開的大,那partitionedappendonlymap可能要經過幾十萬次更新之後之後才會進行一次采樣,然後才能計算出新的大小,這個時候幾十萬次更新帶來的新的記憶體壓力,可能已經讓你的gc不堪重負了。

當然,這是一種折中,因為确實不能頻繁采樣。

如果你不想出現這種問題,要麼自己替換實作這個類,要麼将

設定的更小一些。

shuffle read 記憶體消耗分析

shuffle read 的入口鍊路為:

shuffle read 會更複雜些,尤其是從各個節點拉取資料。但這塊不是不是我們的重點。按流程,主要有:

擷取待拉取資料的疊代器

使用appendonlymap/externalappendonlymap 做combine

如果需要對key排序,則使用externalsorter

其中1後續會單獨列出文章。3我們在write階段已經讨論過。是以這裡重點是第二個步驟,combine階段。

如果你開啟了

則使用externalappendonlymap,否則使用appendonlymap。兩者的差別是,前者如果記憶體不夠,則落磁盤,會發生spill操作,後者如果記憶體不夠,直接oom了。

這裡我們會重點分析externalappendonlymap。

externalappendonlymap 作為記憶體緩沖資料的對象如下:

如果currentmap 對象向申請不到記憶體,就會觸發spill動作。判定記憶體是否充足的邏輯和shuffle write 完全一緻。

combine做完之後,externalappendonlymap 會傳回一個iterator,叫做externaliterator,這個iterator背後的資料源是所有spill檔案以及目前currentmap裡的資料。

我們進去 externaliterator 看看,唯一的一個占用記憶體的對象是這個優先隊列:

mergeheap 裡元素數量等于所有spill檔案個數加一。streambuffer 的結構:

其中iterator 隻是一個對象引用,pairs 應該儲存的是iterator裡的第一個元素(如果hash有沖突的話,則為多個)

是以mergeheap 應該不占用什麼記憶體。到這裡我們看看應該占用多少記憶體。依然假設  corenum 為 c,則

是以這一段占用記憶體較大的依然是 sizetrackingappendonlymap  ,一樣的,他的值也符合如下公式

externalappendonlymap 的目的是做combine,然後如果你還設定了order,那麼接着會啟用 externalsorter 來完成排序。

經過上文對shuffle write的使用,相比大家也對externalsorter有一定的了解了,此時應該占用記憶體的地方最大不超過下面的這個值:

不過即使如此,因為他們共享一個shufflememorymanager,則理論上隻有這麼大:

分析到這裡,我們可以做個總結:

shuffle read階段如果記憶體不足,有兩個階段會落磁盤,分别是combine 和 sort 階段。對應的都會spill小檔案,并且産生讀。

shuffle read 階段如果開啟了spill功能,則基本能保證記憶體控制在 executorheapmemeory * 0.2 * 0.8 之内。

繼續閱讀