天天看點

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

  • 王振華,趣頭條大資料總監,趣頭條大資料負責人
  • 曹佳清,趣頭條大資料離線團隊進階研發工程師,曾就職于餓了麼大資料INF團隊負責存儲層和計算層元件研發,目前負責趣頭條大資料計算層元件Spark的建設
  • 範振,花名辰繁,阿裡雲計算平台EMR進階技術專家,目前主要關注開源大資料技術以及雲原生技術。

1. 業務場景與現狀

趣頭條是一家依賴大資料的科技公司,在2018-2019年經曆了業務的高速發展,主App和其他創新App的日活增加了10倍以上,相應的大資料系統也從最初的100台機器增加到了1000台以上規模。多個業務線依賴于大資料平台展開業務,大資料系統的高效和穩定成了公司業務發展的基石,在大資料的架構上我們使用了業界成熟的方案,存儲建構在HDFS上、計算資源排程依賴Yarn、表中繼資料使用Hive管理、用Spark進行計算,具體如圖1所示:

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

圖1 趣頭條離線大資料平台架構圖

其中Yarn叢集使用了單一大叢集的方案,HDFS使用了聯邦的方案,同時基于成本因素,HDFS和Yarn服務在ECS上進行了DataNode和NodeManager的混部。

在趣頭條每天有6W+的Spark任務跑在Yarn叢集上,每天新增的Spark任務穩定在100左右,公司的迅速發展要求需求快速實作,積累了很多治理欠債,種種問題表現出來叢集穩定性需要提升,其中Shuffle的穩定性越來越成為叢集的桎梏,亟需解決。

2. 目前大資料平台的挑戰與思考

近半年大資料平台主要的業務名額是降本增效,一方面業務方希望離線平台每天能夠承載更多的作業,另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業務量對于每個技術人都是非常大地挑戰。熟悉Spark的同學應該非常清楚,在大規模叢集場景下,Spark Shuffle在實作上有比較大的缺陷,展現在以下的幾個方面:

  • Spark Shuffle Fetch過程存在大量的網絡小包,現有的External Shuffle Service設計并沒有非常細緻的處理這些RPC請求,大規模場景下會有很多connection reset發生,導緻FetchFailed,進而導緻stage重算。
  • Spark Shuffle Fetch過程存在大量的随機讀,大規模高負載叢集條件下,磁盤IO負載高、CPU滿載時常發生,極容易發生FetchFailed,進而導緻stage重算。
  • 重算過程會放大叢集的繁忙程度,搶占機器資源,導緻惡性循環嚴重,SLA完不成,需要運維人員手動将作業跑在空閑的Label叢集。
  • 計算和Shuffle過程架構不能拆開,不能把Shuffle限定在指定的叢集内,不能利用部分SSD機器。
  • M*N次的shuffle過程:對于10K mapper,5K reducer級别的作業,基本跑不完。
  • NodeManager和Spark Shuffle Service是同一程序,Shuffle過程太重,經常導緻NodeManager重新開機,進而影響Yarn排程穩定性。

以上的這些問題對于Spark研發同學是非常痛苦的,好多作業每天運作時長方差會非常大,而且總有一些無法完成的作業,要麼業務進行拆分,要麼跑到獨有的Yarn叢集中。除了現有面臨的挑戰之外,我們也在積極建構下一代基礎架構設施,随着雲原生Kubernetes概念越來越火,Spark社群也提供了Spark on Kubernetes版本,相比較于Yarn來說,Kubernetes能夠更好的利用雲原生的彈性,提供更加豐富的運維、部署、隔離等特性。但是Spark on Kubernetes目前還存在很多問題沒有解決,包括容器内的Shuffle方式、動态資源排程、排程性能有限等等。我們針對Kubernetes在趣頭條的落地,主要有以下幾個方面的需求:

  • 實時叢集、OLAP叢集和Spark叢集之前都是互相獨立的,怎樣能夠将這些資源形成統一大資料資源池。通過Kubernetes的天生隔離特性,更好的實作離線業務與實時業務混部,達到降本增效目的。
  • 公司的線上業務都運作在Kubernetes叢集中,如何利用線上業務和大資料業務的不同特點進行錯峰排程,達成ECS的總資源量最少。
  • 希望能夠基于Kubernetes來包容線上服務、大資料、AI等基礎架構,做到運維體系統一化。

