天天看點

Serverless Spark的彈性利器 - EMR Shuffle Service

背景與動機

計算存儲分離下的剛需

計算存儲分離是雲原生的重要特征。通常來講,計算是CPU密集型,存儲是IO密集型,他們對于硬體配置的需求是不同的。在傳統計算存儲混合的架構中,為了兼顧計算和存儲,CPU和儲存設備都不能太差,是以犧牲了靈活性,提高了成本。在計算存儲分離架構中,可以獨立配置計算機型和存儲機型,具有極大的靈活性,進而降低成本。

存儲計算分離是新型的硬體架構,但以往的系統是基于混合架構設計的,必須進行改造才能充分利用分離架構的優勢,甚至不改造的話會報錯,例如很多系統假設本地盤足夠大,而計算節點本地盤很小;再例如有些系統在Locality上的優化在分離架構下不再适用。Spark Shuffle就是一個典型例子。衆所周知,Shuffle的過程如下圖所示。

Serverless Spark的彈性利器 - EMR Shuffle Service

每個mapper把全量shuffle資料按照partitionId排序後寫本地檔案,同時儲存索引檔案記錄每個partition的offset和length。reduce task去所有的map節點拉取屬于自己的shuffle資料。大資料場景T級别的shuffle資料量很常見,這就要求本地磁盤足夠大,導緻了跟計算存儲分離架構的沖突。是以,需要重構傳統的shuffle過程,把shuffle資料解除安裝到存儲節點。

穩定性和性能

除了計算存儲分離架構下的剛需,在傳統的混合架構下,目前的shuffle實作也存在重要缺陷: 大量的随機讀寫和小資料量的網絡傳輸。考慮1000 mapper * 2000 reducer的stage,每個mapper寫128M shuffle資料,則屬于每個reduce的資料量約為64k。從mapper的磁盤角度,每次磁盤IO請求随機讀64K的資料; 從網絡的角度,每次網絡請求傳輸64k的資料:都是非常糟糕的pattern,導緻大量不穩定和性能問題。是以,即使在混合架構下,重構shuffle也是很必要的工作。

EMR Shuffle Service設計

基于以上的動機,阿裡雲EMR團隊設計開發了EMR Shuffle Service服務(以下稱ESS),同時解決了計算存儲分離和混合架構下的shuffle穩定性和性能問題。

整體設計

ESS包含三個主要角色: Master, Worker, Client。其中Master和Worker構成服務端,Client以不侵入方式內建到Spark裡。Master的主要職責是資源配置設定和狀态管理;Worker的主要職責是處理和存儲shuffle資料;Client的主要職責是緩存和推送shuffle資料。整體流程如下所示(其中ResourceManager和MetaService是Master的元件):

Serverless Spark的彈性利器 - EMR Shuffle Service

ESS采用Push Style的shuffle模式,每個Mapper持有一個按Partition分界的緩存區,Shuffle資料首先寫入緩存區,每當某個Partition的緩存滿了即觸發PushData。

在PushData觸發之前Client會檢查本地是否有PartitionLocation資訊,該Location規定了每個Partition的資料應該推送的Worker位址。若不存在,則向Master發起getOrAllocateBuffers請求。Master收到後檢查是否已配置設定,若未配置設定則根據目前資源情況選擇互為主從的兩個Worker并向他們發起AllocateBuffer指令。Worker收到後記錄Meta并配置設定記憶體緩存。Master收到Worker的ack之後把主副本的Location資訊傳回給Client。

Client開始往主副本推送資料。主副本Worker收到請求後,把資料緩存到本地記憶體,同時把該請求以Pipeline的方式轉發給從副本。從副本收到完整資料後立即向主副本發ack,主副本收到ack後立即向Client回複ack。

