天天看點

SQL 查詢的分布式執行與排程

作者:張茄子,算法、 分布式技術和函數式程式設計愛好者

OLAP 是大資料分析應用非常重要的組成部分。這篇文章是介紹 OLAP 任務在并發/分布式環境下執行和排程的算法和模型的。我們将從最簡單的 Volcano 模型開始講起,逐漸引出分布式環境下執行 OLAP 查詢操作的一些挑戰和經典的解決方案。

這些算法和模型将主要在 SQL 和關系模型的語境之内讨論, Spark 和 Flink 這類基于 DAG 的處理系統内也有很多相似的概念,在本文中将不會贅述。

基礎模型

Volcano 模型

《SQL 查詢優化原理與 Volcano Optimizer 介紹》

中,我們已經對以關系代數為基礎的 SQL 查詢優化算法進行了介紹,本文的很多内容也将建立在前文内容的基礎之上。首先我們來介紹在單線程執行環境下廣為人知的經典模型——Volcano 模型。(值得注意的是,這裡的 Volcano 模型指的是查詢的執行模型,和前文的優化器模型并非同一事物。)

Volcano 模型又叫疊代器模型,其基本思路十分簡單:将關系代數當中的每一個算子抽象成一個疊代器。每個疊代器都帶有一個 next 方法。每次調用這個方法将會傳回這個算子的産生的一行資料(或者說一個 Tuple)。程式通過在 SQL 的計算樹的根節點不斷地調用 next方法來獲得整個查詢的全部結果。比如 SELECT col1 FROM table1 WHERE col2 > 0;這條 SQL 就可以被翻譯成由三個算子組成的計算樹。如下圖所示,我們也可以使用僞代碼将這三個算子的執行邏輯表示出來。

SQL 查詢的分布式執行與排程

可以看到,Volcano 模型是十分簡單的,而且他對每個算子的接口都進行了一緻性的封裝。也就是說,從父節點來看,子節點具體是什麼類型的算子并不重要,隻需要能源源不斷地從子節點的算子中 Fetch 到資料行就可以。這樣的特性也給優化器從外部調整執行樹而不改變計算結果創造了友善。為了友善分析上述計算方案的調用順序和時間花費,我們将一個 next 函數的調用分為三部分:調用部分(Call)、執行部分(Execution)和傳回部分(Return)。下圖描繪了之前的計算樹的執行過程:

SQL 查詢的分布式執行與排程

如上圖所示,綠色方塊代表對 next 方法的調用,黃色方塊代表對應算子的執行内容,紅色方塊代表算子的資料傳回。可以看到,上述計算樹的執行,是通過不斷地重複對 PROJECT 算子的調用開始的。當 PROJECT 算子被調用後,他緊接着要先調用 FILTER 算子的 next 方法,而 FILTER 算子又會進一步調用 SCAN 算子。是以,最先執行計算任務的是 SCAN 算子,在 SCAN 算子傳回一行結果之後,FILTER 算子開始進行處理,直到 PROJECT 算子完成他的任務。

以上隻是 Volcano 模型完成一行操作所進行的任務,一次查詢需要不斷地進行上述操作以處理表中的每一行。

《MonetDB/X100: Hyper-Pipelining Query Execution》

當中的分析表明,這些對 next 方法的調用(綠色部分)本身就占用了大量的執行時間,是以一個很容易想到的優化方案就是在調用 next 方法的時候一次處理一批資料。這種優化方案被稱為向量化(Vectorization)這種向量化的執行方案不但使得函數調用的成本得以被均攤,也對 CPU 的 Cache 更為友好。向量化還為進一步的優化(如 SIMD 指令執行)提供了有利條件。

上述 SQL 和執行方案隻是最簡單的一個案例,接下來讓我們讨論一個稍微複雜一點的例子:

SELECT t1.col1, t1.col2, t2.col2
FROM t1 JOIN t2 ON t1.col1 = t2.col1;           