因為趣頭條的大資料業務目前全都部署在阿裡雲上,阿裡雲EMR團隊和趣頭條的大資料團隊進行了深入技術共創,共同研發了Remote Shuffle Service(以下簡稱RSS),旨在解決Spark on Yarn層面提到的所有問題,并為Spark跑在Kubernetes上提供Shuffle基礎元件。

3. Remote Shuffle Service設計與實作

3.1 Remote Shuffle Service的背景

早在2019年初我們就關注到了社群已經有相應的讨論,如

SPARK-25299

。該Issue主要希望解決的問題是在雲原生環境下,Spark需要将Shuffle資料寫出到遠端的服務中。但是我們經過調研後發現Spark 3.0(之前的master分支)隻支援了部分的接口,而沒有對應的實作。該接口主要希望在現有的Shuffle代碼架構下,将資料寫到遠端服務中。如果基于這種方式實作,比如直接将Shuffle以流的方式寫入到HDFS或者Alluxio等高速記憶體系統,會有相當大的性能開銷,趣頭條也做了一些相應的工作,并進行了部分的Poc,性能與原版Spark Shuffle實作相差特别多,最差性能可下降3倍以上。同時我們也調研了一部分其他公司的實作方案,例如Facebook的Riffle方案以及LinkedIn開源的Magnet,這些實作方案是首先将Shuffle檔案寫到本地,然後在進行Merge或者Upload到遠端的服務上,這和後續我們的Kubernetes架構是不相容的,因為Kubernetes場景下,本地磁盤Hostpath或者LocalPV并不是一個必選項,而且也會存在隔離和權限的問題。

基于上述背景,我們與阿裡雲EMR團隊共同開發了Remote Shuffle Service。RSS可以提供以下的能力,完美的解決了Spark Shuffle面臨的技術挑戰,為我們叢集的穩定性和容器化的落地提供了強有力的保證,主要展現在以下幾個方面:

  • 高性能伺服器的設計思路,不同于Spark原有Shuffle Service,RPC更輕量、通用和穩定。
  • 兩副本機制,能夠保證的Shuffle fetch極小機率(低于0.01%)失敗。
  • 合并shuffle檔案,從M*N次shuffle變成N次shuffle,順序讀HDD磁盤會顯著提升shuffle heavy作業性能。
  • 減少Executor計算時記憶體壓力,避免map過程中Shuffle Spill。
  • 計算與存儲分離架構,可以将Shuffle Service部署到特殊硬體環境中,例如SSD機器,可以保證SLA極高的作業。
  • 完美解決Spark on Kubernetes方案中對于本地磁盤的依賴。

3.2 Remote Shuffle Service的實作

3.2.1 整體設計

Spark RSS架構包含三個角色: Master, Worker, Client。Master和Worker構成服務端,Client以不侵入的方式內建到Spark ShuffleManager裡(RssShuffleManager實作了ShuffleManager接口)。

  • Master的主要職責是資源配置設定與狀态管理。
  • Worker的主要職責是處理和存儲Shuffle資料。
  • Client的主要職責是緩存和推送Shuffle資料。

整體流程如下所示(其中ResourceManager和MetaService是Master的元件),如圖2。

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

圖2 RSS整體架構圖

3.2.2 實作流程