為了不block PushData的請求,Worker收到PushData請求後會先塞到一個queue裡,由專有的線程池異步處理。根據該Data所屬的Partition拷貝到事先配置設定的buffer裡,若buffer滿了則觸發flush。ESS支援多種存儲後端,包括DFS和local。若後端是DFS,則主從副本隻有一方會flush,依靠DFS的雙副本保證容錯;若後端是Local,則主從雙方都會flush。

在所有的Mapper都結束後,Master會觸發StageEnd事件,向所有Worker發送CommitFiles請求,Worker收到後把屬于該Stage的buffer裡的資料flush到存儲層,close檔案,并釋放buffer。Master收到所有ack後記錄每個partition對應的檔案清單。若CommitFiles請求失敗,則Master标記此Stage為DataLost。

在Reduce階段,reduce task首先向Master請求該Partition對應的檔案清單,若傳回碼是DataLost,則觸發Stage重算或直接abort作業。若傳回正常,則直接讀取檔案資料。

ESS的設計要點,一是采用PushStyle的方式做shuffle,避免了本地存儲,進而适應了計算存儲分離架構;二是按照reduce做了聚合,避免了小檔案随機讀寫和小資料量網絡請求;三是做了兩副本,提高了系統穩定性。

容錯

除了雙副本和DataLost檢測,ESS在容錯上做了很多事情保證正确性。

PushData失敗

當PushData失敗次數(Worker挂了,網絡繁忙,CPU繁忙等)超過MaxRetry後,Client會給Master發消息請求新的Partition Location,此後本Client都會使用新的Location位址。

若Revive是因為Client端而非Worker的問題導緻,則會産生同一個Partition資料分布在不同Worker上的情況,Master的Meta元件會正确處理這種情形。

若發生WorkerLost,則會導緻大量PushData同時失敗,此時會有大量同一Partition的Revive請求打到Master。為了避免給同一個Partition配置設定過多的Location,Master保證僅有一個Revive請求真正得到處理,其餘的請求塞到pending queue裡,待Revive處理結束後傳回同一個Location。

WorkerLost

當發生WorkerLost時,對于該Worker上的副本資料,Master向其peer發送CommitFile的請求,然後清理peer上的buffer。若Commit Files失敗,則記錄該Stage為DataLost;若成功,則後續的PushData通過Revive機制重新申請Location。

資料備援

Speculation task和task重算會導緻資料重複。解決辦法是每個PushData的資料片裡encode了所屬的mapId,attemptId和batchId,并且Master為每個map task記錄成功commit的attemtpId。read端通過attemptId過濾不同的attempt資料,并通過batchId過濾同一個attempt的重複資料。

ReadPartition失敗

在DFS模式下,ReadPartition失敗會直接導緻Stage重算或abort job。在Local模式,ReadPartition失敗會觸發從peer location讀,若主從都失敗則觸發Stage重算或abort job。

多backend支援

ESS目前支援DFS和Local兩種存儲後端。

跟Spark內建

ESS以不侵入Spark代碼的方式跟Spark內建,使用者隻需把我們提供的Shuffle Client jar包配置到driver和client的classpath裡,并加入以下配置即可切換到ESS方式:

spark.shuffle.manager=org.apache.spark.shuffle.ess.EssShuffleManager           

監控報警

我們對ESS服務端進行了較為詳盡的監控報警并對接了Prometheus和Grafana,如下所示:

Serverless Spark的彈性利器 - EMR Shuffle Service

性能數字

TeraSort的性能數字如下(2T, 4T, 10T規模):

Serverless Spark的彈性利器 - EMR Shuffle Service

10T規模TPC-DS的性能數字如下:

Serverless Spark的彈性利器 - EMR Shuffle Service

後續

我們後續會持續投入,集中在産品化、極緻性能、池化等方向,歡迎大家使用!

更多資料湖技術相關的文章請點選:

阿裡雲重磅釋出雲原生資料湖體系

更多資料湖相關資訊交流請加入阿裡巴巴資料湖技術釘釘群

Serverless Spark的彈性利器 - EMR Shuffle Service