在這個例子中,有一個 JOIN 算子,是以在生成的執行方案樹當中會有一個分叉。我們假定 t2表的行數遠小于 t1 表,這樣一個合理的執行方案可能是先将 t2 表的所有資料讀出來構造一個 Hash 表,再将 t1 表當中的每一行讀出并通過 Hash 表查詢獲得結果。此時的 Volcano 模型的執行要更複雜些。JOIN 算子為了向上封裝這些細節,需要在内部不斷調用 t2 表的 next 方法直到将其内容全部讀取出來。是以,對于這樣一條 SQL,他的執行樹需要表示成下圖的樣子。

SQL 查詢的分布式執行與排程

如上圖所示,我們現将 t2 的内容讀入 Hash 表,建構 t2.col1 -> t2.col2 的映射。之後再逐一讀入 t1 表的内容,對于 t1 表的每一行,我們使用它的 col1 列在 Hash 表中進行查詢,進而得到 JOIN 之後的結果。

對于這樣的幾個執行方案,他的執行順序圖可以表示成如下情況。可以看到,對 t2 表的掃描完全包含在 JOIN 算子的執行過程當中。

SQL 查詢的分布式執行與排程

可以看到,JOIN 算子的執行分為兩個部分。第一個部分它會在内部不斷調用 t2.next() 函數建構 Hash 表,之後調用 t1 的 SCAN 算子一次以便傳回第一行結果。之後對 JOIN 的調用就隻會讀取 t1 的内容進行并在 Hash 表裡探查(Probe)。這一方式實作的 Hash 表是一個帶有内部狀态的算子,與其他的算子大不相同。而它建構内部 Hash 表的過程可以被認為是一種物化(Materialization)。物化這一概念在之後讨論并發執行的時候也十分重要。

Push 模型(Bottom to Top 模型 )

除了上述 Volcano/疊代器模型,還有一種反向的調用模型也十分流行,那就是 Push 模型。Push 模型簡單來說就是由排程器先分析執行樹,然後從樹的葉子節點開始執行,在執行之後,由子節點通知父節點執行操作。Push 模型的時間順序圖如下:

SQL 查詢的分布式執行與排程

Push 模型的執行看起來更為直覺,但是由于控制流是反轉的,一般實作起來會比較繁瑣。相比起來 Volcano 模型更易于操作,比如在終止查詢的時候,Volcano 模型隻要停止從根節點繼續疊代即可。但是 Push 模型相對單純的 Volcano 模型也有很多優點:由于子算子産生的結果會直接 Push 給父算子進行操作,Push 模型的 Context switch 相對較少,對 CPU Cache 的友好性也更強。

可以看到,無論是 Volcano 還是 Push 模型,執行查詢的步驟都是排程問題。在下文,我們将會先以 Volcano 模型為例子,介紹其并發執行的解決方案。

并行模型

SQL 執行樹的并發執行一般有兩種類型:算子内部并行和算子間并行。這兩種并發模型往往被組合使用。所謂算子内部并行,是指我們将資料進行分區,是以一個算子可以同時工作在不同的分區上,進而加快查詢執行。算子間并行是指不同的算子(尤其是父子算子)同時在不同的 CPU 核心上運作,算子間通過通訊得以傳遞資料。算子間并行常見于流處理系統。

下圖是上述簡單 SQL 查詢在算子内并行和算子間并行的示意圖。

SQL 查詢的分布式執行與排程

在示意圖左邊表示的是算子内部并行,這意味着每個算子處理的都是資料的不同分區(Partition),在右邊表示的是算子間并行,這意味着每個算子都會處理所有的資料,隻是下遊的算子無需等待上遊的算子結束任務——他可以持續地處理自己已經接收到的資料。

對于算子間并行來說,其執行樹和在 Volcano 模型下的實作并沒有特别大的差別,隻是各個算子在調用 next 方法時可能會被阻塞。在 Push 模型下,它們也可以以類似 Actor Model 那樣,為每個算子提供一個信箱或者 Buffer 以儲存接收到的資料。

