Cosco是Facebook開發的一種服務,主要用于優化Spark Shuffle的性能,下文主要介紹用Flash閃存(以下簡稱:閃存)進一步優化Cosco。
一、Cosco
Cosco作為一種服務主要優化Spark Shuffle的性能,其優勢有:
- 相較于原生的Spark Shuffle,能夠提升大約3倍的I/O性能,能夠有效降低磁盤的讀寫時間;
- 引入閃存以後Cosco能夠以更少的資源支撐更多的場景;
- 引入閃存之後有更大的可能降低Query的延遲;
- 利用閃存優化Cosco的過程中用到的技術也可以用于Cosco之外的領域。
(一)Cosco産生背景
在Spark Shuffle中有Map Task和Reduce Task兩種Task,每個Map Task都會生成Map Output Files,然後根據Partition進行分組,決定将檔案寫入本地磁盤還是分布式檔案系統中;所有Map Task執行完畢之後,Reduce Task就會去讀取Map Output Files中某個分區的資料,将其合并成某個大的Partition,在必要的時候還會進行排序。在上面的過程中,主要會存在兩個與I/O性能相關的問題:
(1)Write amplification problem
如下圖,這個問題也就說在最壞的情況下,同一個shuffle産生的位元組會被寫入磁盤三次:
- 第一次是在Map Task産生shuffle data的過程中如果記憶體不足,會先把Shuffle Data Spill到磁盤;
- 第二次是寫入Map Output Files的過程;
- 第三次是在Reduce Task中,如果進行sort的時候記憶體不夠,也會先Spill到磁盤。

