天天看點

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

作者:DataFunTalk

導讀:ClickHouse已經成為行業主流且熱門的開源引擎。随着業務資料量擴大,場景覆寫變廣泛,在複雜query場景下,ClickHouse容易存在查詢異常問題,影響業務正常推進。本次主要分享位元組跳動如何解決ClickHouse複雜查詢問題,并詳細解讀技術實作細節,目前該能力已經通過火山引擎ByteHouse面向開發者輸出。

全文将圍繞以下幾方面展開:

  • 項目背景
  • 技術方案
  • 優化與診斷
  • 效果及展望

--

01

項目背景

1. ClickHouse執行模式

ClickHouse 的執行模式相對比較簡單,和Druid、ES 類似,其基本查詢模式分為兩個階段:

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

第一階段,Coordinator 收到查詢後将請求發送給對應的 worker 節點;

第二階段,Coordinator 收到各個 worker 節點的結果後彙聚起來處理後傳回。

以下面的SQL為例:

Select name from student_distribute where id = 5

①當 Coordinator 收到請求後,由于student_distribute是一個分布式表,是以需要将SQL 改寫為對local表查詢,并轉發請求給每一個shard的worker;

②Worker收到請求後查詢本地的local表資料,傳回結果給coordinator;

③Coordinator彙總每一個shard的資料并把結果傳回給client。

Select name from student_local where id = 5

第二階段執行的模式能夠高效地支援很多常見場景,比如常見的針對大寬表的各類查詢,但是随着業務場景的複雜化,也存在以下三點問題:

其一,第一階段傳回的資料比較多且第二階段的計算比較複雜時,對于Coordinator的壓力會比較大,容易成為query的瓶頸,且shard越多可能計算越慢,瓶頸越大。例如一些重計算的agg算子count distinct。如果我們使用hash表去重時,第二階段需要在coordinator單機上merge各個worker的hash表,計算量很重且不能并行;又比如說group by基數比較大或者window計算。

其二,join是SQL的重要場景。由于不支援Shuffle操作,對于Join來說右表必須是全量資料。無論是普通Join還是Global Join,當Join的右表比較大時都放到記憶體裡容易OOM,而Spill到磁盤雖然解決記憶體問題,可能會因為有磁盤 io和序列化計算的開銷影響性能。特别是當Join為最常見的Hash Join 時,右表如果是大表建構也比較慢。雖然社群最近也做了一些右表建構的優化,通過單機按照 join key split 來達到并行建構hash table。但是額外的代價是左右表都增加了一次 split 操作。

其三,對于複雜查詢(如多表 Join、嵌套多個子查詢、window function等)的支援并不友好,由于不能通過shuffle來分散資料,生成的pipeline在一些case下不能充分并行,難以充分發揮叢集的全部資源。

2. 其他MMP資料庫

目前主流的MPP資料庫基本都支援Stage執行的方式。以Presto為例,如下圖所示,一個兩表join的agg sql可拆分為5個 Stage。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

其中 Stage3、Stage4分别對應左右表資料讀取,Stage2完成兩表Join和partial agg 計算,Stage1完成final agg計算,Stage0收集Stage1的資料後彙總和輸出。在這個過程中,Stage 3、4、2、1可以在多個節點上并行執行,單個複雜的query被拆分成若幹Stage,進而實作了Stage之間,不同worker的資料傳輸。

3. 業務背景和目标

随着業務複雜程度提高,業務并不希望所有的資料都通過etl 産生大寬表;複雜查詢(特别是多輪分布式 Join和比較多的agg)的需求越來越強烈,而整體的資料量又在不斷增長。在叢集資源有限的情況下,我們希望能夠充分利用機器資源,基于ClickHouse 高效地支援複雜查詢。

ByteHouse是位元組跳動研發同學基于開源ClickHouse 進行了深度優化和改造的版本,提供海量資料上更強的查詢服務和資料寫入性能,支援多種應用場景。如圖所示,ByteHouse在内部多個場景如行為分析、畫像分析、智能營銷分析、APP 日志分析上得到充分的驗證和使用,并在多個方面進行了增強,具備特有的能力。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

--

02

技術方案

1. 設計思想

基于 ClickHouse 的複雜查詢的實作采用分Stage的方式,替換目前 ClickHouse的兩階段執行方式。類似其他分布式資料庫引擎(如 Presto、Impala 等),将一個複雜的Query按照資料交換情況切分成多個Stage,Stage和Stage之間通過 exchange完成資料的交換,單個Stage内不存在資料交換。Stage間的資料交換主要有以下三種形式:

①按照單(多)個 key 進行 Shuffle(shuffle)

②由1個或者多個節點彙聚到一個節點(我們稱為 gather)

③同一份資料複制到多個節點(也稱為 broadcast 或者說廣播)

按照不同的功能切分不同的子產品,設計目标如下:

①各個子產品約定好接口,盡量減少彼此的依賴和耦合。一旦某個子產品有變動不會影響别的子產品,例如Stage生成邏輯的調整不影響排程的邏輯。

②子產品采用插件的架構,允許子產品根據配置靈活支援不同的政策。

2. 相關術語

  • ExchangeNode 在文法樹中表示資料交換的節點
  • PlanSegment 單個 Stage 對應的執行的計劃片段
  • ExchangeManager 管理資料的 exchange,負責不同 Stage 節點之間的資料交換
  • SegmentScheduler 計劃片段排程器,負責下發計劃片段給 worker,由 Coordinator 節點調用
  • InterpreterPlanSegment 計劃片段執行器,執行一個具體的計劃片段
揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

3. 執行流程

①Coordinator 接受複雜查詢後,在目前 ClickHouse 文法樹的基礎上,根據節點類型和資料分布情況插入 Exchange 節點并生成分布式 Plan。

②Coordinator 根據 Exchange Node 類型,切分分布式 Plan 生成每個 Stage 的執行片段 PlanSegment。

③Coordinator 調用 SegmentScheduler 将各階段的 PlanSegment 發送到 Worker 節點。

④Worker 節點接受 PlanSegment 通過 InterpreterPlanSegment 完成資料的讀取和執行,通過 ExchangeManager 完成資料的互動。

⑤Coordinator 從最後一輪 Stage 對應節點的 ExchangeManager 讀取資料後處理後傳回給 client。

4. Plan切分

下面是一個Plan切分的例子,這是1個2表Join的查詢場景,根據Exchange資訊,将整個分布式 Plan切分成4個Stage。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

5. 查詢片段排程器(SegmentScheduler)

查詢片段排程器SegmentScheduler 根據上下遊依賴關系和資料分布,以及 Stage 并行度和worker 分布和狀态資訊,按照一定的排程政策,将 PlanSemgent 發給不同的 Worker 節點。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

目前支援的2種政策是:

①依賴排程:根據 Stage 依賴關系定義拓撲結構,産生 DAG 圖,根據 DAG 圖排程 stage,類似于拓撲排序,等到依賴的 Stage 啟動後再啟動新的 Stage。例如剛才的兩表 join,會先排程左右表讀取 stage,再排程 join stage。

②AllAtOnce:類似于Presto的AllAtOnce政策,會先計算每一個 Stage 的相關資訊,一次性排程所有的Stage。

相比而言,這兩種政策是在容錯、資源使用和延時上做取舍。

第一種排程政策可以實作更好的容錯,由于 ClickHouse 可以有多個副本,目前一個 Stage 部分節點連接配接失敗時可以嘗試切換到副本節點,對後續依賴 stage 無感覺。這裡指的是讀資料的 Stage,我們稱為 Source Stage,非 Source Stage 因為沒有資料依賴,容錯能力會更強,隻要保證并行度的節點數即可,甚至極端情況下可以降低 stage 并行度來支援更好的容錯。缺點是排程有依賴,不能完全并行,會增加排程時長,對于一些資料量和計算量小,但是 stage 多的節點排程延時可能會占 SQL 整體時間不小的比例。我們也做了一些針對性的優化,對于無依賴關系的盡可能支援并行。

第二種排程政策通過并行可以極大降低排程延時,為防止大量網絡 io 線程,我們通過異步化并且控制線程數目;這種政策的缺點是容錯性沒有依賴排程好,因為每一個 stage 的 worker 在排程前就已經确定,如果有一個 worker 出現連接配接異常則整個查詢會直接失敗。并且可能有一些 Stage 上遊資料還沒有 Ready 就被排程執行了,需要長時間等資料。例如 final agg stage,需要等 partial agg 完成後才能拿到資料。雖然我們做了一些優化,并不會長時間空跑浪費 cpu 資源,但是畢竟也消耗了一部分資源,比如建立了執行的線程。

6. 查詢片段執行器(InterpreterPlanSegment)

下面介紹下計劃片段是如何執行的,原本 ClickHouse的查詢和節點執行主要是 SQL 形式,切分Stag後需要支援執行一個單獨的PlanSemgent。是以 InterpreterPlanSegment 的主要功能就是接受一個序列化後的 PlanSemgent,能夠在 Worker 節點上運作整個 PlanSemgent 的邏輯。主要的步驟為:

①根據 input 資訊讀取資料,如果 input 是具體的 table,則從本地讀取資料;如果 input 是一個 exchange input,則從對應的 ExchangeManager 讀取資料;

②執行 PlanSemgent 的邏輯;

③輸出處理後的結果資料,如果是 Coordinator 節點,就将資料發給 Client;如果是非Coordinator 節點,就按照資料的exchange方式寫給本執行個體對應的 ExchangeManager。

Interpreter部分我們盡量複用目前ClickHouse的執行邏輯,例如processor 執行方式,process list管理等等。相比于InterpreterSelect邏輯要更簡單一些,可以認為1 個Stage隻有1個階段。當然我們也做了很多功能和性能的增強,例如我們支援1個 stage處理多個join等,這樣可以減少stage數目和不必要的資料傳輸,在一張大表(通常情況下是事實表) join 多個次元表的場景有比較好的幫助。

InterpreterPlan Segment執行完會向coordinator上報對應的狀态資訊。執行異常的時候會将異常資訊報告給查詢片段排程器,取消Query其他worker的執行。

7. 資料交換(ExchangeManager)

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

ExchangeManager是PlanSegment資料交換的媒介,更是平衡資料上下遊處理能力的重要元件。整體上采用 push 的方式,當上遊資料 ready 時主動推送給下遊,并支援反壓。其架構如下圖所示:

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

具體的流程如下:

①下遊PlanSegment執行時,當input為exchange input時,根據一定的 token 規則 (通常由 query_id+segment_id+index_id 等組成)和資料 source 資訊,向上遊 ExchangeManager 注冊對應的資料請求;

②上遊ExchangeManager收到請求後,建立上下遊資料通道,并将上遊的資料推送到下遊,如果通道一直建立不了會 block 上遊的執行。

在這個過程中,上下遊都會通過隊列來優化發送和讀取,當隊列飽和的時候通過反壓的機制控制上遊的執行速度。由于采用了 push 和隊列,這裡我們要考慮一個特殊的場景,在某些 case 下下遊的 Stage 并不需要讀取全部的上遊資料,一個典型的場景是 limit。例如 limit 100,下遊 stage 是需要讀取 100 條資料即可,而上遊可能會輸出更大規模的資料,是以在這種情況下,當下遊 stage 讀到足夠的資料後,需要能主動取消上遊資料的執行并清空隊列。這是一個特定場景的優化,能夠大大加速查詢時間。

ExchangeManager 需要考慮和優化的點還有:

①細粒度的記憶體控制,能夠按照執行個體、query、segment 多層次進行記憶體控制,避免 OOM,更長期的考慮是支援 spill 到磁盤上,降低對記憶體的使用。為了提升傳輸效率,小資料需要進行 merge,大資料要 split。同時,網絡處理在某些場景要保證有序性,比如 sort 時,partial sort 和 merge sort 的網絡傳輸必須有序,否則資料可能是有問題的。

②連接配接複用和網絡優化,包括針對上下遊在同一個節的場景下選擇走記憶體的交換不走網絡,可以減少網絡的開銷和減少資料序列化、反序列化的代價。另外,由于 ClickHouse 在計算方面做了非常充足的優化,有些場景下甚至記憶體帶寬成為瓶頸,我們在ExchangeManager的一些場景上也應用zero copy等技術來減少記憶體的拷貝。

③異常處理和監控,相比于單機執行,分布式情況下異常情況更複雜且不好感覺。通過重試能避免一些節點的暫時高負載或者異常,以及出問題時能夠快速感覺、排查和做針對性解決和優化。這裡的工程實踐更多一些。

--

03

優化與診斷

1. Join 多種實作

根據資料的規模和分布,我們支援了多種Join實作,目前已經支援的有:

①Shuffle Join,最通用的 Join;

②Broadcast Join,針對大表Join小表的場景,通過把右表廣播到左表的所有 worker 節點來減少左表的傳輸;

③Colocate Join,針對左右表根據Join key保持相通分布的場景,減少左右表資料傳輸。

2. 網絡連接配接優化

網絡連接配接的優化的核心本質就是減少連接配接的使用。特别是資料需要Shuffle 的時候,下一輪 Stage的每一個節點需要從上一輪Stage的每一個節點拉取資料。當一個叢集的節點比較多的時候,如果存在比較多的複雜 Query(Stage多,并行度(節點數)比較大),叢集的Worker節點會建立非常多的連接配接,如下圖所示,單節點建立的連接配接數與叢集節點數、并發stage數成正比。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

位元組内部的clickhouse叢集規模非常大,最大的叢集(單叢集幾千台規模)在目前 ClickHouse 的執行模式下單機最大可能會建立上幾萬個網絡連接配接。是以如果支援複雜 Query 執行,由于stage變多了,需要優化網絡連接配接,特别是支援連接配接複用。我們通過盡可能複用連接配接,在不同節點之間隻會建立固定數目的連接配接,不同的查詢會複用這些連接配接,不随 query 和 stage 的規模而增長。

3. 網絡傳輸優化

在資料中心領域,遠端直接記憶體通路(RDMA)是一種繞過遠端主機作業系統核心通路其記憶體中資料的技術,由于不經過作業系統,不僅節省了大量CPU資源,同樣也提高了系統吞吐量、降低了系統的網絡通信延遲,尤其适合在大規模并行計算機叢集中有廣泛應用。

由于ClickHouse在計算層面做了很多優化,而網絡帶寬相比于記憶體帶寬要小不少,在一些資料量傳輸特别大的場景,網絡傳輸會成為一定的瓶頸。為了提升網絡傳輸的效率和提升資料exchange的吞吐,一方面我們引入壓縮來降低傳輸資料量,另一方面我們引入 RDMA 來減少一定的開銷。經過測試,在一些資料傳輸量大的場景,有不小的收益。

4. Runtime Filter

Join算子通常是OLAP引擎中最耗時的算子。如果想優化 Join 算子,可以有兩種思路,一方面可以提升Join算子的性能,例如更好的Hash Table實作和Hash算法,以及更好的并行。另一方面可以盡可能減少參與Join計算的資料。

Runtime Filter在一些場景,特别是事實表join次元表的星型模型場景下會有比較大的效果。因為這種情況下通常事實表的規模比較大,而大部分過濾條件都在次元表上,事實表可能要全量join次元表。Runtime Filter的作用是通過在 Join 的 probe 端(就是左表)提前過濾掉那些不會命中Join的輸入資料來大幅減少 Join 中的資料傳輸和計算,進而減少整體的執行時間。以下圖為例:

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

左表并沒有直接過濾條件,右表帶有過濾條件item.proce > 1000。當完成右表查詢時,可以确定item.id 的範圍和集合,根據join類型inner join和join條件sales.item_id=item.id可以推斷出sales.item的範圍和集合。我們可以把sales.item 的範圍和集合作為一個過濾條件,在join前過濾sales的資料。

我們在複雜查詢上支援了Runtime Filter,目前主要支援minmax和bloomfilter。

總體執行流程如下:

①build plan segment worker(right table)會将生成的單節點 runtime filter 發送到coordinator節點;

②coordinator 在等待各個 worker的 runtime filter 都發送完成之後進行一次merge操作,将合并好的 runtime filter 分發到各個 execute plan segment worker(left table)節點中去;

③在 runtime filter 構造期間,execute plan segment(left table) 需要等待一定的時間,在逾時之前如果runtime filter已經下發,則通過 runtime filter 執行過濾。

這裡需要思考一個問題,Runtime filter column 是否建構索引(主鍵、skip index等)和命中prewhere?如果runtime filter的列(join column)建構了索引是需要重新生成 pipeline 的。因為命中索引後,可能會減少資料的讀取,pipeline并行度和對應資料的處理range都可能發生變化。如果runtime filter的列跟索引無關,可以在計劃生成的時候預先帶上過濾條件,隻不過一開始作為占位是空的,runtime filter下發的時候把占位資訊改成真正的過濾條件即可。這樣即使runtime filter 下發逾時了,查詢片段已經開始執行了,隻要查詢片段沒有執行完,之後的資料仍然可以進行過濾。

需要注意的是,runtime filter 是一種特殊場景下的優化,其針對的場景是右表資料量不大,且建構的 runtime filter 對左表有比較強的過濾效果。如果右表資料量比較大,建構runtime filter比較慢,或者對左表的資料過濾效果很差甚至沒有,那麼 runtime filter 反而會增加查詢的耗時。是以,要根據資料的特征和規模來決定是否開啟。

5. 診斷和分析

引入複雜查詢的多Stage 執行模型後,SQL的執行模式變得複雜了。特别是當使用者查詢一些非常複雜的查詢,幾百行的sql生成的stage會非常多,把stage都看一遍并了解sql的含義要花比較長的時間。題外話:我們很早之前就完整的跑通了所有的tpcds query,這裡面就有一些sql可能會産生幾十個 stage。那麼在這種情況下,如何定位 SQL 的瓶頸并加以優化是一個難題。

我們做了如下兩點優化:

首先,最常見的做法是增加各類完善的metrics,包括整個Query的執行時間和不同Stage的執行時間、IO資料量、算子處理資料和執行情況、算子 metrics 和profile event等。

其次,我們記錄了反壓資訊和上下遊隊列長度,以此來推斷 stage 執行情況和瓶頸。

坦率地說,SQL 場景包括萬象,很多非常複雜的場景目前還是需要對引擎比較熟悉的同學才能診斷和分析SQL才能給出優化建議。在不斷積累經驗的過程中,我們希望通過能夠不斷完善 metrics 和分析路徑,不斷減輕oncall的負擔,并且在某些場景下可以更智能地給出優化提示,這對于使用同學來說也是有好處的。

--

04

效果及展望

1. 複雜查詢效果

根據上面的執行模型的三個缺點,分别測試如下三個場景:

①第二階段的計算比較複雜

②Hash Join 右表為大表

③多表 Join

以SSB 1T資料作為資料集,叢集包含8個節點。

2. 第二階段的計算比較複雜

這個case SQL 如下圖所示:

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

uniqExact是count distinct的預設算法,采用hash table進行資料去重。使用複雜查詢後,query 執行時間從 8.514s=>2.198s,第二階段 agg uniqExact 算子的合并原本由 coordinator單點合并,現在通過按照group by key shuffle 後可以由多個節點并行完成。是以通過shuffle減輕了coordinator的merge agg 壓力。

3. Hash Join 右表為大表

這個 case 示範了右表是一個大表的場景,由于 ClickHouse 對多表的優化做的還不是很到位。這裡采用子查詢來下推過濾的條件。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