對于算子内部并行來說,如果隻是向上述圖示的簡單查詢,每個分片都可以被獨立的處理而不同的 CPU 不需要任何資料交換。但是對于我們讨論的第二條帶有 JOIN 語句的 SQL,一個算子内部并行的不同部分也需要進行資料交換。也就是說,有時我們可能需要進行 Shuffle 操作。

分布式/并行執行的 Shuffle 操作

在分布式/并行執行環境下需要執行 Shuffle 的原因有很多。其中一種情況是因為性能或者儲存能力的原因,我們無法使用一個全局的 Hash 表,而必須使用分布式 Hash 表。分布式 Hash 表在每個 CPU 核心(或計算機節點)上隻處理 Hash 表的某一分區。如果在 CPU 的另一個核心(或另一個計算機節點)上有這一個分區的資料,則必須将資料發送到這一分區進行處理。

SQL 查詢的分布式執行與排程

在上文所提到的帶有 JOIN 的 SQL 查詢中,JOIN 算子恰好需要建構一個 Hash 表。是以它也需要能夠 Shuffle 資料。然而,這裡就帶來一個難點:我們并不想為并行執行的 JOIN 算子編寫額外的代碼,有沒有辦法讓每個算子都以單線程模式執行,使得 Shuffle 操作對這些算子透明呢?

換句話說,我們仍然隻需要以單線程模式實作每個算子(而不需要大幅修改它們),每個算子可以被自然地并發排程起來,不需要管理資料的 Shuffle 。這樣執行器的實作就變的簡單起來——我們可以基于單線程的算子來實作并行。

Volcano 模型已經提供了這一問題的解決方案:EXCHANGE 算子

EXCHANGE 算子

EXCHANGE 算子是用來為其他算子實作 Shuffle 功能的算子,它相當于将 Shuffle 的功能抽取出來作為一個獨立的子產品。在進行并行計算時,執行器會在執行方案樹的合适位置插入 EXCHANGE 算子,然後将這一執行方案在資料的不同分區當中運作起來,每個分區的執行可以在一個不同的 CPU 核心(或計算節點)上。同一個 EXCHANGE 算子在不同核心(節點)上的執行個體會互相交換資料,保證上層算子可以透明地通過 next 調用得到正确的分區内的資料。

下圖是上述複雜 SQL 在插入 EXCHANGE 算子并進行并行執行後的執行方案示意圖。

SQL 查詢的分布式執行與排程

如圖所示,我們在 t2 表的 SCAN 算子上方添加了一個 EXCHANGE 算子。這樣,當每個 CPU 上運作的 JOIN 算子從這裡執行 next 操作時,隻會取到對應分區的資料行。如果 EXCHANGE 算子從下面的 SCAN 算子取到了其他分區的資料行,它會将其發送給對應 CPU 核心(或節點)上的 EXCHANGE 算子執行個體。是以,所有的其他算子都不需要對資料的分布和 Shuffle 操作有任何了解。

值得注意的是,我們在這裡隻為 t2 表添加了 EXCHANGE 算子而不是給所有算子都添加了 EXCHANGE,這是因為,我們可以使用 t1 表原本的分區來決定 JOIN 算子中建構的 Hash 表的分區(這裡隐含了 t1 恰巧使用 Join Key 進行了分區),是以在對 t1 表進行 Probe 的時候,是不需要通過 EXCHANGE 算子來交換資料的。這一優化可以直接通過靜态分析而實作。

Pipeline(處理管線)

前文提到 Push 模型的 Context Switch 較少,在 Volcano 模型下,當一連串算子互相之間都不需要交換資料,我們可以使用資料管線技術來實作相似的目的。資料管線技術将一連串算子使用Operator Fusion 技術合并成一個算子那樣進行調用,比如說,對于前文簡單 SQL 的僞代碼,使用處理管線合并之後的執行代碼将會更加簡潔。Pipeline 也常常配合代碼編譯技術使用,以極大地加速查詢的執行。

