
前言
在"阿裡體量"的大資料生态中,伏羲系統管理着彈内外多個實體叢集,超十萬台實體機, 以及數百萬的 CPU/GPU cores。每天運作在伏羲分布式平台上的作業數已經超過千萬, 是業界少有的,單天處理 EB 級别資料分布式平台。其中單個作業規模已經高達數十萬計算節點,管理着數百億的邊連接配接。在過去的十年中,阿裡集團以及阿裡雲上這樣的作業數目和規模,錘煉了伏羲分布式平台;與此同時,今天平台上作業的日益多樣化,以及向前再發展的需求,對于伏羲系統架構的進一步演化,也都帶來了巨大挑戰與機遇。
一 背景
1 伏羲 DAG/AM 元件
從較高的層面來看整個分布式系統的體系架構,實體叢集之上運作的分布式系統,大概可以分成資源管理,作業分布式排程執行,與多個計算節點的運作這三個層次,如同下圖所示。通常所說的 DAG 元件,指的是每個分布式作業的中心管理點,也就是 application master (AM)。AM 之是以經常被稱為 DAG (Directional Acyclic Graph, 有向無環圖) 元件,是因為 AM 最重要的責任,就是負責協調分布式作業的執行。而現代的分布式系統中的作業執行流程,通常可以通過 DAG 上面的排程以及資料流來描述[1]。相對于傳統的 Map-Reduce[2] 執行模式, DAG 的模型能對分布式作業做更精準的描述,也是當今各種主流大資料系統 (Hadoop 2.0+, SPARK, FLINK, TENSORFLOW 等) 的設計架構基礎,差別隻在于 DAG 的語義是透露給終端使用者,還是計算引擎開發者。
與此同時,從整個分布式系統 stack 來看, AM 肩負着除了運作 DAG 以外更多的責任。作為作業的中心管控節點,向下其負責與 Resource Manager 之間的互動,為分布式作業申請計算資源;向上其負責與計算引擎進行互動,并将收集的資訊回報到 DAG 的執行過程中。作為唯一有能力對每一個分布式作業的執行大局有最精準的了解的元件,在全局上對 DAG 的運作做準确的管控和調整,也是 AM 的重要職責。從上圖描述的分布式系統 stack 圖中,我們也可以很直覺的看出,AM 是系統中唯一需要和幾乎所有分布式元件互動的元件,在作業的運作中起了重要的承上啟下的作用。這一元件之前在伏羲系統中被稱為 JobMaster (JM), 在本文中我們統一用 DAG 或者 AM 來指代。
2 邏輯圖與實體圖
分布式作業的 DAG,有兩種層面上的表述:邏輯圖與實體圖。簡單地來說 (over-simplified),終端使用者平時了解的 DAG 拓撲,大多數情況下描述的是邏輯圖範疇:比如大家平時看到的 logview 圖,雖然裡面包含了一些實體資訊(每個邏輯節點的并發度),但整體上可以認為描述的就是作業執行流程的邏輯圖。
準确一點說:
- 邏輯圖描述了使用者想要實作的資料處理流程,從資料庫/SQL 的角度(其他類型引擎也都有類似之處,比如 TENSORFLOW) 來看,可以大體認為 DAG 的邏輯圖,是對優化器執行計劃的一個延續。
- 實體圖更多描述了執行計劃映射到實體分布式叢集的具體描述,展現的是執行計劃被物化到分布式系統上,具備的一些特性:比如并發度,資料傳輸方式等等。
而每個邏輯圖的"實體化",可以有很多等效方式。選擇合适的方式來将邏輯圖變成實體化執行,并進行靈活的調整,是 DAG 元件的重要職責之一。從上圖的邏輯圖到實體圖的映射可以看到,一個圖的實體化過程,實際上就是在回答一系列圖節點以及各個連接配接邊實體特性的問題,一旦這些問題得到确認,就能得到在分布式系統上實際執行實體圖。
3 為什麼需要 DAG 2.0 架構更新?
作為從阿裡雲飛天系統建立伊始就開始研發的伏羲分布式作業執行架構,DAG 1.0 在過去十年中支撐了阿裡集團的大資料業務,在系統規模以及可靠性等方面都走在了業界領先。另外一方面,作為一個開發了十年的系統,雖然在這個期間不斷的演進,DAG 1.0 在基本架構上秉承了比較明顯的 Map-Reduce 執行架構的一些特點,邏輯圖和實體圖之間沒有清晰的分層,這導緻在這個基本架構上要繼續向前走,支援更多 DAG 執行過程中的動态性,以及同時支援多種計算模式等方面,都比較困難。事實上今天在 MaxCompute SQL 線上,離線作業模式以及準實時作業模式 (smode) 兩種執行模式,使用了兩套完全分開的分布式執行架構,這也導緻對于優化性能和優化系統資源使用之間的取舍,很多情況下隻能走兩個極端,而無法比較好的 tradeoff。
除此之外,随着 MaxCompute 以及 PAI 引擎的更新換代以及新功能演進,上層的分布式計算自身能力在不斷的增強。對于 AM 元件在作業管理,DAG 執行等方面的動态性,靈活性等方面的需求也日益強烈。在這樣的一個大的背景下,為了支撐計算平台下個 10 年的發展,伏羲團隊啟動了 DAG 2.0 的項目,将從代碼和功能方面,完整替代 1.0 的 JobMaster 元件,實作完全的更新換代。在更好的支撐上層計算需求的同時,也同時對接伏羲團隊在 shuffle 服務 (shuffle service) 上的更新,以及 fuxi master (Resource Manager) 的功能更新。與此同時,站在提供企業化服務的角度來看,一個好的分布式執行架構,除了支援阿裡内部極緻的大規模大吞吐作業之外,我們需要支援計算平台的向外走,支援雲上各種規模和計算模式的需求。除了繼續錘煉超大規模的系統擴充能力以外,我們需要降低大資料系統使用的門檻,通過系統本身的智能動态化能力,來提供自适應(各種資料規模以及處理模式)的大資料企業界服務,是 DAG 2.0 在設計架構中考慮的另一重要次元。
二 DAG 2.0 架構以及整體設計
DAG 2.0 項目,在調研了業界各個分布式系統(包括SPARK/FLINK/Dryad/Tez/Tensorlow)DAG 元件之後,參考了 Dryad/Tez 的架構。新一代的架構上,通過邏輯圖和實體圖的清晰分層,可擴充的狀态機管理,插件式的系統管理,以及基于事件驅動的排程政策等基座設計,實作了對計算平台上多種計算模式的統一管理,并更好的提供了作業執行過程中在不同層面上的動态調整能力。
1 作業執行的動态性
傳統的分布式作業執行流程,作業的執行計劃是在送出之前确定的。以 SQL 執行為例,一個 SQL 語句,在經過編譯器和優化器後産生執行圖,并被轉換成分布式系統(伏羲)的執行計劃。
這個作業流程在大資料系統中是比較标準的操作。然而在具體實作中,如果在 DAG 的執行缺乏自适應動态調整能力的話,整個執行計劃都需要事先确定,會使得作業的運作沒有太多動态調整的空間。放在 DAG 的邏輯圖與實體圖的背景中來說,這要求架構在運作作業前,必須事先了解作業邏輯和處理資料各種特性,并能夠準确回答作業運作過程,各個節點和連接配接邊的實體特性問題,來實作邏輯圖往實體圖的轉換。
然而在現實情況中,許多實體特性相關的問題,在作業運作前是無法被感覺的。以資料特性為例,一個分布式作業在運作前,能夠獲得的隻有原始輸入的一些特性(資料量等), 對于一個較深的 DAG 執行而言,這也就意味着隻有根節點的實體計劃(并發度選擇等) 是相對合理的,而下遊的節點和邊的實體特性隻能通過一些特定的規則來猜測。雖然在輸入資料有豐富的 statistics 的前提下,優化器有可能可以将這些 statistics,與執行 plan 中的各個 operator 特性結合起來,進行一些适度的演算:進而推斷在整個執行流程中,每一步産生的中間資料可能符合什麼樣的特性。但這種推斷在實作上,尤其在面對阿裡大體量的實際生産環境中,面臨着巨大的挑戰,例如:
實際輸入資料的 statistics 的缺失
即便是 SQL 作業處理的結構化資料,也無法保證其源表資料特性擁有很好的統計。事實上今天因為資料落盤方式多樣化,以及精細化統計方式的缺失,大部分的源表資料都是沒有完整的 statistics 的。此外對于叢集内部和外部需要處理的非結構化資料,資料的特性的統計更加困難。
分布式作業中存在的大量使用者邏輯黑盒
作為一個通用的大資料處理系統,不可避免的需要支援使用者邏輯在系統中的運作。比如 SQL 中常用的 UDF/UDTF/UDJ/Extractor/Outputer 等等,這些使用 Java/Python 實作的使用者邏輯,計算引擎和分布式系統并無法了解,在整個作業流程中是類似黑盒的存在。以 MaxCompute 為例,線上有超過 20% 的 SQL 作業,尤其是重點基線作業,都包含使用者代碼。這些大量使用者代碼的存在,也造成了優化器在很多情況下無法對中間産出資料的特性進行預判。
優化器預判錯誤代價昂貴
在優化器選擇執行計劃時,會有一些優化方法,在資料符合一定特殊特性的時候,被合理選中能帶來性能優化。但是一旦選擇的前提假設錯誤(比如資料特性不符合預期),會适得其反,甚至帶來嚴重的性能回退或作業失敗。在這種前提下,依據靜态的資訊實作進行過多的預測經常得不到理想的結果。
這種種原因造成的作業運作過程中的非确定性,要求一個好的分布式作業執行系統,需要能夠根據中間運作結果的特點,來進行執行過程中的動态調整。因為隻有在中間資料已經在執行過程中産生後,其資料特性才能被最準确的獲得,動态性的缺失,可能帶來一系列的線上問題,比如:
- 實體資源的浪費:比如計算節點事先選擇的資源類型的不合理,或者大量的計算被消耗用于處理後繼會被丢棄的無效資料。
- 作業的嚴重長尾:比如中間資料分布傾斜或不合理編排,導緻一個 stage 上計算節點需要處理的資料量極端化。
- 作業的不穩定:比如由于優化器靜态計劃的錯判,導緻不合理的執行計劃無法完成。
而 DAG/AM 作為分布式作業唯一的中心節點和排程管控節點,是唯一有能力收集并聚合相關資料資訊,并基于這些資料特性來做作業執行的動态調整,的分布式元件。這包括簡單的實體執行圖調整(比如動态的并發度調整),也包括複雜一點的調整比如對 shuffle 方式和資料編排方式重組。除此以外,資料的不同特點也會帶來邏輯執行圖調整的需求:對于邏輯圖的動态調整,在分布式作業進行中是一個全新的方向,也是我們在 DAG 2.0 裡面探索的新式解決方案。
點,邊,圖的清晰實體邏輯分層,和基于事件的資料收集和排程管理,以及插件式的功能實作,友善了 DAG 2.0 在運作期間的資料收集,以及使用這些資料來系統性地回答,邏輯圖向實體圖轉化過程中需要确定的問題。進而在必要的時候實作實體圖和邏輯圖的雙重動态性,對執行計劃進行合理的調整。在下文中提到幾個落地場景中,我們會進一步舉例說明基于 2.0 的這種強動态性能力,實作更加自适應,更加高效的分布式作業的執行。
2 統一的 AM/DAG 執行架構
DAG 2.0 抽象分層的點,邊,圖架構上,也使其能通過對點和邊上不同實體特性的描述,對接不同的計算模式。業界各種分布式資料處理引擎,包括 SPARK, FLINK, HIVE, SCOPE, TENSORFLOW 等等,其分布式執行架構的本源都可以歸結于 Dryad[1] 提出的 DAG 模型。我們認為對于圖的抽象分層描述,将允許在同一個 DAG 系統中,對于離線/實時/流/漸進計算等多種模型都可以有一個好的描述。在 DAG 2.0 初步落地的過程中,首要目标是在同一套代碼和架構系統上,統一目前伏羲平台上運作的幾種計算模式,包括 MaxCompute 的離線作業,準實時作業,以及 PAI 平台上的 Tensorflow 作業和其他的非 SQL 類作業。對更多新穎計算模式的探索,也會有計劃的分步驟進行。
1)統一的離線作業與準實時作業執行架構
首先我們來看平台上作業數占到絕大多數的 SQL 線離線作業 (batch job) 與準實時作業 (smode)。前面提到過,由于種種曆史原因,之前 MaxCompompute SQL 線的這兩種模式的資源管理和作業執行,是搭建在兩套完全分開的代碼實作上的。這除了導緻兩套代碼和功能無法複用以外,兩種計算模式的非黑即白,使得彼此在資源使用率和執行性能之間無法 tradeoff。而在 2.0 的 DAG 模型上,我們實作了這兩種計算模式比較自然的融合和統一,如下圖所示:
在通過對邏輯節點和邏輯邊上映射不同的實體特性,離線作業和準實時作業都能得到準确的描述:
- 離線作業:每個節點按需去申請資源,一個邏輯節點代表一個排程機關;節點間連接配接邊上傳輸的資料,通過落盤的方式來保證可靠性;
- 準實時作業:整個作業的所有節點都統一在一個排程機關内進行 gang scheduling;節點間連接配接邊上通過網絡/記憶體直連傳輸資料,并利用資料 pipeline來追求最優的性能。
今天線上上,離線模式因為其 on-demand 的資源申請以及中間資料落盤等特點,作業在資源使用率,規模性和穩定性方面都有明顯的優勢。而準實時模式則通過常駐的計算資源池以及 gang scheduling 這種 greedy 資源申請,降低了作業運作過程中的 overhead,并使得資料的 pipelined 傳輸處理成為可能,達到加速作業運作的效果,但其資源使用的特點,也使其無法在廣泛範圍内來支援大規模作業。DAG 2.0 的更新,不僅在同一套架構上統一了這兩種計算模式,更重要的是這種統一的描述方式,使得探索離線作業高資源使用率,以及準實時作業的高性能之間的 tradeoff 成為可能:當排程機關可以自由調整,就可以實作一種全新的混合的計算模式,我們稱之為 Bubble 執行模式。
這種混合 Bubble 模式,使得 DAG 的使用者,也就是上層計算引擎的開發者(比如 MaxCompute 的優化器),能夠結合執行計劃的特點,以及引擎終端使用者對資源使用和性能的敏感度,來靈活選擇在執行計劃中切出 Bubble 子圖。在 Bubble 内部充分利用網絡直連和計算節點預熱等方式提升性能,沒有切入 Bubble 的節點則依然通過傳統離線作業模式運作。回過頭來看,現有的離線作業模式和準實時作業模式,分别可以被描述成 Bubble 執行模式的兩個極端特例,而在統一的新模型之上,計算引擎和執行架構可以在兩個極端之間,根據具體需要,選擇不同的平衡點,典型的幾個應用場景包括:
Greedy Bubble
在可用的資源(叢集規模,quota 等)受限,一個大規模作業無法實作 gang scheduling 時,如果使用者對資源使用率不敏感,唯一的目标是盡快跑完一個大規模作業。這種情況下,可以實作基于可用計算節點數目,實施 greedy 的 bubble 切割的政策, 盡量切出大的 bubble。
Efficient Bubble
在作業的運作過程中,節點間的運算可能存在天然的 barrier (比如 sort 運算, 建 hash 表等等)。如果把兩個通過 barrier 邊連接配接的節點切到一個 bubble 中,雖然作業 e2e 性能上還是會有排程 overhead 降低等帶來的提升,但是因為資料無法完全 pipeline 起來,資源的使用率達不到最高。那麼在對資源的使用率較為敏感時,可以避免 bubble 内部出現 barrier 邊。這同樣是計算引擎可以根據執行計劃做出決定的。
這裡隻列舉了兩個簡單的政策,其中還有更多可以細化以及針對性優化的地方。在不同的場景上,通過 DAG 層面提供的這種靈活按照 bubble 執行計算的能力,允許上層計算可以在不同場景上挑選合适的政策,更好的支援各種不同計算的需求。
2)支援新型計算模式的描述
1.0 的執行架構的底層設計受 Map-Reduce 模式的影響較深,節點之間的邊連接配接,同時混合了排程順序,運作順序,以及資料流動的多種語義。通過一條邊連接配接的兩個節點,下遊節點必須在上遊節點運作結束,退出,并産生資料後才能被排程。這種描述對于新型的一些計算模式并不适用。比如對于 Parameter Server 計算模式,Parameter Server(PS) 與 Worker 在運作過程中有如下特點:
- PS 作為 parameter 的 serving entity, 可以獨立運作。
- Worker 作為 parameter 的 consumer 和 updater, 需要 PS 在運作後才能有效的運作,并且在運作過程中需要和 PS 持續的進行資料互動。
這種運作模式下,PS 和 worker 之間天然存在着排程上的前後依賴關系。但是因為 PS 與 worker 必須同時運作,不存在 PS 先退出 worker 才排程的邏輯。是以在 1.0 架構上, PS 與 worker 隻能作為兩個孤立無聯系的 stage 來分開排程和運作。此外所有 PS 與 worker 之間,也隻能完全通過計算節點間直連通訊,以及在外部 entity (比如 zookeeper 或 nuwa) 協助來進行溝通與協調。這導緻 AM/DAG 作為中心管理節點作用的缺失,作業的管理基本被下放計算引擎上,由計算節點之間自行試圖協調來完成。這種無中心化的管理,對稍微複雜的情況下 (failover 等) 無法很好的處理。
在 DAG 2.0 的架構上,為了更準确的描述節點之間的排程和運作關系,引入并且實作了 concurrent edge 的概念:通過 concurrent edge 連接配接的上下遊節點,在排程上存在先後,但是可以同時運作。而排程的時機也可以靈活配置:可以上下遊同步排程,也可以在上遊運作到一定程度後,通過事件來觸發下遊的排程。在這種靈活的描述能力上, PS 作業可以通過如下這種 DAG 來描述,這不僅使得作業節點間的關系描述更加準确,而且使得 AM 能夠了解作業的拓撲,進行更加有效的作業管理,包括在不同計算節點發生 failover 時不同的處理政策等。
此外,DAG 2.0 新的描述模型,也允許 PAI 平台上的 Tensorflow/PS 作業實作更多的動态優化,并進行新的創新性工作。在上圖的 dynamic PS DAG 中,就引進了一個額外的 control 節點,這一節點可以在作業運作過程中(包括 PS workload 運作之前和之後),對作業的資源申請,并發度等進行動态的調整,確定作業的優化執行。
事實上 concurrent edge 這個概念,描述的是上下遊節點運作/排程時機的實體特性,也是我們在清晰的邏輯實體分層的架構上實作的一個重要擴充。不僅對于 PS 作業模式,在之前描述過的對于通過 bubble 來統一離線與準實時作業計算模式,這個概念也有重要的作用。
三 DAG 2.0 與上層計算引擎的內建
DAG 2.0 作為計算平台的分布式運作基座,它的更新換代,為上層的各種計算引擎提供了更多靈活高效的執行能力,而這些能力的落地,需要通過與具體計算場景的緊密結合來實作。接下來通過 2.0 與上層各個計算引擎(包括 MaxCompute 以及 PAI 平台等)的一些對接場景,具體舉例說明 2.0 新的排程執行架構,如何賦能平台上層的計算與應用。
1 運作過程中的 DAG 動态調整
作為計算平台上的作業大戶,MaxCompute 平台上多種多樣的計算場景,尤其是離線作業中的各種複雜邏輯,為動态圖能力的落地提供了豐富多樣的場景,這裡從動态實體圖和邏輯圖幾個方面讨論幾個例子。
1)動态并發度調整
基于作業運作期間中間資料大小進行動态并發度調整,是 DAG 動态調整中最基本的能力。以傳統 MR 作業為例,對于一個靜态 MR 作業而言,能根據讀取資料量來比較準确判斷 Mapper 的并發,但是對于 Reducer 的并發隻能簡單推測,比如下圖中對于處理 1TB 的 MR 作業而言,送出作業時,隻能根據 Mapper 1000 并發,來猜測給出 500 的 Reducer 并發度,而如果資料在 Mapper 經過大量過濾導緻最終隻産出 10MB 中間資料時,500 并發度 Redcuer 顯然是非常浪費的,動态的 DAG 必須能夠根據實際的 Mapper 産出來進行 Reducer 并發調整(500 -> 1)。
而實際實作中,最簡單的動态調整,會直接按照并發度調整比例來聚合上遊輸出的 partition 資料,如下圖這個并發度從 10 調整到 5 的例子所示,在調整的過程中,可能産生不必要的資料傾斜。
DAG 2.0 基于中間資料的動态并發調整實作,充分考慮了資料 partition 可能存在傾斜的情況,對動态調整的政策進行了優化,使得動态調整的政策後資料的分布更加均勻,可以有效避免由于動态調整可能引入的資料傾斜。
這種最常見下遊并發調整方式是 DAG 2.0 動态實體圖能力的一個直覺展示。在 2.0 中項目中,結合計算引擎的資料處理的特點,還探索了基于源資料的動态并發調整。例如對于最常見的兩個原表資料的 join (M1 join M2 at J), 如果用節點大小來表示其處理資料的的多少,那對于下圖這麼一個例子,M1 處理的是中等的一個資料表(假設 M1 需要并發度為 10),M2 處理的是較大的資料表(并發度為1000),naïve 的執行方式會将按照 10 + 1000 的并發度排程,同時因為 M2 輸出需要全量 shuffle 到 J, J 需要的并發度也會較大 (~1000)。
而實際上,對于這種計算 pattern 而言,M2 需要讀取(并進行處理)的,應該隻有能和 M1 的輸出 join 得上的資料,也就是說在考慮了整體執行 cost 後,在這種 M1 期望的輸出資料要比 M2 小的多的情況下,可以先行排程 M1 完成計算,将 M1 輸出資料的 statistics 在 AM/DAG 端進行聚合,然後隻挑選出 M2 的有效資料進行處理。這裡 "M2 的有效資料"的選擇本質上是一個 predicate push down 的過程,可以由計算引擎的優化器和運作時聯合進行判斷。也就是說,這種情況下 M2 的并發度調整,是和上層計算緊密結合的。
一個最直覺的例子是,如果 M2 是一個 1000 個分區的分區表,并且分區的 key 和 join 的 key 相同,那麼可以隻讀取 M2 能和 M1 輸出 join 上的有效資料的 partition 進行讀取處理。假如 M1 的輸出隻包含了 M2 原表資料的 3 個 partition keys, 那麼在 M2 就隻需要排程 3 個計算節點來處理這 3 個分區的資料。也就是說 M2 的并發度從預設的 1000,可以降低到 3,這在保證同樣的邏輯計算等效性與正确性的前提下,能大大降低計算資源的消耗,并數倍加速作業的運作。這裡的優化來自幾個方面:
- M2 的并發度 (1000 -> 3) 以及處理的資料量大大降低
- M2 需要 shuffle 到 J 的資料量以及 shuffle 需要的計算量大大降低
- J 需要處理的資料量以及其并發度能大大降低
從上圖這個例子中我們也可以看到,為了保證 M1 -> M2 的排程順序上,DAG 中在 M1 和 M2 間引入了一條依賴邊,而這條邊上是沒有資料流動的,是一條隻表示執行先後的依賴邊。這與傳統 MR/DAG 執行架構裡,邊的連接配接與資料流動緊綁定的假設也有不同,是在 DAG 2.0 中對于邊概念的一個拓展之一。
DAG 執行引擎作為底層分布式排程執行架構,其直接的對接"使用者"是上層計算引擎的開發團隊,其更新對于終端使用者除了性能上的提升,直接的體感可能會少一點。這裡我們舉一個終端使用者體感較強的具體例子,來展示 DAG 更加動态的執行能力,能夠給終端使用者帶來的直接好處。就是在 DAG 動态能力的基礎上,實作的 LIMIT 的優化。
對于 SQL 使用者來說,對資料進行一些基本的 at hoc 操作,了解資料表的特性,一個非常常見的操作是 LIMIT,比如:
SELECT * FROM tpch_lineitem WHERE l_orderkey > 0 LIMIT 5;
在分布式執行架構上,這個操作對應的執行計劃,是通過将源表做切分後,然後排程起所需數目的 mapper 去讀取全部資料,再将 mapper 的輸出彙總到 reducer 後去做最後的 LIMIT 截斷操作。假設源表 (這裡的 tpch_lineitem) 是一個很大的表,需要 1000 個 mapper 才能讀取,那麼在整個分布式執行過程中,涉及的排程代價就是要排程 1000 mapper + 1 reducer。這個過程中會有一些上層計算引擎可以優化的地方,比如每個 mapper 可以最多輸出 LIMIT 需要的 record 數目(這裡的 LIMIT 5)提前退出,而不必處理完所有配置設定給它的資料分片等等。但是在一個靜态的執行架構上,為了擷取這樣簡單的資訊,整體 1001 個計算節點的排程無法避免。這給這種 ad hoc query 執行,帶來了巨大的 overhead, 在叢集資源緊張的時候尤其明顯。
DAG 2.0 上, 針對這種 LIMIT 的場景,依托新執行架構的動态能力,實作了一些優化,這主要包括幾方面:
- 上遊 Exponential start:對于這種大機率下上遊 mapper 計算節點不需要全部運作的情況,DAG 架構将對 mapper 進行指數型的分批排程,也就是排程按照 1, 10 ... FULL 的分批執行
- 下遊的 Early scheduling:上遊産生的 record 數目作為執行過程中的統計資料上報給 AM, AM 在判斷上遊已經産生足夠的 record 條數後,則提前排程下遊 reducer 來消費上遊的資料。
- 上遊的 Early termination:下遊 reducer 在判斷最終輸出的 LIMIT 條數已經滿足條件後,直接退出。這時候 AM 可以觸發上遊 mapper 整個邏輯節點的提前退出(在這種情況下,大部分 mapper 可能都還沒有排程起來),整個作業也能提前完成。
這種計算引擎和 DAG 在執行過程中的靈活動态互動,能夠帶來大量的資源節省,以及加速作業的執行。線上下測試和實際上線效果上,基本上絕大多數作業在 mapper 執行完 1 個計算節點後就能提前退出,而無需全量調起 (1000 vs 1)。
下圖是線上下測試中,當 mapper 并發為 4000 時,上述 query 優化前後的差別:
可以看到,執行時間優化後增速了 5X+, 計算資源的消耗更是減小了數百倍。
這個線下測試結果作為比較典型的例子,稍微有些理想化。為了評估真實的效果,在 DAG 2.0 上線後,選取了 LIMIT 優化生效的線上作業,統計了一星期結果如下:這個優化平均為每個作業節省了 (254.5 cores x min CPU + 207.3 GB x min) 的計算資源,同時每個作業上,平均能節省 4349 個(無效)計算節點的排程。
LIMIT 執行上的改進,作為一個針對特殊場景上實作的優化,涉及了整個 DAG 執行不同政策的調整,這種細化的改進能力,能更直覺的展現 DAG 2.0 架構更新諸多好處:靈活的架構使得 DAG 的執行中擁有了更多的動态調整能力,也能和計算引擎在一起進行更多有針對性的優化。
不同情況下的動态并發度調整,以及具體排程執行政策的動态調整,隻是圖的實體特性動态調整的幾個例子。事實上對于實體特性運作時的調整,在 2.0 的架構之上有各種各樣的應用,比如通過動态資料編排/shuffle 來解決各種運作期間的skew問題等,這裡不再做進一步的展開。接下來我們再來看看 DAG 2.0 上對于邏輯圖的動态調整做的一些探索。
2)動态邏輯圖的調整
分布式 SQL 中,map join 是一個比較常見的優化,其實作原理是在 join 的兩個表中,如果有一個超小的表(可以 fit 到單個計算節點的記憶體中),那對于這個超小表可以不做 shuffle,而是直接将其全量資料 broadcast 到每個處理大表的分布式計算節點上。通過在記憶體中直接建立 hash 表,完成 join 操作。map join 優化能大量減少 (大表) shuffle 和排序,非常明顯的提升作業運作性能。但是其局限性也同樣顯著:如果"超小表"實際不小,無法 fit 進單機記憶體,那麼在試圖建立記憶體中的 hash 表時就會因為 OOM 而導緻整個分布式作業的失敗,而需要重跑。是以雖然 map join 在正确使用時,可以帶來較大的性能提升,但實際上優化器在産生 map join 的 plan 時需要偏保守,很多情況下需要使用者顯式的提供 map join hint 來産生這種優化。此外不管是使用者還是優化器的選擇,對于非源表的輸入都無法做很好的判斷,因為中間資料的大小往往需要在作業運作過程中才能準确得知。
而 map join 與預設 join 方式 (sorted merge join) 對應的其實是兩種不同優化器執行計劃,在 DAG 層面,其對應的是兩種不同的邏輯圖。要支援這種運作過程中根據中間資料特性的動态優化,就需要 DAG 架構具備動态邏輯圖的執行能力,這也是在 DAG 2.0 上開發的 conditional join 功能。
如同下圖展示,在對于 join 使用的算法無法被事先确定的時候,允許優化器提供一個 conditional DAG,這樣的 DAG 同時包括使用兩種不同 join 的方式對應的不同執行計劃支路。在實際執行時,AM 根據上遊産出資料量,動态選擇一條支路執行 (plan A or plan B)。這樣子的動态邏輯圖執行流程,能夠保證每次作業運作時都能根據實際作業資料特性,選擇最優的執行計劃。
conditional join 是動态邏輯圖的第一個落地場景,線上上選擇一批适用性作業,動态的 conditional join 相比靜态的執行計劃,整體獲得了将近 3X 的性能提升。
2 混合 Bubble 模式
Bubble 模式是我們在 DAG 2.0 架構上探索的一種全新的作業運作方式,通過對于 bubble 大小以及位置的調整,可以擷取性能和資源使用率的不同 tradeoff 點。這裡通過一些更加直覺的例子,來幫助大家了解 Bubble 執行在分布式作業中的實際應用。
在上圖的 TPCH Q21 上。比如在 Q21 上,我們看到了通過将作業被切分為三個 "bubble",資料能夠有效的在節點之間 pipeline 起來,并且通過熱點節點實作排程的加速。最終消耗的資源數 (cpu * time) 是準實時作業的 35%, 而性能則與一體化排程的準實時作業非常相近 (96%), 比離線作業性能提升 70% 左右。
在标準 TPCH 1TB 全量測試中,混合 bubble 模式展現出了相比離線和準實時的一體化模式 (gang scheduling) 更好的資源/性能 tradeoff。選用 Greedy Bubble(size = 500) 的政策,bubble 相比離線作業性能提升了 2X (資源消耗僅增加 17%,具體數值略)。同時與一體化排程的準實時作業比較,bubble 執行在隻消耗了 40% 不到的資源 (cpu * time) 的前提下,其性能達到了準實時作業的 85% (具體數值略)。可以看到,這種新型的 bubble 執行模式,允許我們在實際應用中擷取很好的性能與資源的平衡,達到系統資源有效的利用。Bubble 執行模式目前正在阿裡集團内部全量上線中,我們在實際線上的作業也看到了與 TPCH 測試非常相似的效果。
如同之前所述,混合 bubble 模式支援了不同切分政策,這裡提供的隻是一種切分政策上的效果。在與上層計算引擎 (e.g., MaxCompute 優化器) 緊密結合時,這種 DAG 分布式排程 bubble 執行的能力,能夠允許我們根據可用資源和作業計算特點,來尋找性能與資源使用率的最佳平衡點。
四 資源的動态配置和動态管理
傳統分布式作業對于每個計算節點需要的資源類型 (CPU/GPU/Memory) 和大小都是預先确定下來的。然而在作業運作之前,對計算節點資源類型和大小的合理選擇,是比較困難的。即便對于計算引擎的開發者,也需要通過一些比較複雜的規則,才能預估出大概合理的配置。而對于需要将這些配置透明給終端使用者的計算模式,終端使用者要做出選擇就更加困難。
在這裡以 PAI 的 Tensorflow (TF) 作業為例,描述 DAG 2.0 的資源動态配置能力,怎樣幫助平台的 TF 作業選擇合理的 GPU 類型資源以及提高 GPU 資源的使用率。相比 CPU 而言,GPU 作為一種較新的計算資源,硬體的更新換代較快,同時普通終端使用者對于其計算特點也相對不了解。是以終端使用者在指定 GPU 資源類型時,經常存在着不合理的情況。與此同時,GPU 線上上又是相對稀缺資源。今天線上上,GPU 申請量經常超過叢集 GPU 總數,導緻使用者需要花很長時間排隊等待資源。而另外一方面,叢集中 GPU 的實際使用率卻偏低,平均隻有 20% 左右。這種申請和實際使用之間存在的 Gap,往往是由于使用者作業配置中,事先指定的 GPU 資源配置不合理造成。
在 DAG 2.0 的架構上,PAI TF GPU 作業 (見 session 2.2.2 的 dynamic PS DAG) 引入了一個額外的"計算控制節點",可以通過運作 PAI 平台的資源預測算法,來判斷目前作業實際需要的 GPU 資源類型,并在必要的時候,通過向 AM 發送動态事件,來請求修改下遊 worker 實際申請的 GPU 類型。這其中資源預測算法,可以是根據算法的類型,資料的特點,以及曆史作業資訊來做 HBO (history based optimization),也可以通過 dry-run 的方法來進行試運作,以此确定合理的資源類型。
具體實作上,這個場景中 control stage 與 Worker 之間通過 concurrent edge 連接配接,這條邊上的排程觸發條件是在 control stage 已經做出資源選擇決定之後,通過其發出的事件來觸發。這樣的作業運作期間的動态資源配置,線上上功能測試中,帶來了 40% 以上的叢集 GPU 使用率提升。
作為實體特性一個重要的次元,對計算節點的資源特性在運作時的動态調整能力,在 PAI 以及 MaxCompute 上都能找到廣泛的應用。以 MaxCompute SQL 為例,對于下遊節點的 CPU/Memory 的大小,可以根據上遊資料的特點進行有效的預判;同時對于系統中發生的 OOM,可以嘗試自動調高 OOM 後重試的計算節點的記憶體申請,避免作業的失敗,等等。這些都是在 DAG 2.0 上新的架構上實作的一些新功能,在這裡不做具體的展開。
五 工程化與上線
作為分布式系統的底座,DAG 本身的動态能力以及靈活度,在與上層計算引擎結合時,能夠支援上層計算實作更加高效準确的執行計劃,在特定場景上實作數倍的性能提升以及對資源使用率的提高。在上文中,也集中介紹了整個 DAG 2.0 項目工作中,開發實作的一些新功能與新的計算模式。除了對接計算引擎來實作更高效的執行計劃,排程本身的靈活性,是 AM/DAG 執行性能的基本素質。DAG 2.0 的排程決策均基于事件驅動架構以及靈活的狀态機設計來實作,在這裡也交出 DAG 2.0 在基本工程素養和性能方面的成績單:
這裡選用了最簡單的 Map-Reduce (MR) 作業為例,對于這種作業,排程執行上并無太多可以取巧的地方,考驗的是排程系統本身的靈活度和整個處理流程中的全面去阻塞能力。這個例子也凸顯了 DAG 2.0 的排程性能優勢,尤其作業規模越大,優勢越發明顯。此外,對于更接近線上的 work-load 的場景,在 TPCDS 标準 benchmark 中,當執行計劃和運作邏輯完全相同時,2.0 (未打開動态執行等功能)的高性能排程也給作業帶來了顯著的提升。
最後,對于一個從頭到尾完整替代原有系統的新一代的全新架構,怎樣無縫對接線上場景,實作大規模的上線,是一個同樣重要(甚至更重要)的話題,也是對一個實際生産系統進行更新,與小範圍的新系統 POC 之間最大的差別。今天的伏羲排程系統,每天支撐着阿裡集團内外大資料計算平台千萬的分布式作業。DAG/AM 這一核心分布式排程執行元件的更新換代,要完整替換線上已經支撐了大資料業務 10 年的分布式生産系統,而不造成現有場景的失敗,這需要的不僅僅是架構和設計上的先進性。如何在"飛行中換引擎", 保質保量的實作系統更新,其挑戰完全不亞于新的系統架構本身的設計。要實作這樣的更新,擁有一個穩固的工程基座,以及測試/釋出架構,都是不可或缺的。沒有這樣子的底座,上層的動态功能與新計算模式,都無從談起。
目前 DAG 2.0 目前已全面覆寫了阿裡集團 MaxCompute 所有線上的 SQL 離線作業和所有準實時作業,以及 PAI 平台的所有 Tensorflow 作業(CPU 和 GPU)+ PyTorch 作業。每天支撐數千萬分布式作業的運作,并經受了 19 年雙11 /雙12 的考驗。在面對兩次大促創曆史記錄的資料洪峰(相比 18 年增長 50%+)壓力下,保障了集團重點基線在大促當天準時産出。與此同時,更多種類型的作業(例如跨叢集複制作業等等)正在遷移到 DAG 2.0 的新架構,并且依托新的架構更新計算作業本身的能力。DAG 2.0 的架構基座的上線,為各條計算線上依托其實作新功能打下了堅實基礎。
六 展望
伏羲 DAG 2.0 核心架構的更新,旨在夯實阿裡計算平台長期發展的基礎,并支援上層計算引擎與分布式排程方面結合,實作各種創新和建立新計算生态。架構的更新本身是向前邁出的重要一步,但也隻是第一步。要支撐企業級的,各種規模,各種模式的全頻譜計算平台,需要将新架構的能力和上層計算引擎,以及伏羲系統其他元件進行深度整合。依托阿裡的應用場景,DAG 2.0 除了在作業規模等方面繼續在業界保持領先之外,架構和功能上也有許多創新, 比如前面我們已經介紹過的:
- 在業界首次在分布式執行架構上,實作了執行過程中邏輯圖和實體圖的雙重動态可調;
- 通過 Bubble 機制實作了混合的計算模式,探索資源使用率和作業性能間的最佳平衡。
除此之外,2.0 更加清晰的系統封層架構帶來的一個重要改變就是能有利于新功能更快速開發,提速平台和引擎向前創新。由于篇幅有限,本文隻能由點及面地介紹一部分新功能與新計算模式,還有許許多多已經實作,或正在開發中的功能,在業界都是全新的探索,暫時不做進一步展開,比如:
- 準實時作業體系架構的整體更新: 資源管理與多作業管理的解耦,支援準實時作業場景上的動态圖功能。
- 常駐的單 container 多 slot 執行的 cache-aware 查詢加速服務 (MaxCompute 短查詢)
- 基于狀态機的作業節點管理以及失敗下的智能重跑機制。
- 動态可定義的 shuffle 方式:通過 recursive shuffle 等方式動态解決線上大規模作業中的 in-cast 問題。
- 基于 adaptive 的中間資料動态切分與聚合,解決實際分布式作業中各種資料傾斜問題
- 支援 PAI TF GPU 作業的多執行計劃選項。
- 通過 DAG 執行過程中與優化器的互動,實作漸進式的互動式動态優化。
- 支援 Imperative 語言特性,通過 DAG 的動态自增長等能力,對接 IF/ELSE/LOOP 等語義。
核心排程底座能力的提升,能夠為上層的各種分布式計算引擎提供真正企業級的服務能力,提供必須的彈藥。而這些計算排程能力提升帶來的紅利,最終會通過 MaxCompute 和 PAI 等引擎,透傳到終端的阿裡雲計算服務的各個企業。在過去的十年,阿裡業務由内向外的驅動,鍛造了業界規模最大的雲上分布式平台。而通過更好服務集團内部以及雲上的企業使用者,我們希望能夠提升平台的企業級服務能力,可以完成由内向外,到由外至内的整個正向循環過程,推動計算系統螺旋式上升的不斷創新,并通過性能/規模,以及智能化自适應能力兩個次元方面的推進,降低分布式計算服務的使用門檻,真正實作大資料的普惠。
免費下載下傳大資料實戰電子書《領軍行業大資料及 AI 實戰》
雲上不同行業企業大資料及 AI 典型場景最佳實踐全揭秘。深度剖析大資料在直播、多媒體、新零售、物聯網、金融科技、社交、家居服務、網際網路、泛娛樂 9 個行業實戰場景,通過企業真實案例,助你速懂企業大資料實踐。
識别下方二維碼立即下載下傳: