天天看點

Drois

Apache Doris 是一個現代化的MPP(Massively Parallel Processing,即大規模并行處理) 分析型資料庫産品。僅需亞秒級響應時間即可獲得查詢結果,有效地支援實時資料分析。 Apache Doris 的分布式架構非常簡潔,易于運維,并且可以支援 10PB 以上的超大資料集。

一、Drois架構

Drois

Doris 的架構很簡潔,隻設 FE(Frontend)、BE(Backend)兩種角色、兩個程序,不依賴于 外部元件,友善部署和運維,FE、BE 都可線性擴充。

FE(Frontend):存儲、維護叢集中繼資料;負責接收、解析查詢請求,規劃查詢計劃,排程查詢執行,傳回查詢結果。主要有三個角色:

  1. Leader 和 Follower:主要是用來達到中繼資料的高可用,保證單節點當機的情況下, 中繼資料能夠實時地線上恢複,而不影響整個服務。
  2. Observer:用來擴充查詢節點,同時起到中繼資料備份的作用。如果在發現叢集壓力 非常大的情況下,需要去擴充整個查詢的能力,那麼可以加 observer 的節點。observer 不 參與任何的寫入,隻參與讀取。

BE(Backend):負責實體資料的存儲和計算;依據 FE 生成的實體計劃,分布式地執行查詢。

        資料的可靠性由 BE 保證,BE 會對整個資料存儲多副本或者是三副本。副本數可根據 需求動态調整。

二、資料(表)模型

       Palo 的表中分 Key 列、Value 列;其中Key列在前,Value列在後且需要執行具體的聚合方式;Key列版本内有序,根據 Palo 會根據 Key 列隊資料進行排序,便于快速查找;

1. 稀疏索引

        Palo 中使用類似 字首索引 的結構來提高查詢性能。資料在 Palo 内部組織為一個個 Data Block。每個 Data Block 的第一行的前幾列會被用作這個 Data Block 的索引,在資料導入時被建立。考慮到索引大小等因素,Palo 最多使用一行的前 36 個位元組作為索引,并且遇到 VARCHAR 類型則會中斷并截止。并且 VARCHAR 類型最多隻是用字元串的前 20 個位元組。

下面舉例說明:

Drois

2. 資料模型

        Palo 中提供三種模型:AGGREGATE、 UNIQUE、 DUPLICATE,Palo 會根據對應的模型對表中相同的key做相應的處理;當選擇AGGREGATE模型時,全局key唯一,會根據相同的key進行聚合操作,包括SUM、REPLACE、MAX、MIN;

AGGREGATE KEY:key列相同的記錄,value列按照指定的聚合類型進行聚合,适合報表、多元分析等業務場景。

UNIQUE KEY:key列相同的記錄,value列按導入順序進行覆寫,适合按key列進行增删改查的點查詢業務。

DUPLICATE KEY:key列相同的記錄,同時存在于Palo中,适合存儲明細資料或者資料無聚合特性的業務場景。

        注:其中 DUPLICATE 模型适合資料既沒有主鍵,也沒有聚合需求的場景,DUPLICATE KEY 隻能指明底層資料排序順序,而 UNIQUE KEY 能保證 KEY 列值得唯一性;

比如,Palo中有一張表包含三列:k1,k2和v,其中v是int類型的value列,聚合方法是SUM,k1和k2是key列。假如原本有資料如下:

Drois

3. 物化視圖(Rollup)

        Rollup可以了解為 Table 的一個物化索引結構。物化 是因為其資料在實體上是獨立存儲的。而 索引 的意思是,Rollup 存在的目是用于加速在這個 Table 上的某類查詢響應。Rollup 附屬于 Table,一個 Table 可以有多個 Rollup。在建立 Table 時會預設生成一個 Base Rollup,該 Rollup 包含 Table 的所有列。後續的其他 Rollup 在此基礎上建立。并且通常其他 Rollup 的列數要少于 Base Rollup。如下圖所示:

Drois

三、存儲結構

        Doris高效的導入、查詢離不開其存儲結構精巧的設計。存儲層對存儲資料的管理通過 storage_root_path 路徑進行配置,路徑可以是多個。存儲目錄下一層按照分桶進行組織,分桶目錄下存放具體的 tablet,按照 tablet_id 命名子目錄。

        Segment 檔案存放在 tablet_id 目錄下按 SchemaHash 管理。Segment 檔案可以有多個,一般按照大小進行分割,預設為 256MB。其中,Segment v2 檔案命名規則為:${rowset_id}_${segment_id}.dat。具體存儲目錄存放格式如下圖所示:

Drois

        新版Segment V2存儲格式參考了Parquet的設計思路,引入了基于Page的最小資料存儲單元,并将資料檔案劃分為資料區、索引區和中繼資料區三個部分。針對不同的列類型、索引格式實作了不同的Page編碼方式,顯著提升了資料的讀寫效率,并增強了資料格式的可擴充性。

Drois

Data Region:用于存儲各個列的資料資訊,這裡的資料是按需分page加載的。

Index Region:Doris中将各個列的index資料統一存儲在Index Region,這裡的資料會按照列粒度進行加載,是以跟列的資料資訊分開存儲。

Footer資訊:

  • SegmentFooterPB:定義檔案的中繼資料資訊
  • 4個位元組的FooterPB内容的checksum
  • 4個位元組的FileFooterPB消息長度,用于讀取FileFooterPB
  • 8個位元組的MAGIC CODE,之是以在末位存儲,是友善不同的場景進行檔案類型的識别

1. Data Region

Drois

 壓縮與編碼:針對不同的字段類型采用了不同的編碼,預設采用LZ4F格式對資料進行壓縮。

Drois

2. Index Region

2.1 Ordinal Index(一級索引)

        Ordinal Index 索引提供了通過行号來查找Column Data Page資料頁的實體位址。Ordinal Index能夠将按列存儲資料按行對齊,可以形象了解為所謂的一級索引。

        Column Data Page是由Ordinal index進行管理,Ordinal index記錄了每個Column Data Page的位置offset、大小size和第一個資料項行号資訊,即Ordinal。這樣每個列具有按行資訊進行快速掃描的能力。

Drois

2.2 Short Key Index(字首索引)

        資料查詢時,會打開Segment檔案,從footer中擷取Short Key Page的offset以及大小,然後從Segment檔案中讀取Short Key Page中的索引資料,并解析出每一條字首索引項。如果查詢過濾條件包含字首字段時,就可以使用字首索引進行快速地行過濾。

        字首索引又是稀疏索引,不能精确定位到key所在的行,隻能粗粒度地定位出key可能存在的範圍,然後使用二分查找算法精确地定位key的位置。

Drois

2.3 ZoneMap Index

        ZoneMap索引存儲了Segment和每個列對應每個Page的統計資訊。這些統計資訊可以幫助在查詢時提速,減少掃描資料量,統計資訊包括了Min最大值、Max最小值、HashNull空值、HasNotNull不全為空的資訊。

Drois

2.4 Bloom Filte Index

        Doris支援對取值區分度比較大的字段添加Bloom Filter索引。資料查詢時,查詢條件在設定有Bloom Filter索引的字段進行過濾,當某個Data Page的Bloom Filter沒有命中時,表示該Data Page中沒有需要的資料,這樣可以對Data Page進行快速過濾,減少不必要的資料讀取。

Drois

2.5 BitMap Index

        為了加速資料查詢,Doris可以為某些字段添加Bitmap索引。

  • 有序字典:有序儲存一列中所有的不同取值。
  • 字典值的位圖:儲存有序字典中每一個取值的位圖,表示字典值在列中的行号。
Drois

3. Footer

        Footer資訊段在檔案的尾部,存儲了檔案的整體結構,包括資料域的位置,索引域的位置等資訊,其中有SegmentFooterPB,CheckSum,Length,MAGIC CODE 4個部分。

Drois

索引的查詢流程:

        在查詢一個 Segment 中的資料時,根據執行的查詢條件,會對首先根據字段加索引的情況對資料進行過濾。然後在進行讀取資料,整體的查詢流程如下:

Drois
  1. 首先,會按照 Segment 的行數建構一個 row_bitmap,表示記錄那些資料需要進行讀取,沒有使用任何索引的情況下,需要讀取所有資料。
  2. 當查詢條件中按字首索引規則使用到了 key 時,會先進行 ShortKey Index 的過濾,可以在 ShortKey Index 中比對到的 ordinal 行号範圍,合入到 row_bitmap 中。
  3. 當查詢條件中列字段存在 BitMap Index 索引時,會按照 BitMap 索引直接查出符合條件的 ordinal 行号,與 row_bitmap 求交過濾。這裡的過濾是精确的,之後去掉該查詢條件,這個字段就不會再進行後面索引的過濾。
  4. 當查詢條件中列字段存在 BloomFilter 索引并且條件為等值(eq,in,is)時,會按 BloomFilter 索引過濾,這裡會走完所有索引,過濾每一個 Page 的 BloomFilter,找出查詢條件能命中的所有 Page。将索引資訊中的 ordinal 行号範圍與 row_bitmap 求交過濾。
  5. 當查詢條件中列字段存在 ZoneMap 索引時,會按 ZoneMap 索引過濾,這裡同樣會走完所有索引,找出查詢條件能與 ZoneMap 有交集的所有 Page。将索引資訊中的 ordinal 行号範圍與 row_bitmap 求交過濾。
  6. 生成好 row_bitmap 之後,批量通過每個 Column 的 OrdinalIndex 找到到具體的 Data Page。
  7. 批量讀取每一列的 Column Data Page 的資料。在讀取時,對于有 null 值的 page,根據 null 值位圖判斷目前行是否是 null,如果為 null 進行直接填充即可。

四、分區方式

        在 Doris 的存儲引擎中,使用者資料被水準劃分為若幹個資料分片(Tablet,也稱作資料分桶)。每個 Tablet 包含若幹資料行。各個 Tablet 之間的資料沒有交集,并且在實體上是獨立存儲的。

        多個 Tablet 在邏輯上歸屬于不同的分區(Partition)。一個 Tablet 隻屬于一個 Partition。而一個 Partition 包含若幹個 Tablet。因為 Tablet 在實體上是獨立存儲的,是以可以視為 Partition 在實體上也是獨立。Tablet 是資料移動、複制等操作的最小實體存儲單元。若幹個 Partition 組成一個 Table。Partition 可以視為是邏輯上最小的管理單元。資料的導入與删除,都可以或僅能針對一個 Partition 進行。

Drois

分區與分桶

        Doris 支援兩層的資料劃分。第一層是 Partition,支援 Range 和 List 的劃分方式。第二層是 Bucket(Tablet),僅支援 Hash 的劃分方式。

也可以僅使用一層分區。使用一層分區時,隻支援 Bucket 劃分。

1. Partition

  • Partition 列可以指定一列或多列。分區類必須為 KEY 列。多列分區的使用方式在後面 多列分區 小結介紹。
  • 不論分區列是什麼類型,在寫分區值時,都需要加雙引号。
  • 分區數量理論上沒有上限。
  • 當不使用 Partition 建表時,系統會自動生成一個和表名同名的,全值範圍的 Partition。該 Partition 對使用者不可見,并且不可删改。

 2. Bucket

        分桶是更細粒度的劃分資料的方式。隻支援Hash的劃分方式。通過對分桶列進行hash操作,進而分散資料,将同一個hash值的資料存放在一個桶中。

  1. 如果使用了 Partition,則

    DISTRIBUTED ...

    語句描述的是資料在各個分區内的劃分規則。如果不使用 Partition,則描述的是對整個表的資料的劃分規則。
  2. 分桶列可以是多列,但必須為 Key 列。分桶列可以和 Partition 列相同或不同。
  3. 分桶列的選擇,是在 查詢吞吐 和 查詢并發:如果選擇多個分桶列,則資料分布更均勻。如果一個查詢條件不包含所有分桶列的等值條件,那麼該查詢會觸發所有分桶同時掃描,這樣查詢的吞吐會增加,單個查詢的延遲随之降低。這個方式适合大吞吐低并發的查詢場景。如果僅選擇一個或少數分桶列,則對應的點查詢可以僅觸發一個分桶掃描。此時,當多個點查詢并發時,這些查詢有較大的機率分别觸發不同的分桶掃描,各個查詢之間的IO影響較小(尤其當不同桶分布在不同磁盤上時),是以這種方式适合高并發的點查詢場景。
  4. 分桶的數量理論上沒有上限。

關于 Partition 和 Bucket 的數量和資料量的建議。

1. 一個表的 Tablet 總數量等于 (Partition num * Bucket num)。

2. 一個表的 Tablet 數量,在不考慮擴容的情況下,推薦略多于整個叢集的磁盤數量。

3. 單個 Tablet 的資料量理論上沒有上下界,但建議在 1G - 10G 的範圍内。如果單個 Tablet 資料量過小,則資料的聚合效果不佳,且中繼資料管理壓力大。如果資料量過大, 則不利于副本的遷移、補齊,且會增加 Schema Change 或者 Rollup 操作失敗重試的代價 (這些操作失敗重試的粒度是 Tablet)。

4. 當 Tablet 的資料量原則和數量原則沖突時,建議優先考慮資料量原則。

5. 在建表時,每個分區的 Bucket 數量統一指定。但是在動态增加分區時可以單獨指定新分區的 Bucket 數量。可以利用這個功能友善的應對資料縮小或膨脹。

6. 一個 Partition 的 Bucket 數量一旦指定,不可更改。是以在确定 Bucket 數量時, 需要預先考慮叢集擴容的情況。比如目前隻有 3 台 host,每台 host 有 1 塊盤。如果 Bucket 的數量隻設定為 3 或更小,那麼後期即使再增加機器,也不能提高并發度。

        舉例:假設在有 10 台 BE,每台 BE 一塊磁盤的情況下。如果一個表總大小為 500MB,則可以考慮 4-8 個分片。5GB:8-16 個。50GB:32 個。500GB:建議分區, 每個分區大小在 50GB 左右,每個分區 16-32 個分片。5TB:建議分區,每個分區大小在 50GB 左右,每個分區 16-32 個分片。

五、元資訊管理

        Doris的FE需要管理元資訊,BE隻是負責計算,無需儲存元資訊。

        Doris 的中繼資料是全記憶體的。每個 FE 記憶體中,都維護一個完整的中繼資料鏡像。FE 節點分為 follower 和 observer 兩類。各個 FE 之間,通過 bdbje(BerkeleyDB Java Edition)進行 leader 選舉,資料同步等 工作。follower 節點通過選舉,其中一個 follower 成為 leader 節點,負責中繼資料的寫入操作。當 leader 節點當機後,其他 follower 節點會重新選舉出一個 leader,保證服務的高可用。observer 節點 僅從 leader 節點進行中繼資料同步,不參與選舉。可以橫向擴充以提供中繼資料的讀服務的擴充性。

六、資料導入

        導入(Load)功能就是将使用者的原始資料導入到 Doris 中。導入成功後,使用者即可通過 Mysql 用戶端查詢資料。為适配不同的資料導入需求,Doris 系統提供了 6 種不同的導入方式。每種導入方式支援不同的資料源,存在不同的使用方式(異步,同步)。

1. Broker load

        通過 Broker 程序通路并讀取外部資料源(如 HDFS)導入到 Doris。使用者通過 Mysql協定送出導入作業後,異步執行。通過 SHOW LOAD 指令檢視導入結果。

2. Stream load

        使用者通過 HTTP 協定送出請求并攜帶原始資料建立導入。主要用于快速将本地檔案或資料流中的資料導入到 Doris。導入指令同步傳回導入結果。

3. Insert

        類似 MySQL 中的 Insert 語句,Doris 提供 INSERT INTO tbl SELECT ...; 的方式從 Doris 的表中讀取資料并導入到另一張表。或者通過 INSERT INTO tbl VALUES(...);插入單 條資料。

4. Multi load

        使用者通過 HTTP 協定送出多個導入作業。Multi Load 可以保證多個導入作業的原子生 效。

5. Routine load

        使用者通過 MySQL 協定送出例行導入作業,生成一個常駐線程,不間斷的從資料源(如Kafka)中讀取資料并導入到 Doris 中。

6. 通過 S3 協定直接導入

        使用者通過 S3 協定直接導入資料,用法和 Broker Load 類似。Broker load 是一個異步的導入方式,支援的資料源取決于 Broker 程序支援的資料源。 使用者需要通過 MySQL 協定建立 Broker load 導入,并通過檢視導入指令檢查導入結果。

        所有導入方式都支援 csv 資料格式。其中Broker load還支援parquet和orc 資料格式。不同的導入方式适用于不同的場景。一下主要介紹Broker load和Stream load兩種常見的導入方式:

1.Stream load

        Stream load 是一個同步的導入方式,使用者通過發送 HTTP 協定發送請求将本地檔案或 資料流導入到 Doris 中。Stream load 同步執行導入并傳回導入結果。使用者可直接通過請求 的傳回體判斷本次導入是否成功。

        Stream load 主要适用于導入本地檔案,或通過程式導入資料流中的資料。目前 Stream Load 支援兩個資料格式:CSV(文本)和 JSON。下圖展示了 Stream load 的主要流程,省略了一些導入細節。

Drois

        Stream load 中,Doris 會標明一個節點作為 Coordinator 節點。該節點負責接資料并分發資料到其他資料節點。使用者通過 HTTP 協定送出導入指令。如果送出到 FE,則 FE 會通過 HTTP redirect 指 令将請求轉發給某一個 BE。使用者也可以直接送出導入指令給某一指定 BE。導入的最終結果由 Coordinator BE 傳回給使用者。

資料導入的流程如下:

  1. 使用者選擇一台BE作為協調者Coordinator,發起資料導入請求,傳入資料格式,資料源和辨別此次資料導入的label,label用于避免資料重複導入。如果送出到 FE,則 FE 會通過 HTTP redirect 指令将請求轉發給某一個 BE;
  2. Coordinator收到請求後,向FE master節點上報,執行loadTxnBegin, 建立全局事務。 因為導入過程中,需要同時更新base表和物化索引的多個bucket, 為了保證資料導入的一緻性,用事務控制本次導入的原子性;
  3. Coordinator建立事務成功後, 執行streamLoadPut調用, 從FE獲得本次資料導入的計劃。 資料導入, 可以看成是将資料分發到所涉及的全部的tablet副本上, BE從FE擷取的導入計劃包含資料的schema資訊和tablet副本資訊;
  4. Coordinator從資料源拉取資料, 根據base表和物化索引表的schema資訊, 構造内部資料格式;
  5. Coordinator根據分區分桶的規則和副本位置資訊,将發往同一個BE的資料,批量打包,發送給BE,BE收到資料後,将資料寫入到對應的tablet副本中;
  6. 當Coordinator節點完成此次資料導入,向FE master節點執行loadTxnCommit,送出全局事務,發送本次資料導入的執行情況,FE master确認所有涉及的tablet的多數副本都成功完成,則釋出本次資料導入使資料對外可見,否則導入失敗,資料不可見,背景負責清理掉不一緻的資料;

 2. Broker load

       Broker load 是一個異步的導入方式,支援的資料源取決于 Broker 程序支援的資料源。使用者需要通過 MySQL 協定 建立 Broker load 導入,并通過檢視導入指令檢查導入結果。Broker 以插件的形式,獨立于 Doris 部署。如果需要從第三方存儲系統導入資料,需要部署相應的 broker。

Drois

        使用者在送出導入任務後,FE 會生成對應的 Plan 并根據目前 BE 的個數和檔案的大小,将 Plan 分給多個 BE 執行,每個 BE 執行一部分導入資料。BE 在執行的過程中會從 Broker 拉取資料,在對資料 transform 之後将資料導入系統。所有 BE 均完成導入,由 FE 最終決定導入是否成功。

七、資料查詢

        使用者可使用MySQL用戶端連接配接FE,執行SQL查詢, 獲得結果。

  1. MySQL用戶端執行DQL SQL指令。
  2. FE解析, 分析, 改寫, 優化和規劃, 生成分布式執行計劃。
  3. 分布式執行計劃由 若幹個可在單台be上執行的plan fragment構成, FE執行exec_plan_fragment, 将plan fragment分發給BE,指定其中一台BE為coordinator。
  4. BE執行本地計算, 比如掃描資料。
  5. 其他BE調用transimit_data将中間結果發送給BE coordinator節點彙總為最終結果。
  6. FE調用fetch_data擷取最終結果。
  7. FE将最終結果發送給MySQL client。

        執行計劃在BE上的實際執行過程比較複雜,,采用向量化執行方式,比如一個算子産生4096個結果,輸出到下一個算子參與計算,而非batch方式或者one-tuple-at-a-time。

Drois

八、Drois SQL 原了解析

        SQL解析在下文中指的是将一條sql語句經過一系列的解析最後生成一個完整的實體執行計劃的過程。

        Doris SQL解析具體包括了五個步驟:詞法分析,文法分析,生成單機邏輯計劃,生成分布式邏輯計劃,生成實體執行計劃。

具體代碼實作上包含以下五個步驟:Parse、Analyze、SinglePlan、DistributedPlan、Schedule。

Drois

下圖展示了一個簡單的查詢SQL在Doris的解析實作:

Drois

1. Parse階段

詞法分析采用jflex技術,文法分析采用java cup parser技術,最後生成抽象文法樹(Abstract Syntax Tree)AST,這些都是現有的、成熟的技術,在這裡不進行詳細介紹。

        AST是一種樹狀結構,代表着一條SQL。不同類型的查詢select, insert, show, set, alter table, create table等經過Parse階段後生成不同的資料結構(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),但他們都繼承自Statement,并根據自己的文法規則進行一些特定的處理。例如:對于select類型的 sql, Parse之後生成了SelectStmt結構。

        SelectStmt結構包含了SelectList,FromClause,WhereClause,GroupByClause, SortInfo等結構。這些結構又包含了更基礎的一些資料結構,如WhereClause包含了BetweenPredicate (between表達 式), BinaryPredicate(二進制表達式), CompoundPredicate(and or組合表達式), InPredicate(in表達式)等。

2. Analyze階段

抽象文法樹是由StatementBase這個抽象類表示。這個抽象類包含一個最重要的成員函數analyze(),用來執行Analyze階段要做的事。

        不同類型的查詢select, insert, show, set, alter table, create table等經過Parse階段後生成不同的資料結構(SelectStmt, InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt等),這些資料結構繼承自StatementBase,并實作analyze()函數,對特定類型的SQL進行特定的 Analyze。

例如:select類型的查詢,會轉成對select sql的子語句SelectList, FromClause, GroupByClause, HavingClause, WhereClause, SortInfo等的analyze()。然後這些子語句再各自對自己的子結構進行進一步的analyze(),通過層層疊代,把各種類型的sql的各種 情景都分析完畢。例如:WhereClause進一步分析其包含的BetweenPredicate(between表達式), BinaryPredicate(二進制表達式), CompoundPredicate(and or組合表達式), InPredicate(in表達式)等。

3. 生成單機邏輯Plan階段

這部分工作主要是根據AST抽象文法樹生成代數關系,也就是俗稱的算子數。樹上的每個節點都是一個算子,代表着一種操作。

        ScanNode代表着對一個表的掃描操作,将一個表的資料讀出來。HashJoinNode代表着join操作,小表在記憶體中建構哈希表,周遊大表找到連接配接鍵相同的值。Project表示投影操作,代表着最後需要輸出的列,圖檔表示隻用輸出citycode這一列。

Drois

4. 生成分布式Plan階段

        有了單機的PlanNode樹之後,就需要進一步根據分布式環境,拆成分布式PlanFragment樹(PlanFragment用來表示獨立的執行單元),畢竟一個表的資料分散地存儲在多台主機上,完全可以讓一些計算并行起來。

        這個步驟的主要目标是最大化并行度和資料本地化。主要方法是将能夠并行執行的節點拆分出去單獨建立一個PlanFragment,用 ExchangeNode代替被拆分出去的節點,用來接收資料。拆分出去的節點增加一個DataSinkNode,用來将計算之後的資料傳送到 ExchangeNode中,做進一步的處理。

        這一步采用遞歸的方法,自底向上,周遊整個PlanNode樹,然後給樹上的每個葉子節點建立一個PlanFragment,如果碰到父節點,則考 慮将其中能夠并行執行的子節點拆分出去,父節點和保留下來的子節點組成一個parent PlanFragment。拆分出去的子節點增加一個父節點DataSinkNode組成一個child PlanFragment,child PlanFragment指向parent PlanFragment。這樣就确定了資料的流動方向。

5. Schedule階段

這一步是根據分布式邏輯計劃,建立分布式實體計劃。主要解決以下問題:

  •     哪個 BE 執行哪個 PlanFragment
  •     每個 Tablet 選擇哪個副本去查詢
  •     如何進行多執行個體并發

九、SQL相容性

        Doirs的sql符合sql标準,另外join時無需感覺資料的分布。Doris還支援shuffer join和不相關子查詢。Doris在建表時指定兩表的分布時,也支援colocation join,比較适合分桶方式相同的兩個大表的join。

Drois

1. Colocation Join

        Colocation Join 功能,是将一組擁有相同 CGS 的 Table 組成一個 CG。并保證這些 Table 對應的資料分片會落在同一個 BE 節點上。使得當 CG 内的表進行分桶列上的 Join 操作時,可以通過直接進行本地資料 Join,減少資料在節點間的傳輸耗時。

一個表的資料,最終會根據分桶列值 Hash、對桶數取模的後落在某一個分桶内。假設一個 Table 的分桶數為 8,則共有

[0, 1, 2, 3, 4, 5, 6, 7]

8 個分桶(Bucket),我們稱這樣一個序列為一個

BucketsSequence

。每個 Bucket 内會有一個或多個資料分片(Tablet)。當表為單分區表時,一個 Bucket 内僅有一個 Tablet。如果是多分區表,則會有多個。

為了使得 Table 能夠有相同的資料分布,同一 CG 内的 Table 必須保證以下屬性相同:

  1. 分桶列和分桶數。

    分桶列,即在建表語句中

    DISTRIBUTED BY HASH(col1, col2, ...)

    中指定的列。分桶列決定了一張表的資料通過哪些列的值進行 Hash 劃分到不同的 Tablet 中。同一 CG 内的 Table 必須保證分桶列的類型和數量完全一緻,并且桶數一緻,才能保證多張表的資料分片能夠一一對應的進行分布控制。
  2. 副本數。

    同一個 CG 内所有表的所有分區(Partition)的副本數必須一緻。如果不一緻,可能出現某一個 Tablet 的某一個副本,在同一個 BE 上沒有其他的表分片的副本對應。

    同一個 CG 内的表,分區的個數、範圍以及分區列的類型不要求一緻。

    在固定了分桶列和分桶數後,同一個 CG 内的表會擁有相同的 BucketsSequence。而副本數決定了每個分桶内的 Tablet 的多個副本,存放在哪些 BE 上。假設 BucketsSequence 為

    [0, 1, 2, 3, 4, 5, 6, 7]

    ,BE 節點有

    [A, B, C, D]

    4個。則一個可能的資料分布如下:
Drois

 CG 内所有表的資料都會按照上面的規則進行統一分布,這樣就保證了,分桶列值相同的資料都在同一個 BE 節點上,可以進行本地資料 Join。

2. Bucket Shuffle Join