SQL 查詢的分布式執行與排程

值得注意的是,對于我們的第二個 SQL 查詢,雖然有 EXCHANGE 算子在其中,導緻右邊的 t2表的 SCAN 算子無法管線化,但是左邊 t1 表、JOIN 和 PROJECT 三個算子是可以組成一個 Pipeline 的。

SQL 查詢的分布式執行與排程

并行執行面臨的挑戰

前文提到了對 SQL 執行方案提到了對資料進行分割的兩種方法:對 SCAN 輸入的資料進行分區和在處理時對資料進行分段(向量化執行)。我們可以認為這兩種分割方法前者是橫向(Horizontal)切分,後者是縱向(Vertical)切分。如下圖所示:

SQL 查詢的分布式執行與排程

在啟用資料分區來實作并行處理後,我們的分布式/并行任務執行就會面臨如下的一些挑戰:

  • 資料傾斜:資料傾斜常見的情況有兩種:1)對輸入的資料進行分區時,不同分區資料量差別很大;2)經過一些計算(如 FILTER 操作)之後不同分區保留下來的資料量差別很大。由于并行處理任務結束的時間取決于最慢的任務,是以資料傾斜對執行性能的影響很大。一般來說,第一種資料傾斜的情況較為容易處理,我們可以通過再平衡和換用更好的分區方法來解決。第二種資料傾斜就比較難預測和處理了。
  • 處理速度傾斜:除了資料傾斜之外,還有一種非常影響性能的傾斜是處理速度傾斜。這種傾斜是指不同的處理器核心(或節點)處理同樣資料量所花費的時間不同。它常常出現在某些處理器核心(或節點)因環境幹擾、任務排程、阻塞、錯誤和失敗等原因減慢甚至中止響應的情況下。處理速度傾斜受環境因素影響大,很難發現和優化。
  • Data Locality:當一個算子完成它所進行的計算并将結果傳遞給下一個算子時,我們往往希望下一個算子被排程在同一個 CPU 核心(或節點)上。這是因為核心間的記憶體交換或節點間的網絡傳輸是一個非常耗時的操作。有時我們必須在 Data Locality 和資料/處理速度傾斜之間進行取舍,這對排程算法的設計帶來了嚴峻的挑戰。

為了解決上述三個難點并在排程算法設計時對其進行取舍,學術界和工業界做出了不懈的努力并産生了很多好的論文和實踐。接下來我們将會看到一些經典的方案和最新的優化方向。

經典模型

NUMA 架構

在介紹解決 OLAP 并行執行的經典模型之前,我們先介紹對我們所面臨問題的一個抽象模組化 NUMA。NUMA 是 Non-Uniform Memory Access 的縮寫。它指的是在較新生産的多核 CPU 中,不同 CPU 核心通路不同記憶體位置的速度不同的現象。

為什麼會有這種現象呢?在過去,CPU 隻有一個記憶體總線,所有的 CPU 核心通路記憶體時,都通過這個總線進行,是以每個 CPU 通路記憶體位置的速度都是相同的。這種通路模式被稱為 UMA (Uniform Memory Access)。但是使用一個總線阻止了 CPU 對記憶體的并發通路,為了增加記憶體通路性能,新的 CPU 開始加入多個可以同時獨立通路不同記憶體條的 Socket,并将相鄰的 CPU 核心連接配接在這些 Socket 上。也就是說,一塊 CPU 上的核心被組織成不同的分區,每個分區有一個 Socket 可以直接通路一塊記憶體條,為了能在這些 CPU 分區之間共享記憶體,它們之間也建立了記憶體通路通路。下圖左邊部分表示了這種架構。

SQL 查詢的分布式執行與排程

