在深入研究Presto查詢規劃器和基于成本的優化如何工作之前,讓我們先建立一個查詢,并針對這個查詢進行分析,以幫助了解查詢規劃的過程。
執行個體使用了TPC-H資料集,目的是彙總每個nation的所有order的totalprice值并列出排名前五的。
-- 執行個體一:
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
n.name AS nation_name,
sum(totalprice) orders_sum
FROM nation n, orders o, customer c
WHERE n.nationkey = c.nationkey
AND c.custkey = o.custkey
GROUP BY n.nationkey, regionkey, n.name
ORDER BY orders_sum DESC
LIMIT 5;
如上SQL所示:子查詢的目的是從region表中提取region_name
一、Parsing and Analysis
在計劃執行之前,需要對其進行轉化和分析,Presto首先會根據文法規則校驗SQL文本,之後就是對查詢進行分析:
1.1、确認查詢中的Tables
Presto中的表是根據catalogs+Schemas進行組織的,二者確定唯一,是以不同的表Schema下可以具有相同名稱的表,例如,TPC-H資料中就有多個表名為orders的表,但是他們在不同的Schema下面,如:sf10.orders 以及 sf100.orders
1.2、辨別查詢中使用的colums
如SQL中所示,orders.totalprice即明确的引用了order表中的totalprice 列,當SQL中涉及的表裡沒有相同字段時,通常直接寫Column名就可以,Presto Analyzer會自動确定Column來自哪個表。
1.3、确定ROW中Field的引用
單純的給出一個表達式,如:c.bonus;它的含義可能表示c表中的bonus列,但也可能是指一個複雜類型的列的名字為C,且C中一個字段為bonus,如何區分主要由Presto決定,但如果有沖突發生時,優先按第一種情況處理。 解析過程會遵循SQL語言的作用域和可見性規則, 也會收集一些資訊,如辨別符消歧,這些收集到的資訊稍後會在查詢計劃規劃的過程中使用, 這樣Planner 就不需要再次了解了解查詢語言的規則,避免重複工作。
Query Analyzer具有複雜的功能, 它的實作是非常有技術性的。對于使用者來說,隻要使用者輸入的查詢是正确的,那麼Query Analyzer對使用者就是透明的,隻有當查詢違反了SQL文法,超過使用者權限或由于其他原因導緻錯誤時,Query Analyzer才會主動提示使用者;
一旦分析完成,處理并解析了查詢中的所有辨別符,Presto進入下一個階段:Query Planning
二、Initial Query Planning
Query Planning可以看做是擷取查詢結果的流程,需要注意的是SQL是一種聲明式的語言,即使用者編寫一個SQL來指定他們希望從系統獲得的資料。 這與指令式程式有很大的不同,指令式程式通常需要指定如何處理資料;而使用SQL時,使用者不指定如何處理資料以獲得結果,這些步驟和順序留給Query Planner和Optimizer**來确定。
這一系列步驟通常稱為Query Plan。理論上,很多的不同的Query Planning可以産生相同的查詢結果,但彼此的性能可能會相差很大,這就是為什麼Presto planner和Optimizer總是試圖确定最優計劃的原因。通常我們将那些可以産生相同執行結果的計劃稱為:equivalent plans
讓我們考慮本文最開始提到的那個SQL,關于這個SQL最簡單的查詢計劃就是按照SQL查詢文法結構進行規劃,如下所示, 執行計劃就是一棵樹,它的執行從葉子節點開始,沿着樹結構向上進行。
- Limit[5]
- Sort[orders_sum DESC]
- LateralJoin[2]
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey]
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
- EnforceSingleRow[region_name := r.name]
- Filter[r.regionkey = n.regionkey]
- TableScan[region]
查詢計劃的每個元素的具體實作都很簡單,例如:
TableScan:通路表的底層存儲并傳回一個包含該表資料的結果集。
FilTer :會過濾掉資料中的一些行,隻保留滿足條件的行;
CrossJoin :對來自子節點的兩個資料集進行操作, 它将兩個資料集中的每個行進行兩兩組合,也可能将其中一個資料集存儲在記憶體中,這樣就不需要多次通路底層存儲。
最新的Presto版本更改了查詢計劃中操作的命名。例如,TableScan 修改為 ScanProject,而Filter修改為FilterProject,但相應的功能沒有改變。
現在讓我們考慮這個查詢計劃的計算複雜度。在不知道所有表實際資料細節的情況下,我們無法完全把握其複雜性。但是我們可以進行如下的假設:一個查詢計劃節點的複雜度的下限就是他所生成資料的大小,即查詢節點的複雜度與他生成的資料的行數正相關。是以我們使用Big Omega(Ω)來進行描述。如果 N,O,C以及R分别表示 nation,Orders,custoner以及region幾張表裡的行數,我們可以進行如下描述:
- ***TableScan[orders]***讀取order表,傳回了O行資料,是以他的複雜度是:Ω(O)。同理其他兩個TableScans分别傳回N行和C行資料;即Ω(N) 和Ω©
- 在 TableScan[nation]和TableSca[orders]之上的CrossJoin 對來自nation和orders表的資料進行合并,他的複雜度是:Ω(N × O)
- 在上一層的CrossJoin将讀取customer資料的TableScan[Customer]和上一個複雜度為Ω(N × O)的CrossJoin的資料進行合并,複雜度為:Ω(N × O × C).
- 位于底層的TableScan[region]複雜度為:Ω®。但是由于LateralJoin(也就是SELECT中的子查詢)他被調用N次,N就是Aggregate傳回的行數,是以他的複雜度是:Ω(R × N)
- Sort操作需要對N行進行排序是以他花費的時間不能少于 N × log(N)(注:以mergeSort算法複雜度為為準)
如果暫時不考慮其他成本,執行計劃的消耗至少是:Ω[N + O + C + (N × O)+ (N × O × C) + (R × N) + (N × log(N))]
在不知相對表大小的情況下可以将其簡化為 Ω[(N × O × C) + (R × N) + (N × log(N))]
按照一般經驗,region和nation通常很小,如果我們假設,region是最小的表,并且nation是第二小的表,那麼我們可以忽略結果的第二部分和第三部分得到最終結果:Ω(N × O × C)
代數公式講的差不多了,是時候看看這在實踐中意味着什麼了,讓我們舉個例子,一個廣受歡迎的購物網站有來自200個nations的1億使用者,他們總共下了10億份orders。那麼這兩個表的CrossJoin需要(20,000,000,000,000,000,000)行資料。 對于一個健壯的擁有100節點的中等叢集,每個節點每秒處理100萬行, 那麼計算該查詢對應的中間資料将花費63個世紀。
當然,Presto肯定不會去執行這樣一個不切實際的計劃。不過一個幼稚的計劃也有他的作用。這個初版的執行計劃可以作為SQL文法和查詢優化二者之前的橋梁。 查詢優化的作用是将初始計劃轉換為一個與之等效的計劃,且轉化後的計劃可以在Presto叢集資源有限的情況下盡可能快地完成執行任務,至少在合理的時間内完成執行任務。
三、Optimization Rules
接下來讨論一下查詢優化是如何達到這個目标的。
3.1、Predicate Pushdown
Predicate pushdown 即所謂的謂詞下推,他可能是最重要也是最容易了解的優化政策,它的做法是盡可能的将過濾條件靠近資料源,使得在執行查詢之前盡可能的過濾掉無用的資料。針對上面的例子如果應用該優化政策的話,結果如下所示:
之前的執行計劃(一部分:)
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey AND c.custkey = o.custkey] // original filter
- CrossJoin
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
優化後的執行計劃:
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // transformed simpler filter
- InnerJoin[o.custkey = c.custkey] // added inner join
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
即在不改變表關聯前後關系的基礎上,将之前的Filter轉化為更為簡單的Filter 同時将大的CrossJoin轉化為InnerJoin,且前後兩個執行計劃是等效的(即上文提到的equivalent plans),如果假設這樣的JOIN可以在分布式系統中實作,我們依然按照之前的約定:用生成資料的行數表示計算複雜度。那麼結果就是該優化政策将之前複雜度為Ω(N × O × C)的CrossJoin替換成了複雜度為Ω(N × O)的JOIN
如上所示,謂詞下推隻替換可一個CrossJoin,并沒有對nation表和orders表之間的CrossJoin進行替換,主要是因為nation和orders表之間沒有關聯條件,隻能使用CrossJoin,那該如何消除這個CrossJoin呢,這就要使用 Cross Join Elimination 優化政策了。
3.2、Cross Join Elimination
也許有人會疑問,既然nation和orders表之間沒有關聯條件,才導緻兩個表關聯隻能使用CrossJoin,那為什麼上面的執行計劃要先将沒有關聯條件nation和orders表進行關聯?
這主要是因為在沒有基于成本的優化器(即cost-based optimizer下文會講到)時,在ELECT的SQL中,Presto通常按照Table出現的前後順序安排表間的JOIN順序。是以才會出現上面的情況。
事實上,在大部分情況下,CrossJoin都不是必須的,都可以進行優化,因為基本都會對CrossJoin之後的資料進行過濾,隻擷取滿足條件的資料。但CrossJoin代價是很大的,有可能永遠也無法執行完;
Cross Join Elimination的目的就是對表之間的JOIN順序進行重新排列,以減少CrossJoin的數量,最理想的情況是沒有CrossJoin。在不清楚相關表大小的情況下,如果沒有cross join elimination,那就需要使用者在寫SQL時進行控制,注意表的順序。使用cross join elimination前後的結果如下所示:
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- Filter[c.nationkey = n.nationkey] // filter on nationkey first
- InnerJoin[o.custkey = c.custkey] // then inner join cutkey
- CrossJoin
- TableScan[nation]
- TableScan[orders]
- TableScan[customer]
...
使用cross join elimination之後:即先Join nation和customer,之後在JOIN orders
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey] // reordered to custkey first
- InnerJoin[n.nationkey = c.nationkey] // then nationkey
- TableScan[nation]
- TableScan[customer]
- TableScan[orders]
...
3.3、TopN
通常情況下,如果SQL中有LIMIT,它的前面也會有Order BY子句;因為如果沒有ORDER 子句,SQL不會保證傳回哪些行。正如文章開頭提到的查詢中我們在LIMIT之前也使用了ORDER BY;
當執行這樣的查詢時,Presto會對所有結果資料進行排序并傳回前幾行資料。這種方法的複雜度為 Ω(row_count × log(row_count)) 同時記憶體占用為 Ω(row_count) ;
然而如果僅僅是為了擷取排序之後的前幾的資料,卻需要保留所有已排序的資料,這是一種浪費。是以一種優化規則是,将後面帶有LIMIT的ORDER BY查詢轉化為TopN, 在查詢執行期間,TopN在堆資料結構中儲存所需的行,流式的讀取資料并更新堆資料。這使得計算複雜度降低到 Ω(row_count × log(limit)) 并且記憶體占用為 Ω(limit) ,總體的查詢成本為: Ω[O + (R × N) + N] 。
3.4、Partial Aggregations
Presto不需要将orders表中的所有行傳遞給join,因為我們對單個訂單不感興趣,我們的示例SQL中是要計算每一個nation的totalprice的彙總,是以可以進行預聚合,如下所示;
...
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- InnerJoin[c.custkey = o.custkey]
- InnerJoin[n.nationkey = c.nationkey]
- TableScan[nation]
- TableScan[customer]
- Aggregate[by custkey; totalprice := sum(totalprice)]
- TableScan[orders]
...
我們通過預聚合資料來減少流向下遊的資料量,預聚合的結果是不完整的,但資料量會顯著的的減少,進而提升性能;
為了提高并行性,這種預聚合的實作方式是不同的,該方式被稱為:Partial Aggregations 。 這裡,我們呈現的是簡化的計劃,但是在實際的EXPLAIN計劃中,這與最終的彙總會有所不同
需要注意的是,如上所示的預聚合并彙總是可以實作優化的,但如果預聚合不能減少資料量時,查詢性能将會受到影響。出于該原因, 目前該優化在預設情況下是禁用的,可以通過session中的 push_partial_aggregation_through_join 切換啟用。預設情況下,會将預聚合放在JOIN上以減少Presto中節點間的資料傳輸量, 為了更有效的利用Partial Aggregations的優勢,我們需要充分考慮實際情況。
四、Implementation Rules
到目前為止,我們介紹的規則都是優化規則,這些規則的目标是減少查詢處理時間、減少查詢的記憶體占用或減少通過網絡交換的資料量。但是上面的示例SQL還包含一個我們一直沒有提到的操作:lateral join ;
4.1、Lateral Join Decorrelation
lateral join 類似一個for-each循環,他周遊資料集中的所有行并針對每一行執行相應的查詢,但是Presto并非這樣處理的。相反,Presto會将其轉換為一個left join,用SQL表示如下:
原始的SQL
SELECT
(SELECT name FROM region r WHERE regionkey = n.regionkey) AS region_name,
n.name AS nation_name
FROM nation n
轉化後的SQL:
SELECT
r.name AS region_name,
n.name AS nation_name
FROM nation n
LEFT OUTER JOIN region r ON r.regionkey = n.regionkey
但是需要注意的是,二者并非完全等價的。因為在第一個SQL中,當region表的regionkey存在重複資料時,查詢會出錯(隻有當region表中regionkey字段唯一時才可以有效執行)。但是第二個查詢在此情況下可以正常執行且不會失敗,而是生成多行資料,正因如此lateral join會在轉換時添加兩個額外的條件:首先,他對所有的資料行進行編号,以便于區分;其次,在連接配接之後會檢測是否有重複行,如果存在重複行,那麼查詢将失敗,以保證轉換後的SQL與之前的語義完全一緻。如以下示例中所示:
- TopN[5; orders_sum DESC]
- MarkDistinct & Check
- LeftJoin[n.regionkey = r.regionkey]
- AssignUniqueId
- Aggregate[by nationkey...; orders_sum := sum(totalprice)]
- ...
- TableScan[region]
4.2、Semi-Join (IN) Decorrelation
正如文章開頭提到的SQL所示,可以在查詢中使用子查詢,這樣不僅可以提取所需的資訊(正如我們在 lateral join示例中看到的那樣),還可以使用IN謂詞過濾行。事實上,IN可以在Where子句中使用,也可以在SELECT子句中使用,當你在SELECT中使用IN時,并不是簡單的Boolean值的操作,這與EXISTS有很大的不同,相反,IN可以計算為true、false、或者 null
讓我們考慮這樣一個查詢,他的目的是查找來自同一國家的客戶(customer表)和産品供應商(supplier表)的訂單。SQL如下:
SELECT DISTINCT o.orderkey
FROM lineitem l
JOIN orders o ON o.orderkey = l.orderkey
JOIN customer c ON o.custkey = c.custkey
WHERE c.nationkey IN (
-- subquery invoked multiple times
SELECT s.nationkey
FROM part p
JOIN partsupp ps ON p.partkey = ps.partkey
JOIN supplier s ON ps.suppkey = s.suppkey
WHERE p.partkey = l.partkey
);
與lateral join一樣,這可以通過循環執行子查詢來實作,其中将多次調用子查詢來檢索所有suppliers的國家。
Presto沒有這樣做,相反在Presto的實作中,子查詢隻計算一次,并且将子查詢中的關聯條件去掉,而是通過關聯條件将子查詢與外部查詢進行JOIN。
這種處理實作的難點是不要産生多個結果(這就要使用deduplicating aggregation),并且正确地保留了IN文法的三值邏輯(即IN值可以是true、false、null)。
在這種情況下,deduplicating aggregation使用與JOIN相同的分區,是以可以以流式的執行,無需通過網絡進行資料交換,占用的記憶體最少。