在這個case中,采用複雜查詢模式後,query 執行時間從17.210=>1.749s。lineorder 是一張大表,通過shuffle可以将大表資料按照join key shuffle到每個worker節點,減少了右表建構的壓力。

4. 多表 Join

這個 case 是一個 5 表 join 的 case。

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

開啟複雜查詢模式後,query 執行時間從8.583s=>4.464s,所有的右表可同時開始資料讀取和建構。為了和現有模式對比,針對複雜查詢沒有開啟 runtime filter,開啟 runtime filter後效果會更快。

這裡還要重點說一下,今天的分享主要是從執行模式上講解如何支援複雜查詢。實際上,優化器對于複雜查詢的性能提升也非常大。通過一些rbo的規則,比如常見的謂詞下推、相關子查詢處理等。實際上這裡的優化規則非常多,可以極大的提升 SQL 的執行效率。上面的 SQL 其實原本比較簡單,5 表 join 和一些維表的過濾條件,這裡寫成子查詢是為了在 ClickHouse 現有模式下右表過濾條件更好下推。其實對于我們來說,在複雜查詢的模式下,由于有優化器的存在,使用者不用寫的這麼複雜,優化器會自動完成下推和rbo優化。

上面是一些規則的優化,實際上在複雜查詢中, cbo 的優化也有很大作用。舉一個例子,在 ClickHouse 中,相同的兩個表,大表 join 小表的性能比小表 join 大表要好很多。前一個效果 2 中如果把表順序調整一下會快很多;另外,選用哪一種 join 的實作對 join 性能影響比較大,如果滿足 join key 分布,colcate join 比 shuffle join 來說完全減少了資料的 shuffle。多表 join 中,join 的順序和 join 的實作方式對執行的時長影響會比 2 表 join 影響更大。借助資料的統計資訊,通過一些 cbo 優化,可以得到一個比較優的執行模式。

有了優化器,業務同學可以按照業務邏輯來寫任何的 SQL,引擎自動計算出相對最優的 SQL 計劃并執行,加速查詢的執行。

5. 展望

CLickHouse 目前的模式其實在很多單表查詢的場景上表現優異。我們主要是針對複雜的查詢場景做優化,主要是實作多stage的執行模式,并實作了stage之間資料傳輸。工程實踐上來說,做了比較多的嘗試和優化來提升執行和網絡傳輸的性能,并且希望通過完善metrics和智能診斷來降低SQL分析和調優的門檻,并減少oncall 的壓力。

目前的實作隻是第一步,未來我們還有很多努力的方向。

首先,肯定是繼續提升執行和 Exchange 的性能。這裡不談論引擎執行通用的優化,比如更好的索引或者算子的優化,主要是跟複雜查詢模式有關。

其次是Metrics 和智能診斷加強,就如同剛才提到的,SQL 的靈活度太高了,對于一些複雜的查詢沒有 metrics 幾乎難以診斷和調優,這個我們會長期持續的去做。

今天的分享就到這裡,謝謝大家。

分享嘉賓:董一峰 位元組跳動

編輯整理:胡勝達 蔚來汽車

出品平台:DataFunTalk

01/分享嘉賓

揭秘位元組跳動解決ClickHouse複雜查詢問題的技術方案

董一峰|位元組跳動火山引擎ByteHouse資深研發工程師

位元組跳動資料平台資深研發工程師,2016 年加入位元組跳動 OLAP 團隊,一直從事大資料查詢引擎的開發和推廣工作,先後負責 Hive、Spark、Druid、ClickHouse 等大資料引擎,目前主要聚焦于 ClickHouse 執行層相關的研發。

02/關于我們

DataFun:專注于大資料、人工智能技術應用的分享與交流。發起于2017年,在北京、上海、深圳、杭州等城市舉辦超過100+線下和100+線上沙龍、論壇及峰會,已邀請超過2000位專家和學者參與分享。其公衆号 DataFunTalk 累計生産原創文章700+,百萬+閱讀,14萬+精準粉絲。

繼續閱讀