顯然,一個 CPU 核心如果要通路另一個 CPU 管理的記憶體,就需要通過它們之間的聯絡通路進行一次跳躍,這種跳躍本身将會花費一定的時間。在現代作業系統(如 Linux 等)中,進行記憶體配置設定和程序排程時已經考慮了這種情況,是以會按照一定的算法對任務和資料進行配置設定。然而,由于作業系統無法具體确知程序的具體邏輯,這種算法對于資料庫和 OLAP 應用往往并不理想。是以很多資料庫自己實作了相關排程功能。

在上圖的右側是一個基于網絡的處理叢集的示意圖,我們可以看到,通過網絡執行任務的叢集和帶有 NUMA 屬性的 CPU 有相似之處,如果在一個節點當中執行計算得到的計算結果需要被其他節點通路,也需要在節點之間進行一次傳輸操作。而每個節點自身也擁有一個類似記憶體的儲存空間(硬碟)。這兩種架構雖然有不同的規模,其結構卻是雷同的。是以很多時候我們也可以把應用于 NUMA CPU 的并行排程算法應用于分布式系統當中。

Morsel-Driven Parallelism

接下來我們來介紹在 NUMA-aware 查詢執行方面非常經典的論文

《Morsel-Driven Parallelism: A NUMA-Aware Query Evaluation Framework for the Many-Core Age》

提出的模型。這一篇論文以 HyPer 系統為基礎,主要有以下幾個特點:

  • 使用 Pipeline 技術組合算子
  • 使用自底向上的 Push 模型排程任務。當一個任務執行結束時,它會通知排程器将後序任務加入到任務隊列中
  • 既使用水準資料分區,也使用垂直資料分區,每個資料塊的機關被稱為 Morsel。一個Morsel 大約包含10000行資料。查詢任務的執行機關是處理一個 Morsel
  • NUMA-aware,為了實作 Data Locality,一個核心上執行的任務,由于其産出結果都儲存在目前核心的 Cache 或 Memory 裡,是以會優先将這個任務産生的後序任務排程在同一個核心上。這樣就避免了在核心間進行資料通信的開銷。
  • 使用 Work-stealing 實作彈性伸縮和任務負載均衡,以緩解資料傾斜和處理速度傾斜帶來的性能瓶頸。也就是說,當一個核心空閑時,它有能力從其他核心“偷取”一個任務來執行,這雖然有時會增加一個資料傳輸的開銷,但是卻緩解了忙碌核心上任務的堆積,總體來說将會加快任務的執行。
  • 使用 Delay Scheduling 防止過于頻繁的 Work stealing。在核心空閑并可以偷取任務時,排程器并非立即滿足空閑核心的要求,而是讓它稍稍等待一段時間。在這段時間裡,也許忙碌核心就可以完成自己的任務,而跨核心排程任務就可以被避免。令人驚訝的是,這種簡單的處理方式在實際應用中效果非常好。

在這一模型中,Pipeline 執行、以 Morsel 為機關進行資料切分和放置以及 Push 模型的任務排程前文都有涉及,也不難了解。在此特别介紹一下這一模型使用的 NUMA-aware、Work-stealing 和 Delay Scheduling 算法。

在上述論文中,NUMA-aware/Data Locality 的實作是基于一個全局的任務隊列。這一任務隊列使用

無鎖資料結構

實作,是以可以被各個核心上的不同線程高效地通路和修改。每個被添加的到隊列中的任務與各個核心之間有不同的親近(Affinity)值。一個任務如果在某一個核心上執行,那麼他産生的後序任務和這個核心就都具有較高的親近值,是以當此核心空閑時,新的任務就非常可能會被排程到它上。這樣就實作了對 Data Locality 的滿足。

進一步,由于可能出現資料傾斜和處理速度傾斜,嚴格靜态的滿足 Data Locality 的要求可能不是最佳的解決方案。是以論文提出了使用 Work-stealing 技術進行負載均衡的方法。

熟悉 Java 并發程式設計的朋友可能對