Doris支援的正常分布式Join方式包括了shuffle join 和 broadcast join。這兩種 join 都會導緻不小的網絡開銷:

舉個例子,目前存在A表與B表的Join查詢,它的Join方式為HashJoin,不同Join類型的開銷如下:

  • Broadcast Join: 如果根據資料分布,查詢規劃出A表有3個執行的HashJoinNode,那麼需要将B表全量的發送到3個HashJoinNode,那麼它的網絡開銷是

    3B

    ,它的記憶體開銷也是

    3B

  • Shuffle Join: Shuffle Join會将A,B兩張表的資料根據哈希計算分散到叢集的節點之中,是以它的網絡開銷為

    A + B

    ,記憶體開銷為

    B

        在 FE 之中儲存了 Doris 每個表的資料分布資訊,如果 join 語句命中了表的資料分布列,我們應該使用資料分布資訊來減少 join 語句的網絡與記憶體開銷,這就是 Bucket Shuffle Join 的思路來源。

Drois

        上面的圖檔展示了 Bucket Shuffle Join 的工作原理。SQL 語句為 A 表 join B 表,并且 join 的等值表達式命中了 A 的資料分布列。而 Bucket Shuffle Join 會根據 A 表的資料分布資訊,将 B 表的資料發送到對應的 A 表的資料存儲計算節點。Bucket Shuffle Join 開銷如下:

  • 網絡開銷:

    B < min(3B, A + B)

  • 記憶體開銷:

    B <= min(3B, B)

        可見,相比于 Broadcast Join 與 Shuffle Join, Bucket Shuffle Join 有着較為明顯的性能優勢。減少資料在節點間的傳輸耗時和 Join 時的記憶體開銷。相對于 Doris 原有的 Join 方式,它有着下面的優點:

  • Bucket-Shuffle-Join 降低了網絡與記憶體開銷,使一些 Join 查詢具有了更好的性能。尤其是當 FE 能夠執行左表的分區裁剪與桶裁剪時。
  • 同時與 Colocate Join 不同,它對于表的資料分布方式并沒有侵入性,這對于使用者來說是透明的。對于表的資料分布沒有強制性的要求,不容易導緻資料傾斜的問題。
  • 它可以為 Join Reorder 提供更多可能的優化空間。

3. Runtime Filter

        Runtime Filter 在查詢規劃時生成,在 HashJoinNode 中建構,在 ScanNode 中應用。

舉個例子,目前存在 T1表與 T2表的 Join 查詢,它的 Join 方式為 HashJoin,T1是一張事實表,資料行數為100000,T2是一張次元表,資料行數為2000,Doris join 的實際情況是:

        顯而易見對 T2掃描資料要遠遠快于 T1,如果我們主動等待一段時間再掃描 T1,等 T2将掃描的資料記錄交給 HashJoinNode 後,HashJoinNode 根據 T2的資料計算出一個過濾條件,例如 T2資料的最大和最小值,或者建構一個 Bloom Filter,接着将這個過濾條件發給等待掃描 T1的 ScanNode,後者應用這個過濾條件,将過濾後的資料交給 HashJoinNode,進而減少 probe hash table 的次數和網絡開銷,這個過濾條件就是 Runtime Filter,效果如下:

Drois

        如果能将過濾條件(Runtime Filter)下推到存儲引擎,則某些情況下可以利用索引來直接減少掃描的資料量,進而大大減少掃描耗時,效果如下:

Drois

        可見,和謂詞下推、分區裁剪不同,Runtime Filter 是在運作時動态生成的過濾條件,即在查詢運作時解析 join on clause 确定過濾表達式,并将表達式廣播給正在讀取左表的 ScanNode,進而減少掃描的資料量,進而減少 probe hash table 的次數,避免不必要的 I/O 和網絡傳輸。

Runtime Filter 主要用于優化針對大表的 join,如果左表的資料量太小,或者右表的資料量太大,則 Runtime Filter 可能不會取得預期效果。 

參考:雲資料倉庫 Doris 版 Colocation Join-操作指南-文檔中心-騰訊雲-騰訊雲