下面重點來講一下實作的流程:

  • RSS采用Push Style的shuffle模式,每個Mapper持有一個按Partition分界的緩存區,Shuffle資料首先寫入緩存區,每當某個Partition的緩存滿了即觸發PushData。
  • Driver先和Master發生StageStart的請求,Master接受到該RPC後,會配置設定對應的Worker Partition并傳回給Driver,Shuffle Client得到這些元資訊後,進行後續的推送資料。
  • Client開始向主副本推送資料。主副本Worker收到請求後,把資料緩存到本地記憶體,同時把該請求以Pipeline的方式轉發給從副本,進而實作了2副本機制。
  • 為了不阻塞PushData的請求,Worker收到PushData請求後會以純異步的方式交由專有的線程池異步處理。根據該Data所屬的Partition拷貝到事先配置設定的buffer裡,若buffer滿了則觸發flush。RSS支援多種存儲後端,包括DFS和Local。若後端是DFS,則主從副本隻有一方會flush,依靠DFS的雙副本保證容錯;若後端是Local,則主從雙方都會flush。
  • 在所有的Mapper都結束後,Driver會觸發StageEnd請求。Master接收到該RPC後,會向所有Worker發送CommitFiles請求,Worker收到後把屬于該Stage buffer裡的資料flush到存儲層,close檔案,并釋放buffer。Master收到所有響應後,記錄每個partition對應的檔案清單。若CommitFiles請求失敗,則Master标記此Stage為DataLost。
  • 在Reduce階段,reduce task首先向Master請求該Partition對應的檔案清單,若傳回碼是DataLost,則觸發Stage重算或直接abort作業。若傳回正常,則直接讀取檔案資料。

總體來講,RSS的設計要點總結為3個層面:

  • 采用PushStyle的方式做shuffle,避免了本地存儲,進而适應了計算存儲分離架構。
  • 按照reduce做聚合,避免了小檔案随機讀寫和小資料量網絡請求。
  • 做了2副本,提高了系統穩定性。

3.2.3 容錯

對于RSS系統,容錯性是至關重要的,我們分為以下幾個次元來實作:

  • PushData失敗
    • 當PushData失敗次數(Worker挂了,網絡繁忙,CPU繁忙等)超過MaxRetry後,Client會給Master發消息請求新的Partition Location,此後本Client都會使用新的Location位址,該階段稱為Revive。
    • 若Revive是因為Client端而非Worker的問題導緻,則會産生同一個Partition資料分布在不同Worker上的情況,Master的Meta元件會正确處理這種情形。
    • 若發生WorkerLost,則會導緻大量PushData同時失敗,此時會有大量同一Partition的Revive請求打到Master。為了避免給同一個Partition配置設定過多的Location,Master保證僅有一個Revive請求真正得到處理,其餘的請求塞到pending queue裡,待Revive處理結束後傳回同一個Location。
  • Worker當機
    • 當發生WorkerLost時,對于該Worker上的副本資料,Master向其peer發送CommitFile的請求,然後清理peer上的buffer。若Commit Files失敗,則記錄該Stage為DataLost;若成功,則後續的PushData通過Revive機制重新申請Location。
  • 資料去重
    • Speculation task和task重算會導緻資料重複。解決辦法是每個PushData的資料片裡編碼了所屬的mapId,attemptId和batchId,并且Master為每個map task記錄成功commit的attemtpId。read端通過attemptId過濾不同的attempt資料,并通過batchId過濾同一個attempt的重複資料。
  • 多副本
    • RSS目前支援DFS和Local兩種存儲後端。
    • 在DFS模式下,ReadPartition失敗會直接導緻Stage重算或abort job。在Local模式,ReadPartition失敗會觸發從peer location讀,若主從都失敗則觸發Stage重算或abort job。

3.2.4 高可用

大家可以看到RSS的設計中Master是一個單點,雖然Master的負載很小,不會輕易地挂掉,但是這對于線上穩定性來說無疑是一個風險點。在項目的最初上線階段,我們希望可以通過SubCluster的方式進行workaround,即通過部署多套RSS來承載不同的業務,這樣即使RSS Master當機,也隻會影響有限的一部分業務。但是随着系統的深入使用,我們決定直面問題,引進高可用Master。主要的實作如下:

  • 首先,Master目前的中繼資料比較多,我們可以将一部分與ApplD+ShuffleId本身相關的中繼資料下沉到Driver的ShuffleManager中,由于中繼資料并不會很多,Driver增加的記憶體開銷非常有限。
  • 另外,關于全局負載均衡的中繼資料和排程相關的中繼資料,我們利用Raft實作了Master元件的高可用,這樣我們通過部署3或5台Master,真正的實作了大規模可擴充的需求。

4. 實際效果與分析

4.1 性能與穩定性

團隊針對TeraSort,TPC-DS以及大量的内部作業進行了測試,在Reduce階段減少了随機讀的開銷,任務的穩定性和性能都有了大幅度提升。

圖3是TeraSort的benchmark,以10T Terasort為例,Shuffle量壓縮後大約5.6T。可以看出該量級的作業在RSS場景下,由于Shuffle read變為順序讀,性能會有大幅提升。

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

圖3 TeraSort性能測試(RSS性能更好)

圖4是一個線上實際脫敏後的Shuffle heavy大作業,之前在混部叢集中很小機率可以跑完,每天任務SLA不能按時達成,分析原因主要是由于大量的FetchFailed導緻stage進行重算。使用RSS之後每天可以穩定的跑完,2.1T的shuffle也不會出現任何FetchFailed的場景。在更大的資料集性能和SLA表現都更為顯著。

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

圖4 實際業務的作業stage圖(使用RSS保障穩定性和性能)

4.2 業務效果

在大資料團隊和阿裡雲EMR團隊的共同努力下,經過近半年的上線、營運RSS,以及和業務部門的長時間測試,業務價值主要展現在以下方面:

  • 降本增效效果明顯,在叢集規模小幅下降的基礎上,支撐了更多的計算任務,TCO成本下降20%。
  • SLA顯著提升,大規模Spark Shuffle任務從跑不完到能跑完,我們能夠将不同SLA級别作業合并到同一叢集,減小叢集節點數量,達到統一管理,縮小成本的目的。原本業務方有一部分SLA比較高的作業在一個獨有的Yarn叢集B中運作,由于主Yarn叢集A的負載非常高,如果跑到叢集A中,會經常的挂掉。利用RSS之後可以放心的将作業跑到主叢集A中,進而釋放掉獨有Yarn叢集B。
  • 作業執行效率顯著提升,跑的慢 -> 跑的快。我們比較了幾個典型的Shuffle heavy作業,一個重要的業務線作業原本需要3小時,RSS版本需要1.6小時。抽取線上5~10個作業,大作業的性能提升相當明顯,不同作業平均下來有30%以上的性能提升,即使是shuffle量不大的作業,由于比較穩定不需要stage重算,長期運作平均時間也會減少10%-20%。
  • 架構靈活性顯著提升,更新為計算與存儲分離架構。Spark在容器中運作的過程中,将RSS作為基礎元件,可以使得Spark容器化能夠大規模的落地,為離線線上統一資源、統一排程打下了基礎。

5. 未來展望

趣頭條大資料平台和阿裡雲EMR團隊後續會繼續保持深入共創,将探索更多的方向。主要有以下的一些思路:

  • RSS存儲能力優化,包括将雲的對象存儲作為存儲後端。
  • RSS多引擎支援,例如MapReduce、Tez等,提升曆史任務執行效率。
  • 加速大資料容器化落地,配合RSS能力,解決K8s排程器性能、排程政策等一系列挑戰。
  • 持續優化成本,配合EMR的彈性伸縮功能,一方面Spark可以使用更多的阿裡雲ECS/ECI搶占式執行個體來進一步壓縮成本,另一方面将已有機器包括阿裡雲ACK,ECI等資源形成統一大池子,将大資料的計算元件和線上業務進行錯峰排程以及混部。

歡迎試用

自建Spark或使用EMR Spark叢集的客戶都可以測試,測試加入釘釘群(如下),并在群内@黯滅 @揚流

降本增效利器!趣頭條Spark Remote Shuffle Service最佳實踐

繼續閱讀