WorkStealingPool

很熟悉,在這種 Executor 當中,空閑的 Thread 可以從其他 Thread “偷竊”一個任務來執行,這樣各個線程的負載就會越來越均衡。在 HyPer 中,這是通過空閑核心上的線程從全局隊列當中偷取與其他核心親近值較高的任務來實作的。

為了防止偷取過于頻繁得發生,HyPer 還引入了 Delay Schedule 的概念,也就是稍稍等待任務原本應該執行的核心一小會,期待原核心上的任務在這段時間内就可以完成,進而避免偷竊任務帶來的資料傳輸開銷。這一方法并非由 HyPer 首創,實際上,早在 2010 年由 Spark 主要作者 Matei 發表的

《Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling》

就介紹了這種方法。它不光應用于 NUMA-aware 的資料庫系統中,還廣泛應用于各種大資料處理系統和任務排程系統(如 Hadoop、YARN 等)。HyPer 的任務排程政策可以由下圖表示。

SQL 查詢的分布式執行與排程

除了上述特性以外,HyPer 還會在每個核心排程兩個線程,以便最好地利用每個核心的 CPU 時間,填充其中一個線程出現 IO 産生的空隙。

除了 HyPer 所實作的 NUMA-aware 模型,SAP HANA 系統也實作了類似的 Morsel-Driven 系統。不同的是,HANA 不是使用所有線程共享的全局隊列,而是為每個線程配置本地的任務隊列。而任務的再平衡也是通過一個獨立運作的 Watch Dog 線程完成的。在這一模型中,Watch Dog 線程可以實作非常豐富的功能,靈活地監控和調配資源。但是任務再平衡的邏輯就比較複雜,沒有 Work-stealing 這麼簡單直覺了。

為了防止某些不适合被放到其他核心的任務被排程走 ,HANA 會為每個線程配置 Hard Queue 和 Soft Queue 兩個隊列。其中,Soft Queue 裡的任務可以被 Watch Dog 線程重新配置到其他核心。

SQL 查詢的分布式執行與排程

分布式和本地并行混合方案

在 2015 年的 VLDB 論文

《High-Speed Query Processing over High-Speed Networks》

裡介紹了 HyPer 系統在高速網絡尤其是 RDMA 系統上實作的進一步優化。在本文中,我們略去其中有關高速網絡應用的部分,僅關注其中有關分布式和本地并行混合模型的優化方案。

所謂分布式與本地并行的混合方案,指的是在分布式系統中,每個節點又都是具備多核計算能力的伺服器。是以,從微觀來說,每個節點本身是一個 NUMA 系統,從宏觀來說,這些節點組成的叢集也是一個 NUMA 系統。

SQL 查詢的分布式執行與排程

這篇文章提出的第一個優化是,對于某些 EXCHANGE 算子,如果我們知道它下面的輸出結果集比較小,我們可以不通過 Shuffle 的方式而是通過 Broadcast 的方式來将其傳遞到各個節點。這也是執行 OLAP 計算的一種常見優化。比方說,在上述 t1 JOIN t2 的 SQL 查詢中,如果 t2 的資料量很小,那麼将其全部物化并直接建構出一個 Hash 表,并傳播給各個節點是一個不錯的選擇,這是因為大部分聚合操作在本地就完成了,避免了多次資料的交換。

這篇論文提出的另一個相關的技術也很簡單。在傳統的模型下,假如我們有 $M$ 台機器,每台機器運作 $N$ 個查詢程序,每個程序裡有一個 EXCHANGE 算子。那麼這些 EXCHANGE 算子之間就有 $(M\times N)\cdot (M\times N - 1)$ 條互相交流的鍊路。也就是說所有這些 EXCHANGE 算子都可能會互相交換資訊,這将産生巨大的連接配接數。如果這些鍊路同時發送資料包,很有可能産生資料湧塞。是以,此論文提議在每台機器上啟動一個 Multiplexer 專門用來管理資料請求。也就是說,同一個節點上的 EXCHANGE 執行個體會先将資料發送到 Multiplexer,而本機内的資料 EXCHANGE 直接通過 Multiplexer 處理而無需使用網絡棧。對于外部的資料請求,Multiplexer 将會進行緩沖 和批量傳送。是以鍊路數減少到 $M\times (M - 1)$ ,獲得了一個量級上的減少。

動态調整資料放置和執行計劃

前面提到,SAP HANA 的 NUMA-aware 模型使用了 Watch Dog 線程。這一實作增加了排程的靈活性,Watch Dog 線程也可以掌握到很多系統的資訊并根據這些資訊動态調整資料放置和資源排程。2019 VLDB 的論文

《Adaptive NUMA-aware data placement and task scheduling for analytical workloads in main-memory column-stores》

就是基于這一優勢所做出的改進。論文中提出了算法以很低的資源消耗獲得較為準确的記憶體和 CPU 使用估量,并根據這個估量和對查詢當中資料放置(資料分區)的了解來實作較為複雜和精準的排程。

這篇論文的一個很重要的貢獻就是發現 Memory 密集的操作(如 SCAN 操作)不适合被 Steal 到其他核心運作。論文中提出的算法可以智能地發現這些 Memory 密集的操作,進而逐漸将其鎖定 在最合适的核心上,以便進一步壓榨 NUMA 系統中的計算資源。

動态調整資料放置和執行計劃可以說是資料庫系統排程的最前沿研究和發展了。其難度和複雜度都相當高。在本文中隻能對其小部分思想進行簡介,有興趣的朋友可以閱讀原論文以獲得更準确的資訊。

總結

本文以 SQL 查詢為基礎,在關系模型的執行方案下讨論了分布式/并行 OLAP 任務執行的基本模型和經典方案,并且涵蓋了一些最新研究(如動态調整技術)的介紹。我們可以看到,對于并行 執行來說,資料的橫向和縱向分割都是必不可少的。對資料進行橫向分區使得我們可以在不同的分區上并行執行任務,将資料在縱向上切分,可以減少方法調用次數、減少 Context Switch 以及為彈性擴充和解決資料傾斜問題提供可能。

EXCHANGE 算子是在 Volcano 模型下實作資料交換(Shuffle)的重要解決方案,它使得其他算子完全可以以單線程模式運作,并将資料交換變為一個透明的操作。EXCHANGE 算子的引入使得傳統關系模型的執行方案可以被優雅地轉換為并行執行方案。

NUMA 模型是分布式/并行執行 OLAP 查詢的一個基礎抽象,它既可以應用于單機多核環境,也可以應用于多機叢集環境。我們介紹了兩種 NUMA-aware 的經典模型,它們一般使用 Work-stealing/relocation 方法來處理資料失衡和處理速度失衡,同時使用 Delay Scheduling 來防止過于頻繁的任務偷竊或交換。

最後我們提到了一些比較新和複雜的實作,如動态調整資料放置和執行計劃等。這些嶄新的研究可以進一步壓榨計算資源,獲得更高的執行性能。

在本文中,沒有詳細介紹一種很簡單的優化方式,那就是慢任務異地重試機制。有時因為節點失敗或者網絡阻塞等原因,一個查詢的分布式任務中會有一小部分執行非常慢,而整體的查詢速度則受限于這些查詢。這類任務被稱為尾部(Tail)任務。分布式系統業界的知名大佬 Jeff Dean 在其

《The tail at scale》

一文中詳細介紹了這種情況和解決方案。

從上述 Work-stealing、Delay Scheduling 和慢任務異地重試可以看出,很多分布式系統當中的棘手問題都可以使用十分簡單的解決辦法獲得不錯的效果。讓人不禁感慨系統設計有時真是大道至簡。

阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

SQL 查詢的分布式執行與排程

對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。

SQL 查詢的分布式執行與排程