天天看點

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

本文主要介紹了資料庫系統中常用的算子 Join 和 Aggregation 在 TiFlash 中的執行情況,包括查詢計劃生成、編譯階段與執行階段,以期望讀者對 TiFlash 的算子有初步的了解。

視訊

https://www.bilibili.com/video/BV1tt4y1875T

算子概要

在閱讀本文之前,推薦閱讀本系列的前作:計算層 overview,以對 TiFlash 計算層、MPP 架構有一定了解。

在資料庫系統中,算子是執行 SQL 主要邏輯的地方。一條 SQL 會被 parser 解析為一棵算子樹(查詢計劃),然後經過 optimizer 的優化,再交給對應的 executor 執行,如下圖所示。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

本文的主要内容包括

  1. TiDB 如何生成與優化 MPP 算子與查詢計劃
  2. Join 算子在 TiFlash 中的編譯(編譯指的是将 TiDB-server 下發的執行計劃片段生成可執行結構的過程,下同)與執行
  3. Aggregation 算子在 TiFlash 中的編譯與執行

建構查詢計劃

一些背景知識:

  1. 邏輯計劃與實體計劃:可以簡單了解為邏輯計劃是指算子要做什麼,實體計劃是指算子怎樣去做這件事。比如,“将資料從表 a 和表 b 中讀取出來,然後做 join”描述的是邏輯計劃;而“在 TiFlash 中做 shuffle hash join” 描述的是實體計劃。更多資訊可以參閱:TiDB 源碼閱讀系列文章
  2. MPP:大規模并行計算,一般用來描述節點間可以交換資料的并行計算,在目前版本(6.1.0,下同)的 TiDB 中,MPP 運算都發生在 TiFlash 節點上。推薦觀看:源碼解讀 - TiFlash 計算層 overview。MPP 是實體計劃級别的概念。

MPP 計劃

在 TiDB 中,可以在 SQL 前加上 explain 來檢視這條 SQL 的查詢計劃,如下圖所示,是一棵由實體算子組成的樹,可以檢視 TiDB 執行計劃概覽 來對其有更多的了解。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

MPP 查詢計劃的獨特之處在于查詢計劃中多出了用于進行資料交換的 ExchangeSender 和 ExchangeReceiver 算子。

執行計劃中會有這樣的 pattern,代表将會在此處進行資料傳輸與交換。

...
     |_ExchangeReceiver_xx
        |_ ExchangeSender_xx
             …           

複制

每個 ExchangeSender 都會有一個 ExchangeType,來辨別本次資料傳輸的類别,包括:

  1. HashPartition,将資料按 Hash 值進行分區之後分發到上遊節點。
  2. Broadcast,将自身資料拷貝若幹份,廣播到所有上遊節點中。
  3. PassThrough,将自己的資料全部傳給一個指定節點,此時接收方可以是 TiFlash 節點(ExchangeReceiver);也可以是 TiDB-server 節點(TableReader),代表 MPP 運算完畢,向 TiDB-server 傳回資料。

在上面的查詢計劃圖中,一共有三個 ExchangeSender,id 分别是 19, 13 和 17。其中 ExchangeSender_13 和 ExchangeSender_17 都是将讀入後的資料按哈希值 shuffle 到所有節點中,以便進行 join,而 ExchangeSender_19 則是将 join 完成後的資料傳回到 TiDB-server 節點中。

添加 Exchange

在優化器的計劃探索過程中,會有兩處為查詢計劃樹插入 Exchange 算子:

  1. 一個是 MPP 計劃在探索完畢後,接入 TiDB 的 tableReader 時。類型為 passThrough type. 源碼在函數

    func (t *mppTask) convertToRootTaskImpl

  2. 一個是 MPP 計劃在探索過程中,發現目前算子的 property(這裡主要指分區屬性)不滿足上層要求時。例如上層要求需要按 a 列的 hash 值分區,但是下層算子不能滿足這個要求,就會插入一組 Exchange.
func (t *mppTask) enforceExchanger(prop *property.PhysicalProperty) *mppTask {
   if !t.needEnforceExchanger(prop) {
      return t
   }
   return t.copy().(*mppTask).enforceExchangerImpl(prop)
}

