01、概述
日前,POLARDB for MySQL 8.0版本重磅推出并行查詢架構,當您打開并行查詢開關後并且查詢資料量到達一定門檻值,就會自動啟動并行查詢架構,進而使查詢耗時指數級下降。
在存儲層将資料分片到不同的線程上,多個線程并行計算,将結果流水線彙總到總線程,最後總線程做些簡單歸并傳回給使用者,提高查詢效率。并行查詢(Parallel Query)利用多核CPU的并行處理能力,以8核 32G 配置為例,示意圖如下所示:

在基于POLARDB 32核256G的環境下,我們進行TPCH 40G的性能測試,設定并行度為32時,TPCH中超過70%的SQL可以得到加速,超過40%的SQL加速比超過10.
圖2: 并行查詢加速比
02、背景
無論是MySQL資深DBA還是MySQL小白使用者,都面臨一個無法解決的難題,随着MySQL資料量的不斷增長,查詢的響應時間都是指數級的增長。
比如最簡單的一個表統計行數語句,當表資料達到6億後,即使在一個32核的空閑MySQL上,查詢依舊耗時100多秒。
圖3: 表統計耗時
為什麼最簡單查詢,當資料量大了過後,就會如此之慢。究其重要的原因之一,就是MySQL一直以來都是一個OLTP的資料庫,追求的是更高的并發,更短的時延。MySQL的指令執行流程如下圖所示:
圖4: MySQL 執行模式
當來了一個請求後,就會配置設定一個線程服務這個請求。在一個單獨的線程中,完成解析,優化,資料讀取,執行等等一些操作,中間無多線程間的切換和同步,尤其是等待。
這種設計,在資料量小的時候,系統的效率是最高的。當高并發時, 系統的平均響應時間是最短的。但這種設計,當系統有一定資源空閑時并且資料量大的時候,隻有一個線程處理大資料的請求,而其他線程卻無法參與其中,無法進行加速。
為了減少查詢耗時,無數DBA絞盡各種腦汁,通過各種分庫分表,增加索引,大幅增加查詢條件等等各種手段,通過這些手段可以減少資料的計算量,進而達到加速查詢, 但都回避了一個問題,如何提高資料庫的計算能力?
在今天的商業資料庫中, 最常用的一種方式,就是利用多線程,對一個大的複雜查詢,同時啟動多個線程進行查詢, 進而達到加速的效果。
03、設計與實作
架構設計
設計思路:在解析完SQL生成執行plan之前,增加一個hook, 進行判斷是否可以并行加速, 如果可以進行并行查詢, 則生成并行執行plan, 否則生成串行執行plan。
在并行執行plan中,啟動2種線程,一種是worker 線程,負責做各種計算,另外一種是leader 線程,負責将worker 線程的計算結果收集起來,然後做一些不能并行的計算,并最終傳回給使用者。如下圖所示:
圖5: 并行執行架構
從架構圖中可以看到,并行查詢主要包含四部分:
【1】Leader線程就是傳統POLARDB的連接配接線程,它負責生成并行查詢計劃,協調并行執行過程的其他元件。
【2】Message queue是leader線程和worker線程的通訊層,worker線程通過message queue向leader線程發送資料,而leader線程也會通過message queue向worker線程發送控制資訊。
【3】Worker線程負責真正的執行任務。Leader線程解析查詢語句生成并行計劃,然後同時啟動多個worker線程進行并行任務處理。worker線程在進行掃描,聚集,排序等操作後将中間結果集傳回給leader,leader負責收集來自worker的所有資料集,然後進行适當的二次處理(比如merge sort,二次group by 等操作),最後将最終結果傳回給用戶端。
【4】Parallel Scan層是存儲引擎要實作并行執行,需要将掃描資料劃分成多個分區。如何能夠讓所有的workers盡可能的均勻的工作是資料分區劃分的目标。
并行執行判斷:
優化器生成串行執行計劃後, 需要做的第一件事情,就是判斷是否需要進行并行執行, 如果可進行并行加速, 我們在串行計劃的基礎上生成并行計劃:
(1)是否打開了強制并行,或者關閉了并行查詢開關。
(2)表資料量是否大與20000行
(3)是否有足夠的資源, 線程池中還有足夠的計算資源, 并且目前記憶體還有一定的盈餘
(4)文法判斷, 目前SQL 文法是否可以支援并行, 比如subquery 目前還無法支援
并行讀取
在并行查詢計算之前, 必須先解決第一個先決條件, 并行掃描, 隻有讓資料并行讀取出來,才能夠并行計算。
是以POLARDB 首先制定一個原則, 将表資料進行分片, 分成很多分片, 一個worker線程一個時刻隻處理一個分片, worker線程和worker線程之間互相獨立, 不會處理相同的分片, 也不會漏掉分片, 另外分片和分片之間不會有重疊。
那如何進行合理并高效的分片呢,Innodb 表是基于B+ 樹索引, 可以根據B+ 的節點所指定的範圍,進行切分, 如下圖所示:
圖6: 智能資料分片
在上圖中, 如果切片數設定為6(切片數大于等于并行度), B+ 樹自頂向下逐層分析,在第一層中, 隻有一個節點, 節點數小于切片數, 是以,不能在這一層進行切片, 向下走一層。
當走到第二層時, 第二層,有4個節點, 節點數小于切片數6, 是以再向下走一層。
當走到第三層時, 有11個節點, 大于切片數, 是以,可以對這一層,進行切分, 每個切片含1個或2個節點。
POLARDB做了一個優化, 為了讓線程間工作更加均衡, 切片數會遠大于并行度,是以每個切片的資料量會更小, 切片和切片之間不均衡性得到削弱, 另外,當一個線程處理完一個切片後, 會從池子中再取出一個切片進行計算, 進而讓每個worker 都處于飽和狀态工作。
并行執行
今天并行計算有一點點類似大資料中的MapReduce, 将盡可能多的工作下放到worker線程中進行執行,将彙總的工作留給leader線程。
LEADER
- 在optimizer生成串行計劃之後,我們在串行計劃的基礎上生成并行計劃,包含兩部分:
- Leader上需要生成包含Gather(彙總)操作符的執行計劃。
- 為Workers生成屬于worker自身的執行計劃,該計劃是從串行的執行計劃拷貝而來。
下面對并行執行執行進行簡單的描述:
并行聚集
并行查詢執行聚集函數下推到worker上并行執行。并行聚集是通過兩次聚集來完成的, 比如, 我們最常見的“SELECT count(*) FROM production.product;”操作中:第一次,參與并行查詢部分的每個worker執行聚集步驟(count操作)。
第二次,Gather或Gather Merge節點将每個worker産生的結果彙總到leader(進行sum操作)。Leader會将所有worker的結果再次進行聚集,得到最終的結果。
圖7: 并行聚集
多表并行連接配接
并行查詢會将多表連接配接操作完整的下推到worker上去執行。POLARDB優化器隻會選擇一個自認為最優的表進行并行掃描,而除了該表外,其他表都是一般掃描。每個worker會将連接配接後的結果集傳回給leader線程,leader線程通過Gather操作進行彙總,最後将結果傳回給用戶端。
比如,我們在最常見的join操作中, “SELECT * FROM t1 JOIN t3 ON t1.id = t3. id;”, 我們會對滿足分片的表進行篩選, 選擇其中較小的表進行切分, 每個worker 負責一個分片,在worker 線程中, 這個分片負責和其他表進行join。
圖8: 并行連接配接
并行排序
POLARDB優化器會根據查詢情況,将Order By下推到每個worker裡執行,每個worker将排序後的結果傳回給leader,leader通過Gather Merge Sort操作進行歸并排序,最後将排序後的結果傳回到用戶端。
比如在最常見的 “SELECT col1, col2, col3 FROM t1 ORDER BY 1,2;”
圖9: 并行排序
并行分組
POLARDB優化器會根據查詢情況,将Group By下推到worker上去并行執行。每個worker負責部分資料的Group By。Worker會将Group By的中間結果傳回給leader,leader通過Gather操作彙總所有資料。
這裡POLARDB優化器會根據查詢計劃情況來自動識别是否需要再次在leader上進行Group By,例如,如果Group By使用了Loose index scan,leader上将不會進行再次Group By。否則Leader會再次進行Group By操作,然後把最終結果傳回到用戶端。
比如在我們常見的“SELECT col1, col2, SUM(col3) FROM t1 GROUP BY 1,2;”
圖十:并行分組
MESSAGE QUEUE
Leader和Workers之間需要通信,Message queue就是為了兩者的通信而設計的,它可以實作Leader和Workers之間的資訊交換。Message queue的結構如下圖所示:
圖10: 無鎖隊列
Leader通過調用GatherIterator::Read接口來與Message queue進行通信。Message queue分裝了多個無鎖隊列,每個worker會對應一個隊列;worker會将傳回結果存放到自己對應Message queue中,當Message queue中的結果未能被Leader及時消費打滿時,worker将等待。
WORKERS
Workers上需要實作調用執行器的流程,将需要的結果集傳回給Message queue。為了高效的執行查詢,Worker上的執行不需要進行再次優化,而是直接從Leader上來拷貝生成好的計劃分片。這需要實作POLARDB執行計劃樹上所有節點的拷貝。
拷貝完成後,需要workers進行執行前的所有準備,比如打開表,初始化所有資料結構,比如讓拷貝過來的執行計劃樹種的節點包含的列重新指到對應的新打開表的列等操作。
為了能讓Worker執行拷貝過來的執行計劃,也需要準備好官方POLARDB在優化階段生成的資料結構,比如對GROUP BY操作建立的臨時表等邏輯。
性能對比
以TPC-H為例,為您介紹并行查詢使用示例。案例中所有示例,使用的資料量是TPC-H中SF=100GB ,使用POLARDB節點規格為88 核 710G的配置在主節點進行測試。
- GROUP BY & ORDER BY支援
- AGGREGATE函數支援(SUM/AVG/COUNT)
- JOIN支援
- BETWEEN函數 & IN函數支援
- LIMIT支援
- INTERVAL函數支援
- CASE WHEN支援
- LIKE支援
未開啟并行查詢,耗時1563.32秒,開啟并行查詢後,耗時隻用49.65秒,提升31.48倍。
原始SQL語句,如下所示:
SELECT l_returnflag,
l_linestatus,
Sum(l_quantity) AS sum_qty,
Sum(l_extendedprice) AS sum_base_price,
Sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
Sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
Avg(l_quantity) AS avg_qty,
Avg(l_extendedprice) AS avg_price,
Avg(l_discount) AS avg_disc,
Count(*) AS count_order
FROM lineitem
WHERE l_shipdate <= date '1998-12-01' - INTERVAL '93' day
GROUP BY l_returnflag,
l_linestatus
ORDER BY l_returnflag,
l_linestatus ;
未開啟并行查詢,耗時1563.32秒。
開啟并行查詢後,耗時隻用49.65秒,提升31.48 倍。
SELECT l_returnflag,
l_linestatus,
Sum(l_quantity) AS sum_qty,
Sum(l_extendedprice) AS sum_base_price,
Sum(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
Sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
Avg(l_quantity) AS avg_qty,
Avg(l_extendedprice) AS avg_price,
Avg(l_discount) AS avg_disc,
Count(*) AS count_order
FROM lineitem
WHERE l_shipdate <= date '1998-12-01' - INTERVAL '93' day
GROUP BY l_returnflag,
l_linestatus
ORDER BY l_returnflag,
l_linestatus ;
未開啟并行查詢,耗時1563.32秒
開啟并行查詢後,耗時隻用49.65秒,提升31.48 倍
未開啟并行查詢,耗時21.73秒,開啟并行查詢後,耗時1.37秒,提升15.86倍。
select sum(l_extendedprice* (1 - l_discount)) as revenue
from lineitem, part
where ( p_partkey = l_partkey and p_brand = 'Brand#12'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 6 and l_quantity <= 6 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' )
or ( p_partkey = l_partkey and p_brand = 'Brand#13'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 10 and l_quantity <= 10 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' )
or ( p_partkey = l_partkey and p_brand = 'Brand#24'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 21 and l_quantity <= 21 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' );
未開啟并行查詢,耗時21.73秒
開啟并行查詢後,耗時1.37秒,提升15.86倍
select sum(l_extendedprice* (1 - l_discount)) as revenue
from lineitem, part
where ( p_partkey = l_partkey and p_brand = 'Brand#12'
and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
and l_quantity >= 6 and l_quantity <= 6 + 10
and p_size between 1 and 5
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' )
or ( p_partkey = l_partkey and p_brand = 'Brand#13'
and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
and l_quantity >= 10 and l_quantity <= 10 + 10
and p_size between 1 and 10
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' )
or ( p_partkey = l_partkey and p_brand = 'Brand#24'
and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
and l_quantity >= 21 and l_quantity <= 21 + 10
and p_size between 1 and 15
and l_shipmode in ('AIR', 'AIR REG')
and l_shipinstruct = 'DELIVER IN PERSON' );
未開啟并行查詢,耗時339.22 秒,開啟并行查詢後,耗時29.31秒,提升11.57倍。
select l_shipmode, sum(case when o_orderpriority = '1-URGENT' or o_orderpriority = '2-HIGH' then 1
else 0
end) as high_line_count, sum(case when o_orderpriority <> '1-URGENT' and o_orderpriority <> '2-HIGH' then 1
else 0
end) as low_line_count
from orders, lineitem
where o_orderkey = l_orderkey
and l_shipmode in ('MAIL', 'TRUCK')
and l_commitdate < l_receiptdate
and l_shipdate < l_commitdate
and l_receiptdate >= date '1996-01-01'
and l_receiptdate < date '1996-01-01' + interval '1' year
group by l_shipmode
order by l_shipmode limit 10;
未開啟并行查詢,耗時220.87秒。
開啟并行查詢後,耗時7.75秒,提升28.5倍
未開啟并行查詢,耗時220.87秒,開啟并行查詢後,耗時7.75秒,提升28.5倍。
select
100.00 * sum(case when p_type like 'PROMO%' then l_extendedprice * (1 - l_discount)
else 0
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from lineitem, part
where l_partkey = p_partkey
and l_shipdate >= date '1996-01-01'
and l_shipdate < date '1996-01-01' + interval '1' month limit 10;
開啟并行查詢後,耗時7.75秒,提升28.5倍。
select
100.00 * sum(case when p_type like 'PROMO%' then l_extendedprice * (1 - l_discount)
else 0
end) / sum(l_extendedprice * (1 - l_discount)) as promo_revenue
from lineitem, part
where l_partkey = p_partkey
and l_shipdate >= date '1996-01-01'
and l_shipdate < date '1996-01-01' + interval '1' month limit 10;
未開啟并行查詢,耗時427.46秒,開啟并行查詢後,耗時33.72秒,提升12.68倍。
select s_name, s_address from
supplier, nation where
s_suppkey in
( select ps_suppkey from partsupp where
ps_partkey in ( select p_partkey from part where p_name like 'dark%')
and ps_availqty>(select 0.0005 * sum(l_quantity) as col1
from lineitem, partsupp
where l_partkey = ps_partkey and l_suppkey = ps_suppkey
and l_shipdate >= date '1993-01-01' and l_shipdate < date '1993-01-01' + interval '1' year)
)
and s_nationkey = n_nationkey and n_name = 'JORDAN'
order by s_name limit 10;
未開啟并行查詢,耗時427.46秒
開啟并行查詢後,耗時33.72秒,提升12.68倍