本文主要介紹了資料庫系統中常用的算子 Join 和 Aggregation 在 TiFlash 中的執行情況,包括查詢計劃生成、編譯階段與執行階段,以期望讀者對 TiFlash 的算子有初步的了解。
視訊
https://www.bilibili.com/video/BV1tt4y1875T
算子概要
在閱讀本文之前,推薦閱讀本系列的前作:計算層 overview,以對 TiFlash 計算層、MPP 架構有一定了解。
在資料庫系統中,算子是執行 SQL 主要邏輯的地方。一條 SQL 會被 parser 解析為一棵算子樹(查詢計劃),然後經過 optimizer 的優化,再交給對應的 executor 執行,如下圖所示。
本文的主要内容包括
- TiDB 如何生成與優化 MPP 算子與查詢計劃
- Join 算子在 TiFlash 中的編譯(編譯指的是将 TiDB-server 下發的執行計劃片段生成可執行結構的過程,下同)與執行
- Aggregation 算子在 TiFlash 中的編譯與執行
建構查詢計劃
一些背景知識:
- 邏輯計劃與實體計劃:可以簡單了解為邏輯計劃是指算子要做什麼,實體計劃是指算子怎樣去做這件事。比如,“将資料從表 a 和表 b 中讀取出來,然後做 join”描述的是邏輯計劃;而“在 TiFlash 中做 shuffle hash join” 描述的是實體計劃。更多資訊可以參閱:TiDB 源碼閱讀系列文章
- MPP:大規模并行計算,一般用來描述節點間可以交換資料的并行計算,在目前版本(6.1.0,下同)的 TiDB 中,MPP 運算都發生在 TiFlash 節點上。推薦觀看:源碼解讀 - TiFlash 計算層 overview。MPP 是實體計劃級别的概念。
MPP 計劃
在 TiDB 中,可以在 SQL 前加上 explain 來檢視這條 SQL 的查詢計劃,如下圖所示,是一棵由實體算子組成的樹,可以檢視 TiDB 執行計劃概覽 來對其有更多的了解。
MPP 查詢計劃的獨特之處在于查詢計劃中多出了用于進行資料交換的 ExchangeSender 和 ExchangeReceiver 算子。
執行計劃中會有這樣的 pattern,代表将會在此處進行資料傳輸與交換。
...
|_ExchangeReceiver_xx
|_ ExchangeSender_xx
…
複制
每個 ExchangeSender 都會有一個 ExchangeType,來辨別本次資料傳輸的類别,包括:
- HashPartition,将資料按 Hash 值進行分區之後分發到上遊節點。
- Broadcast,将自身資料拷貝若幹份,廣播到所有上遊節點中。
- PassThrough,将自己的資料全部傳給一個指定節點,此時接收方可以是 TiFlash 節點(ExchangeReceiver);也可以是 TiDB-server 節點(TableReader),代表 MPP 運算完畢,向 TiDB-server 傳回資料。
在上面的查詢計劃圖中,一共有三個 ExchangeSender,id 分别是 19, 13 和 17。其中 ExchangeSender_13 和 ExchangeSender_17 都是将讀入後的資料按哈希值 shuffle 到所有節點中,以便進行 join,而 ExchangeSender_19 則是将 join 完成後的資料傳回到 TiDB-server 節點中。
添加 Exchange
在優化器的計劃探索過程中,會有兩處為查詢計劃樹插入 Exchange 算子:
- 一個是 MPP 計劃在探索完畢後,接入 TiDB 的 tableReader 時。類型為 passThrough type. 源碼在函數
中func (t *mppTask) convertToRootTaskImpl
- 一個是 MPP 計劃在探索過程中,發現目前算子的 property(這裡主要指分區屬性)不滿足上層要求時。例如上層要求需要按 a 列的 hash 值分區,但是下層算子不能滿足這個要求,就會插入一組 Exchange.
func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
if !t.needEnforceExchanger(prop) {
return t
}
return t.copy().(*mppTask).enforceExchangerImpl(prop)
}
// t.partTp 表示目前算子已有的 partition type,prop 表示父算子要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
switch prop.MPPPartitionTp {
case property.AnyType:
return false
case property.BroadcastType:
return true
case property.SinglePartitionType:
return t.partTp != property.SinglePartitionType
default:
if t.partTp != property.HashType {
return true
}
if len(prop.MPPPartitionCols) != len(t.hashCols) {
return true
}
for i, col := range prop.MPPPartitionCols {
if !col.Equal(t.hashCols[i]) {
return true
}
}
return false
}
}
複制
Property 對于分區屬性的要求(MPPPartitionTp)有以下幾種:
- AnyType,對下層算子沒有要求,是以并不需要添加 exchange;
- BroadcastType,用于 broadcast join,要求下層節點複制資料并廣播到所有節點中,此時一定需要添加一個 broadcast exchange;
- SinglePartitionType,要求下層節點将資料彙總到同一台節點中,此時如果已經在同一台節點上,則不用再進行 exchange。
- HashType,要求下層節點按特定列的哈希值進行分區,如果已經按要求分好區了,則不用再進行 exchange.
在優化器的生成查詢計劃的探索中,每個算子都會對下層有 property 要求,同時也需要滿足上層傳下來的 property;當上下兩層的 property 無法比對時,就插入一個 exchange 算子交換資料。依靠這些 property,可以不重不漏的插入 exchange 算子。
MPP 算法
是否選擇 MPP 算法是在 TiDB 優化器生成實體計劃時決定,即 CBO(Cost-Based Optimization) 階段。優化器會周遊所有可選擇的計劃路徑,包括含有 MPP 算法的計劃與不含有 MPP 算法的計劃,估計它們的代價,并選擇其中總代價最小的一個查詢計劃。
對于目前的 TiDB repo 代碼,有四個位置可以觸發 MPP 計劃的生成,分别對應于 join、agg、window function、projection 四個算子:
- func (p *LogicalJoin) tryToGetMppHashJoin
- func (la *LogicalAggregation) tryToGetMppHashAggs
- func (lw *LogicalWindow) tryToGetMppWindows
- func (p *LogicalProjection) exhaustPhysicalPlans
這裡隻描述具有代表性的 join 和 agg 算子,其他算子同理。
Join
目前 TiDB 支援兩種 MPP Join 算法,分别是:
- Shuffle Hash Join,将兩張表的資料各自按 hash key 分區後 shuffle 到各個節點上,然後做 hash join,如上一節中舉出的查詢計劃圖所示。
- Broadcast Join,将小表廣播到大表所在的每個節點,然後做 hash join,如下圖所示。
tryToGetMppHashJoin 函數在建構 join 算子時給出了對子算子的 property 要求:
if useBCJ { // broadcastJoin
…
childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
childrenProps[1-buildside] = {MPPPartitionTp: AnyType}
…
} else { // shuffle hash join
…
childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}
…
}
複制
如代碼所示,broadcast join 要求 buildside(這裡指要廣播的小表)具有一個 BroadcastType 的 property,對大表側則沒有要求。而 shuffle hash join 則要求兩側都具有 HashType 的分區屬性,分區列分别是 left keys 和 right keys。
Aggregation
目前 tryToGetMppHashAggs 可能生成三種 MPP Aggregation 計劃:
1.“一階段 agg”,要求資料先按 group by key 分區,然後再進行聚合。
2.“兩階段 agg”,首先在本地節點進行第一階段聚合,然後按 group by key 分區,再進行一次聚合(用 sum 彙總結果)。
3.“scalar agg”,沒有分區列的特定情況,在本地節點進行第一階段聚合,然後彙總到同一台節點上完成第二階段聚合。
一階段 agg 和兩階段 agg 的差別是是否先在本地節點做一次預聚合,優化器會根據 SQL 與代價估算來選擇執行哪種方式。對于重複值很多的情況,兩階段 agg 可以在網絡傳輸前減少很多資料量,進而減少大量的網絡消耗;而如果重複值很少的情況下,這次預聚合并不會減少很多資料量,反而白白增大了 cpu 與記憶體消耗,此時就不如使用一階段 agg。
這裡留一個小思考題,這三種 agg 各自對下方有什麼 property 要求?在聚合做完之後又滿足了怎樣的 property?
答案是:
一階段 agg 要求 hash,做完滿足 hash;二階段 agg 無要求,做完滿足 hash;scalar agg 無要求,做完滿足 singlePartition.
編譯與執行
執行計劃建構好之後,TiDB-server 會将 dag(執行計劃的片段)下發給對應的 TiFlash 節點。在 TiFlash 節點中,需要首先解析這些執行計劃,這個過程我們稱作“編譯”,編譯的結果是 BlockInputStream,它是 TiFlash 中的可執行結構;而最後一步就是在 TiFlash 中執行這些 BlockInputStream.
下圖是一個 BlockInputStream DAG 的例子,每個 BlockInputStream 都有三個方法:readPrefix, read 和 readSuffix;類似于其他火山模型調用 open、next 和 close。
下圖的來源是 TiFlash 執行器線程模型 - 知乎專欄 (zhihu.com),關于執行模型更多的内容,可以參考這篇文章或者 TiFlash Overview,這裡不再贅述。
Join 的編譯與執行
TiDB-server 節點會将查詢計劃按 Exchange 會作為分界,将查詢切分為不同的計劃片段(task),作為 dag 發給 TiFlash 節點。比如對于下圖中所示的查詢計劃,會切分為這三個紅框。
TiFlash 節點在編譯完成後生成的 BlockInputStream 如下,可以在 debug 日志中看到:
task 1
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread
task 2
ExchangeSender
Expression: <final projection>
Expression: <projection after push down filter>
Filter: <push down filter>
DeltaMergeSegmentThread
task 3
CreatingSets
Union: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
複制
其中 task1 和 task2 是将資料從存儲層讀出,經過簡單的處理之後,發給 ExchangeSender. 在 task3 中,有三個 BlockInpuStream 值得關注,分别是:CreatingSets, HashJoinBuild, HashJoinProbe.
CreatingSetsBlockInputStream
接受一個資料 BlockInputStream 表示 joinProbe,還有若幹個代表 JoinBuild 的 Subquery。CreatingSets 會并發啟動這些 Subquery, 等待他們執行結束後在開始啟動資料 InputStream. 下面兩張圖分别是 CreatingSets 的 readPrefix 和 read 函數的調用棧。
為什麼 CreatingSets 可能同時建立多張哈希表?因為在一個多表 join 中,同一個計劃片段可能緊接着做多次 join porbe,如下圖所示:
task:4
CreatingSets
Union x 2: <for join>
HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
Expression: <append join key and join filters for build side>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
Expression: <final projection>
Expression: <remove useless column after join>
HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
Expression: <final projection>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
複制
Join Build
注意,join 在此處僅代表 hash join,已經與網絡通信和 MPP 級别的算法無關。
關于 join 的代碼都在 dbms/src/Interpreters/Join.cpp 中;我們以下面兩張表進行 join 為例來說明:
left_table l join right_table r
on l.join_key=r.join_key
where l.b>=r.c
複制
預設右表做 build 端,左表做 probe 端。哈希表的值使用鍊式存儲:
Join Probe
這裡主要描述的是 JoinBlockImpl 這個函數的流程:
1.block 包含了左表的内容;建立 added_columns, 即要添加到 block 中的右表的列;然後建立相應的過濾器 replicate_offsets:表示目前共比對了幾行,之後可以用于篩選未比對上的行,或複制比對了多行的行。
2.依次查找哈希表,根據查找結果調用相應的 addFound 或 addNotFound 函數,填充 added_columns 和過濾器。
從填充的過程中也可以看到,replicate_offsets 左表表示到目前行為止,一共能比對上的右表的行數。并且 replicate_offsetsi - replicate_offsetsi-1 就表示左表第 i 行比對到的右表的行數。
3.将 added_column 直接拼接到 block 上,此時會有短暫的 block 行數不一緻。
4.根據過濾器的内容,複制或過濾掉原先左表中的行。
5.最後在 block 上處理 other condition,則得到了 join 的結果。
上文中描述的是對于正常的 “all” join 的情況,需要傳回左右表的資料。與之相對的則是 “any” join,表示半連接配接,無需傳回右表,隻需傳回左表的資料,則無需使用 replicate_offsets 這個輔助數組,讀者可以自行閱讀代碼。 仍然在 dbms/src/intepreters/Join.cpp 中。
Aggregation 的編譯與執行
還是以一個查詢計劃以及對應的 BlockInputStream 為例:
task:1
ExchangeSender
Expression: <final projection>
Expression: <before order and select>
Aggregating
Concat
Expression: <before aggregation>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
DeltaMergeSegmentThread
task:2
Union: <for mpp>
ExchangeSender x 20
Expression: <final projection>
Expression: <projection>
Expression: <before projection>
Expression: <final projection>
SharedQuery: <restore concurrency>
ParallelAggregating, max_threads: 20, final: true
Expression x 20: <before aggregation>
Squashing: <squashing after exchange receiver>
TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}
複制
從查詢計劃中可以看到這是一個兩階段 agg,第一階段對應 task1,執行聚合的 BlockInputStream 是 Aggregating。第二階段對應 task2,執行聚合的 BlockInputStream 是 ParallelAggragating。兩個 task 通過 Exchange 進行網絡資料傳輸。
在 aggregation 的編譯期,會檢查目前 pipeline 能夠提供的并行度,如果隻有 1,則使用 AggregatingBlockInputStream 單線程執行,如果大于 1 則使用 ParallelAggragating 并行執行。
DAGQueryBlockInterpreter::executeAggregation(){
if (pipeline.streams.size() > 1){
ParallelAggregatingBlockInputStream
}else {
AggregatingBlockInputStream
}
}
複制
AggregatingBlockInputStream 的調用棧如下:
ParallelAggregatingBlockInputStream 内部會分兩階段操作(這裡的兩階段是内部執行中的概念,發生在同一台節點上,和查詢計劃中的兩階段不是一個概念)。partial 階段分别在 N 個線程建構 HashTable,merge 階段則将 N 個 HashTable 合并起來,對外輸出一個流。調用棧如下:
如果 result 是空,那麼會單獨調用一次 executeOnBlock 方法,來生成一個預設資料,類似于 count() 沒有輸入時,會傳回一個 0.
兩種執行方式都用到了 Aggregator 的 executeOnBlock 方法和 mergeAndConvertToBlocks 方法,他們的調用棧如圖所示。前者是實際執行聚合函數的地方,會調用聚合函數的 add 方法,将資料值加入;後者的主要目的是将 ParallelAggregating 并行生成的哈希表合并。