(2)small IOs problem
在Spark Shuffle模型中,其I/O請求總數會有M x R個,而在生産中觀察到的I/O請求的size平均為200kb,相對于磁盤來說是非常小的,當整個作業的并行度提升之後會産生大量的小I/O請求,會急劇增加磁盤開銷,比如尋址時間等,進而導緻Shuffle的性能變差。
基于上述問題,Cosco應運而生,其工作原理如下圖所示。相較于原生的Spark每個Task生成一個自己的Map Output Files,Cosco允許不同的Map Task将同一個Partition寫入到同一個記憶體緩存中,緩存到達一個門檻值後,會将這部分資料Flush到分布式檔案系統的檔案中。這種情況下,同一個Partition可能産生多個對應的Flush檔案,等到Reduce Task執行的時候,隻需要讀取HDFS系統中的檔案即可,且檔案的資料量在十幾M的級别,且檔案數量遠小于之前的M x R數量級。是以,也就解決了小I/O的問題。
(二)用Flash替換記憶體緩沖
使用Flash來作為緩沖的話是通過追加寫的方式将Shuffle的資料寫入緩存中,之是以這麼做是因為閃存的可擦寫次數是有限的,追加寫可以延長閃存的壽命。閃存相對于記憶體存在着一定的延遲,但是總體而言這個延遲相對于整個shuffle在Cosco中緩存的時間是可以忽略不計的。
現在存在一個如何選擇的問題,比如在1GB的記憶體和每天能夠寫100GB的閃存之間讓我們用來部署叢集,我們如何抉擇呢?這裡有一條不精确的經驗:1GB的記憶體和每天能夠寫100GB的閃存這兩種選擇的效果是一樣的,也就是說能夠支撐同樣的場景,但是記憶體比閃存需要更多的能耗。大家在部署叢集的時候可以根據這條經驗來實際操作,選擇最優的配置。
(三)基于記憶體和閃存混合的緩存優化
基于記憶體和閃存混合的緩存優化技術主要有兩種:
- 第一種是優先緩存記憶體,當記憶體達到一定門檻值之後再Flush到閃存;
- 第二種是利用partition加載速度不一樣的特性,對于加載速度快的partition用記憶體緩存,對于加載速度慢的partition用閃存緩存。
(1)第一種
第一種優化技術利用了Shuffle資料随時間變化的特性,如下圖所示,我們發現Shuffle資料随時間變化的統計中,峰值情況是占據小部分的,于是我們用閃存來處理峰值情況,最終隻用250GB的記憶體和每天能寫25TB的閃存就能達到和原來一樣的效果,這樣就實作了Cosco的優勢,用更少的硬體資源來支撐了同樣的場景。這種混合存儲的優化技術比之純用記憶體有着更強的伸縮性,不會在某些特殊情況下造成系統崩潰。比如單純用記憶體的時候,如果出現記憶體不足就會崩潰,但是混合使用的時候就可以用閃存來處理異常情況,避免造成嚴重後果。
原來的Cosco叢集有負載均衡的邏輯,更了獲得更好的效果,我們使用插件的方式将閃存的優化內建到負載均衡邏輯中,如下圖所示,引入一個門檻值,當Shuffle Service記憶體不夠的時候,就會利用門檻值來進行判斷是否将資料Flush到閃存中,這樣有兩個好處:
- 實作簡單;
- 便于對叢集的性能做評估。
總的來說,第一種優化技術有如下特點:
- 利用了Shuffle Data的分布随時間變化而變化的特性;
- 采用了優先在記憶體内進行緩存的政策;
- 巧妙的适配了原來的負載均衡邏輯。
(2)第二種
第二種是利用partition加載速度不一樣的特性,對于加載速度快的partition用記憶體緩存,對于加載速度慢的partition用閃存緩存。其政策主要是周期性的檢測Partition的加載速率,當速率小于某個門檻值的時候,就使用閃存來緩存,當速率大于某個門檻值的時候就使用記憶體來緩存。從下圖中可以看出,在實際生産中大多數Partition的加載速度是比較慢的,少部分加載速度比較快,加載速度比較慢的Partition占用了少部分記憶體,造成記憶體的低使用率,是以我們用閃存來承載這些Partition,達到優化的目的。
二、未來工作
(一)低延遲查詢
引入閃存之後,我們可以讓Reduce Task直接從閃存中讀取緩存的資料,而不是從HDFS中的檔案讀取資料,這樣子提高了資料的讀取速率。另外,在引入閃存之後,Shuffle的資料塊會變得更大,在Reduce端合并資料塊的次數會變少,讓整個查詢變得更快。
(二)性能提升
目前,Cosco為了保證容錯性,每一份Shuffle資料在寫入持久化的檔案之前,會在不同節點的Shuffle Service儲存兩份資料。如果我們引入閃存之後,因為閃存具有不易失性,這樣子在Shuffle Service在恢複之後可以從閃存恢複資料,減少了拷貝的副本。另外,在引入閃存之後,資料塊變得更大,在整個DFS上的讀寫也會更加高效。
三、性能評估技術
在上述的優化過程中,主要有如下四種類型的評估技術:
- Discrete event simulation
- Synthetic load generation on a test cluster
- Shadow testing on a test cluster
- Special canary in a production cluster
(1)Discrete event simulation
Discrete event simulation,也就是離散時間模拟的方法,是一種比較通用的評估方法。我們把每個Shuffle Data到達閃存的行為作為一個離散事件,記錄其到達的時間、此時閃存中寫入的資料總量以及最後閃存被Flush到DFS檔案的資料總量。最終我們會得到如下圖所示統計表,包含了最終資料塊的大小和緩存的時間,由此我們就可以推算出資料塊的加載速率,也就是對應Partition的加載速率,并且把這個速率應用于上文中講到的第二種優化技術來進行決策。
(2)Special canary in a production cluster
如果我們要在一個生産環境中來驗證我們所進行的優化是否有效是比較困難的,因為在Cosco中一個Task可以與多個Shuffle Service進行通信,是以很難确定是因為加入了閃存提升了性能還是因為其他原因而提升。是以,我們将整個生産叢集分成兩個互不幹擾的子叢集,然後進行對比試驗,比如對子叢集A增加閃存優化,而子叢集B保持原來的部署模式。之後,我們再對兩個子叢集進行評估,就可以得知增加閃存優化是否起到了優化效果。
關鍵詞:Cosco、Shuffle、FLash閃存、Spark、緩存優化、負載均衡
擷取更多 Spark+AI SUMMIT 精彩演講視訊回放,立刻點選前往:
>>SPARK + AI SUMMIT 2020 中文精華版線上峰會 7月4日第一場<< >>SPARK + AI SUMMIT 2020 中文精華版線上峰會 7月5日第二場<<