// t.partTp 表示目前算子已有的 partition type,prop 表示父算子要求的 partition type
func (t *mppTask) needEnforceExchanger(prop *property.PhysicalProperty) bool {
   switch prop.MPPPartitionTp {
   case property.AnyType:
      return false
   case property.BroadcastType:
      return true
   case property.SinglePartitionType:
      return t.partTp != property.SinglePartitionType
   default:
      if t.partTp != property.HashType {
         return true
      }
      if len(prop.MPPPartitionCols) != len(t.hashCols) {
         return true
      }
      for i, col := range prop.MPPPartitionCols {
         if !col.Equal(t.hashCols[i]) {
            return true
         }
      }
      return false
   }
}           

複制

Property 對于分區屬性的要求(MPPPartitionTp)有以下幾種:

  1. AnyType,對下層算子沒有要求,是以并不需要添加 exchange;
  2. BroadcastType,用于 broadcast join,要求下層節點複制資料并廣播到所有節點中,此時一定需要添加一個 broadcast exchange;
  3. SinglePartitionType,要求下層節點将資料彙總到同一台節點中,此時如果已經在同一台節點上,則不用再進行 exchange。
  4. HashType,要求下層節點按特定列的哈希值進行分區,如果已經按要求分好區了,則不用再進行 exchange.

在優化器的生成查詢計劃的探索中,每個算子都會對下層有 property 要求,同時也需要滿足上層傳下來的 property;當上下兩層的 property 無法比對時,就插入一個 exchange 算子交換資料。依靠這些 property,可以不重不漏的插入 exchange 算子。

MPP 算法

是否選擇 MPP 算法是在 TiDB 優化器生成實體計劃時決定,即 CBO(Cost-Based Optimization) 階段。優化器會周遊所有可選擇的計劃路徑,包括含有 MPP 算法的計劃與不含有 MPP 算法的計劃,估計它們的代價,并選擇其中總代價最小的一個查詢計劃。

對于目前的 TiDB repo 代碼,有四個位置可以觸發 MPP 計劃的生成,分别對應于 join、agg、window function、projection 四個算子:

  1. func (p *LogicalJoin) tryToGetMppHashJoin
  2. func (la *LogicalAggregation) tryToGetMppHashAggs
  3. func (lw *LogicalWindow) tryToGetMppWindows
  4. func (p *LogicalProjection) exhaustPhysicalPlans

這裡隻描述具有代表性的 join 和 agg 算子,其他算子同理。

Join

目前 TiDB 支援兩種 MPP Join 算法,分别是:

  • Shuffle Hash Join,将兩張表的資料各自按 hash key 分區後 shuffle 到各個節點上,然後做 hash join,如上一節中舉出的查詢計劃圖所示。
  • Broadcast Join,将小表廣播到大表所在的每個節點,然後做 hash join,如下圖所示。
TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

tryToGetMppHashJoin 函數在建構 join 算子時給出了對子算子的 property 要求:

if useBCJ { // broadcastJoin
    …
    childrenProps[buildside] = {MPPPartitionTp: BroadcastType}
    childrenProps[1-buildside] = {MPPPartitionTp: AnyType}
    …
} else { // shuffle hash join
    …
    childrenProps[0] = {MPPPartitionTp: HashType, key: leftKeys}
    childrenProps[1] = {MPPPartitionTp: HashType, key: rightKeys}
    …
}           

複制

如代碼所示,broadcast join 要求 buildside(這裡指要廣播的小表)具有一個 BroadcastType 的 property,對大表側則沒有要求。而 shuffle hash join 則要求兩側都具有 HashType 的分區屬性,分區列分别是 left keys 和 right keys。

Aggregation

目前 tryToGetMppHashAggs 可能生成三種 MPP Aggregation 計劃:

1.“一階段 agg”,要求資料先按 group by key 分區,然後再進行聚合。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

2.“兩階段 agg”,首先在本地節點進行第一階段聚合,然後按 group by key 分區,再進行一次聚合(用 sum 彙總結果)。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

3.“scalar agg”,沒有分區列的特定情況,在本地節點進行第一階段聚合,然後彙總到同一台節點上完成第二階段聚合。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

一階段 agg 和兩階段 agg 的差別是是否先在本地節點做一次預聚合,優化器會根據 SQL 與代價估算來選擇執行哪種方式。對于重複值很多的情況,兩階段 agg 可以在網絡傳輸前減少很多資料量,進而減少大量的網絡消耗;而如果重複值很少的情況下,這次預聚合并不會減少很多資料量,反而白白增大了 cpu 與記憶體消耗,此時就不如使用一階段 agg。

這裡留一個小思考題,這三種 agg 各自對下方有什麼 property 要求?在聚合做完之後又滿足了怎樣的 property?

答案是:

一階段 agg 要求 hash,做完滿足 hash;二階段 agg 無要求,做完滿足 hash;scalar agg 無要求,做完滿足 singlePartition.

編譯與執行

執行計劃建構好之後,TiDB-server 會将 dag(執行計劃的片段)下發給對應的 TiFlash 節點。在 TiFlash 節點中,需要首先解析這些執行計劃,這個過程我們稱作“編譯”,編譯的結果是 BlockInputStream,它是 TiFlash 中的可執行結構;而最後一步就是在 TiFlash 中執行這些 BlockInputStream.

下圖是一個 BlockInputStream DAG 的例子,每個 BlockInputStream 都有三個方法:readPrefix, read 和 readSuffix;類似于其他火山模型調用 open、next 和 close。

下圖的來源是 TiFlash 執行器線程模型 - 知乎專欄 (zhihu.com),關于執行模型更多的内容,可以參考這篇文章或者 TiFlash Overview,這裡不再贅述。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

Join 的編譯與執行

TiDB-server 節點會将查詢計劃按 Exchange 會作為分界,将查詢切分為不同的計劃片段(task),作為 dag 發給 TiFlash 節點。比如對于下圖中所示的查詢計劃,會切分為這三個紅框。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

TiFlash 節點在編譯完成後生成的 BlockInputStream 如下,可以在 debug 日志中看到:

task 1
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
 
task 2
ExchangeSender
 Expression: <final projection>
  Expression: <projection after push down filter>
   Filter: <push down filter>
    DeltaMergeSegmentThread
 
task 3
CreatingSets
 Union: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_15>, join_kind = Inner
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_34>
      Expression: <final projection>
       Squashing: <squashing after exchange receiver>
        TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
             

複制

其中 task1 和 task2 是将資料從存儲層讀出,經過簡單的處理之後,發給 ExchangeSender. 在 task3 中,有三個 BlockInpuStream 值得關注,分别是:CreatingSets, HashJoinBuild, HashJoinProbe.

CreatingSetsBlockInputStream

接受一個資料 BlockInputStream 表示 joinProbe,還有若幹個代表 JoinBuild 的 Subquery。CreatingSets 會并發啟動這些 Subquery, 等待他們執行結束後在開始啟動資料 InputStream. 下面兩張圖分别是 CreatingSets 的 readPrefix 和 read 函數的調用棧。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作
TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

為什麼 CreatingSets 可能同時建立多張哈希表?因為在一個多表 join 中,同一個計劃片段可能緊接着做多次 join porbe,如下圖所示:

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作
task:4
CreatingSets
 Union x 2: <for join>
  HashJoinBuildBlockInputStream x 20: <join build, build_side_root_executor_id = ExchangeReceiver_22>, join_kind = Left
   Expression: <append join key and join filters for build side>
    Expression: <final projection>
     Squashing: <squashing after exchange receiver>
      TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
 Union: <for mpp>
  ExchangeSender x 20
   Expression: <final projection>
    Expression: <remove useless column after join>
     HashJoinProbe: <join probe, join_executor_id = HashJoin_50>
      Expression: <final projection>
       Expression: <remove useless column after join>
        HashJoinProbe: <join probe, join_executor_id = HashJoin_14>
         Expression: <final projection>
          Squashing: <squashing after exchange receiver>
           TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Nullable(Int32)>, <exchange_receiver_1, Nullable(Int32)>}
            

複制

Join Build

注意,join 在此處僅代表 hash join,已經與網絡通信和 MPP 級别的算法無關。

關于 join 的代碼都在 dbms/src/Interpreters/Join.cpp 中;我們以下面兩張表進行 join 為例來說明:

left_table l join right_table r 
on l.join_key=r.join_key
where l.b>=r.c            

複制

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

預設右表做 build 端,左表做 probe 端。哈希表的值使用鍊式存儲:

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

Join Probe

這裡主要描述的是 JoinBlockImpl 這個函數的流程:

1.block 包含了左表的内容;建立 added_columns, 即要添加到 block 中的右表的列;然後建立相應的過濾器 replicate_offsets:表示目前共比對了幾行,之後可以用于篩選未比對上的行,或複制比對了多行的行。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

2.依次查找哈希表,根據查找結果調用相應的 addFound 或 addNotFound 函數,填充 added_columns 和過濾器。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作
TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

從填充的過程中也可以看到,replicate_offsets 左表表示到目前行為止,一共能比對上的右表的行數。并且 replicate_offsetsi - replicate_offsetsi-1 就表示左表第 i 行比對到的右表的行數。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

3.将 added_column 直接拼接到 block 上,此時會有短暫的 block 行數不一緻。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

4.根據過濾器的内容,複制或過濾掉原先左表中的行。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

5.最後在 block 上處理 other condition,則得到了 join 的結果。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

上文中描述的是對于正常的 “all” join 的情況,需要傳回左右表的資料。與之相對的則是 “any” join,表示半連接配接,無需傳回右表,隻需傳回左表的資料,則無需使用 replicate_offsets 這個輔助數組,讀者可以自行閱讀代碼。 仍然在 dbms/src/intepreters/Join.cpp 中。

Aggregation 的編譯與執行

還是以一個查詢計劃以及對應的 BlockInputStream 為例:

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作
task:1
ExchangeSender
 Expression: <final projection>
  Expression: <before order and select>
   Aggregating
    Concat
     Expression: <before aggregation>
      Expression: <projection>
       Expression: <before projection>
        Expression: <final projection>
         DeltaMergeSegmentThread
 
task:2
Union: <for mpp>
 ExchangeSender x 20
  Expression: <final projection>
   Expression: <projection>
    Expression: <before projection>
     Expression: <final projection>
      SharedQuery: <restore concurrency>
       ParallelAggregating, max_threads: 20, final: true
        Expression x 20: <before aggregation>
         Squashing: <squashing after exchange receiver>
          TiRemoteBlockInputStream(ExchangeReceiver): schema: {<exchange_receiver_0, Int64>, <exchange_receiver_1, Nullable(Int64)>}           

複制

從查詢計劃中可以看到這是一個兩階段 agg,第一階段對應 task1,執行聚合的 BlockInputStream 是 Aggregating。第二階段對應 task2,執行聚合的 BlockInputStream 是 ParallelAggragating。兩個 task 通過 Exchange 進行網絡資料傳輸。

在 aggregation 的編譯期,會檢查目前 pipeline 能夠提供的并行度,如果隻有 1,則使用 AggregatingBlockInputStream 單線程執行,如果大于 1 則使用 ParallelAggragating 并行執行。

DAGQueryBlockInterpreter::executeAggregation(){
    if (pipeline.streams.size() > 1){
        ParallelAggregatingBlockInputStream
    }else {
        AggregatingBlockInputStream
    }
}           

複制

AggregatingBlockInputStream 的調用棧如下:

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

ParallelAggregatingBlockInputStream 内部會分兩階段操作(這裡的兩階段是内部執行中的概念,發生在同一台節點上,和查詢計劃中的兩階段不是一個概念)。partial 階段分别在 N 個線程建構 HashTable,merge 階段則将 N 個 HashTable 合并起來,對外輸出一個流。調用棧如下:

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作

如果 result 是空,那麼會單獨調用一次 executeOnBlock 方法,來生成一個預設資料,類似于 count() 沒有輸入時,會傳回一個 0.

兩種執行方式都用到了 Aggregator 的 executeOnBlock 方法和 mergeAndConvertToBlocks 方法,他們的調用棧如圖所示。前者是實際執行聚合函數的地方,會調用聚合函數的 add 方法,将資料值加入;後者的主要目的是将 ParallelAggregating 并行生成的哈希表合并。

TiFlash 源碼閱讀(九)TiFlash 中常用算子的設計與實作