天天看點

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

作者:位元組跳動技術團隊

1. 背景

火山引擎增長分析 DataFinder 基于 ClickHouse 來進行行為日志的分析,ClickHouse 的主要版本是基于社群版改進開發的位元組内部版本。主要的表結構:

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

事件表:存儲使用者行為資料,以使用者 ID 分 shard 存儲。

--列出了主要的字段資訊
CREATE TABLE tob_apps_all
(
`tea_app_id` UInt32, --應用ID
`device_id` String DEFAULT '', --裝置ID
`time` UInt64,--事件日志接受時間
`event` String,--事件名稱
`user_unique_id` String,--使用者ID
`event_date` Date , --事件日志日期,由time轉換而來
`hash_uid` UInt64 --使用者ID hash過後的id,用來join降低記憶體消耗
)│
```           

使用者表:存儲使用者的屬性資料,以使用者 ID 分 shard 存儲。

--列出了主要的字段資訊
CREATE TABLE users_unique_all
(
    `tea_app_id`            UInt32,            --應用ID
    `user_unique_id`        String DEFAULT '', -- 使用者ID
    `device_id`             String DEFAULT '', -- 使用者最近的裝置ID
    `hash_uid`              UInt64,--使用者ID hash過後的id,用來join降低記憶體消耗
    `update_time`           UInt64,--最近一次更新時間
    `last_active_date`      Date  --使用者最後活躍日期
)
           

裝置表:儲存設備相關的資料,以裝置 ID 分 shard 存儲。

--列出了主要的字段資訊
CREATE TABLE devices_all
(
    `tea_app_id`            UInt32,            --應用ID
    `device_id`             String DEFAULT '', --裝置ID    
    `update_time`           UInt64,            --最近一次更新時間
    `last_active_date`      Date  --使用者最後活躍日期
)
           

業務對象表:存儲業務對象相關的資料,每個 shard 存儲全量的資料。

--列出了主要的字段資訊
CREATE TABLE rangers.items_all
(
    `tea_app_id`            UInt32,
    `hash_item_id`          Int64,
    `item_name`             String, --業務對象名稱。比如商品
    `item_id`               String, --業務對象ID。比如商品id 1000001
    `last_active_date`      Date
) 
           

1.1 業務挑戰

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

随着接入應用以及應用的 DAU 日益增加,ClickHouse 表的事件量增長迅速;并且基于行為資料需要分析的業務名額越來越複雜,需要 JOIN 的表增多;我們遇到有一些涉及到 JOIN 的複雜 SQL 執行效率低,記憶體和 CPU 資源占用高,導緻分析接口響應時延和錯誤率增加。

2. 關于 Clickhouse 的 JOIN

在介紹優化之前,先介紹一下基本的 ClickHouse JOIN 的類型和實作方式。

2.1 分布式 JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
           

基本執行過程:

  1. 一個 Clickhouse 節點作為 Coordinator 節點,給每個節點分發子查詢,子查詢 sql(tob_apps_all 替換成本地表,users_unique_all 保持不變依然是分布式表)。
  2. 每個節點執行 Coordinator 分發的 sql 時,發現 users_unique_all 是分布式表,就會去所有節點上去查詢以下 SQL(一共有 N*N。N 為 shard 數量)。
    1. SELECT device_id, hash_uid FROM users_unique WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
  3. 每個節點從其他 N-1個節點拉取2中子查詢的全部資料,全量存儲(記憶體 or 檔案) ,進行本地 JOIN。
  4. Coordinator 節點從每個節點拉取3中的結果集,然後做處理傳回給 client。

存在的問題:

  1. 子查詢數量放大。
  2. 每個節點都全量存儲全量的資料。

2.2 分布式 Global JOIN

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
           

基本執行過程:

  1. 一個 Clickhouse 節點作為 Coordinator 節點,分發查詢。在每個節點上執行sql(tob_apps_all 替換成本地表,右表子查詢替換成别名 ut)。
  2. Coordinator 節點去其他節點拉取 users_unique_all 的全部資料,然後分發到全部節點(作為1中别名表 ut 的資料)。
  3. 每個節點都會存儲全量的2中分發的資料(記憶體or檔案),進行本地 local join。
  4. Coordinator 節點從每個節點拉取3中的結果集,然後做處理傳回給 client。

存在的問題:

  1. 每個節點都全量存儲資料。
  2. 如果右表較大,分發的資料較大,會占用網絡帶寬資源。

2.3 本地 JOIN

SQL 裡面隻有本地表的 JOIN,隻會在目前節點執行。

SELECT et.os_name,ut.device_id AS user_device_id
FROM tob_apps et any LEFT JOIN
    (SELECT device_id,
         hash_uid
    FROM rangers.users_unique
    WHERE tea_app_id = 268411
            AND last_active_date>='2022-08-06') ut
    ON et.hash_uid=ut.hash_uid
WHERE tea_app_id = 268411
        AND event='app_launch'
        AND event_date='2022-08-06' 
           

2.3.1 Hash join

  • 右表全部資料加載到記憶體,再在記憶體建構 hash table。key 為 joinkey。
  • 從左表分批讀取資料,從右表 hash table比對資料。
  • 優點是:速度快 缺點是:右表資料量大的情況下占用記憶體。

3.3.2 Merge join‍

  • 對右表排序,内部 block 切分,超出記憶體部分 flush 到磁盤上,記憶體大小通過參數設定。
  • 左表基于 block 排序,按照每個 block 依次與右表 merge。
  • 優點是:能有效控制記憶體 缺點是:大資料情況下速度會慢。

優先使用 hash join 當記憶體達到一定門檻值後再使用 merge join,優先滿足性能要求。

3. 解決方案

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

3.1 避免JOIN

3.1.1 資料預生成

資料預生成(由 Spark/Flink 或者 Clickhouse 物化視圖産出資料),形成大寬表,基于單表的查詢是 ClickHouse 最為擅長的場景。

我們有個名額,實作的 SQL 比較複雜(如下),每次實時查詢很耗時,我們單獨建了一個表 table,由 Spark 每日建構出這個名額,查詢時直接基于 table 查詢。

SELECT event_date,count(distinct uc1) AS uv,sum(value) AS sum_value, ......
FROM
    (SELECT event_date,hash_uid AS uc1,sum(et.float_params{ 'amount' }) AS value, count(1) AS cnt, value*cnt AS multiple
    FROM tob_apps_all et GLOBAL  ANY  LEFT  JOIN
(SELECT hash_uid AS join_key,int_profiles{ '$ab_time_34' }*1000 AS first_time
        FROM users_unique_all
        WHERE app_id = 10000000 AND last_active_date >= '2022-07-19'  AND first_time is  NOT  null) upt
            ON et.hash_uid=upt.join_key
        WHERE (查詢條件)
        GROUP  BY uc1,event_date)
GROUP  BY event_date;
           

資料量2300W,查詢時間由7秒->0.008秒。當然這種方式,需要維護額外的資料建構任務。總的思路就是不要讓 ClickHouse 實時去 JOIN。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

3.1.2 使用 IN 代替 JOIN

JOIN 需要基于記憶體建構 hash table 且需要存儲右表全部的資料,然後再去比對左表的資料。而 IN 查詢會對右表的全部資料建構 hash set,但是不需要比對左表的資料,且不需要回寫資料到 block。

比如:

SELECT event_date, count()
FROM tob_apps_all et global any INNER JOIN
    (SELECT hash_uid AS join_key
    FROM users_unique_all
    WHERE app_id = 10000000
            AND last_active_date >= '2022-01-01') upt
    ON et.hash_uid = upt.join_key
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
GROUP BY  event_date 
           

可以改成如下形式:

SELECT event_date,
         count()
FROM tob_apps_all
WHERE app_id = 10000000
        AND event_date >= '2022-01-01'
        AND event_date <= '2022-08-02'
        AND hash_uid global IN 
    (SELECT hash_uid
    FROM users_unique_all
    WHERE (tea_app_id = 10000000)
            AND (last_active_date >= '2022-01-01') )
 GROUP BY event_date
           

如果需要從右表提取出屬性到外層進行計算,則不能使用 IN 來代替 JOIN。

相同的條件下,上面的測試 SQL,由 JOIN 時的16秒優化到了 IN 查詢時的11秒。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

3.2 更快的 JOIN

3.2.1 優先本地 JOIN

資料預先相同規則分區

也就是 Colocate JOIN。優先将需要關聯的表按照相同的規則進行分布,查詢時就不需要分布式的 JOIN。

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
           

比如事件表 tob_apps_all 和使用者表 users_unique_all 都是按照使用者 ID 來分 shard 存儲的,相同的使用者的兩個表的資料都在同一個 shard 上,是以這兩個表的 JOIN 就不需要分布式 JOIN 了。

distributed_perfect_shard 這個 settings key 是位元組内部 ClickHouse 支援的,設定過這個參數,指定執行計劃時就不會再執行分布式 JOIN 了。

基本執行過程:

  1. 一個 ClickHouse 節點作為 Coordinator 節點,分發查詢。在每個節點上執行 sql(tob_apps_all、users_unique_all替換成本地表)。
  2. 每個節點都執行1中分發的本地表 join 的 SQL(這一步不再分發右表全量的資料)。
  3. 資料再回傳到 coordinator 節點,然後傳回給 client。

資料備援存儲

如果一個表的資料量比較小,可以不分 shard 存儲,每個 shard 都存儲全量的資料,例如我們的業務對象表。查詢時,不需要分布式 JOIN,直接在本地進行 JOIN 即可。

SELECT count()
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT item_id
    FROM items_all 
    WHERE (tea_app_id = 268411)
) AS it ON et.item_id = it.item_id
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
           

例如這個 SQL,items_all 表每個 shard 都存儲同樣的資料,這樣也可以避免分布式 JOIN 帶來的查詢放大和全表資料分發問題。

3.2.2 更少的資料

不論是分布式 JOIN 還是本地 JOIN,都需要盡量讓少的資料參與 JOIN,既能提升查詢速度也能減少資源消耗。

SQL 下推

ClickHouse 對 SQL 的下推做的不太好,有些複雜的 SQL 下推會失效。是以,我們手動對 SQL 做了下推,目前正在測試基于查詢優化器來幫助實作下推優化,以便讓 SQL 更加簡潔。

下推的 SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06'
        AND 使用者屬性條件 1  OR 使用者屬性條件 2)
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
settings distributed_perfect_shard=1
           

對應的不下推的 SQL:

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id
FROM tob_apps_all AS et 
ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM rangers.users_unique_all 
    WHERE (tea_app_id = 268411) 
        AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
WHERE (tea_app_id = 268411) 
AND (event = 'app_launch') 
AND (event_date = '2022-08-06')
AND (ut.使用者屬性條件 1  OR ut.使用者屬性條件 2)
settings distributed_perfect_shard=1
           

可以看到,不下推的 SQL 更加簡潔,直接基于 JOIN 過後的寬表進行過濾。但是 ClickHouse 可能會将不滿足條件的 users_unique_all 資料也進行 JOIN。

我們使用中有一個複雜的 case,使用者表過濾條件不下推有1千萬+,SQL 執行了3000秒依然執行逾時,而做了下推之後60秒内就執行成功了。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

3.2.3 Clickhouse 引擎層優化

一個 SQL 實際在 Clickhouse 如何執行,對 SQL 的執行時間和資源消耗至關重要。社群版的 Clickhouse 在執行模型和 SQL 優化器上還要改進的空間,尤其是複雜 SQL 以及多 JOIN 的場景下。

執行模型優化

社群版的 Clickhouse 目前還是一個兩階段執行的執行模型。第一階段,Coordinator 在收到查詢後,将請求發送給對應的 Worker 節點。第二階段,Worker 節點完成計算,Coordinator 在收到各 Worker 節點的資料後進行彙聚和處理,并将處理後的結果傳回。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

有以下幾個問題:

  1. 第二階段的計算比較複雜時,Coordinator 的節點計算壓力大,容易成為瓶頸。
  2. 不支援 shuffle join,hash join 時右表為大表時建構慢,容易 OOM。
  3. 對複雜查詢的支援不友好。

位元組跳動 ClickHouse 團隊為了解決上述問題,改進了執行模型,參考其他的分布式資料庫引擎(例如 Presto 等),将一個複雜的 Query 按資料交換情況切分成多個 Stage,各 Stage 之間則通過 Exchange 完成資料交換。根據 Stage 依賴關系定義拓撲結構,産生 DAG 圖,并根據 DAG 圖排程 Stage。例如兩表 JOIN,會先排程左右表讀取 Stage,之後再排程 JOIN 這個 Stage, JOIN 的 Stage 依賴于左右表的 Stage。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

舉個例子

SELECT 
    et.os_name, 
    ut.device_id AS user_device_id, 
    dt.hash_did AS device_hashid
FROM tob_apps_all AS et 
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_uid
    FROM users_unique_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS ut ON et.hash_uid = ut.hash_uid
GLOBAL ANY LEFT JOIN 
(
    SELECT 
        device_id, 
        hash_did
    FROM devices_all 
    WHERE (tea_app_id = 268411) AND (last_active_date >= '2022-08-06')
) AS dt ON et.device_id = dt.device_id
WHERE (tea_app_id = 268411) AND (event = 'app_launch') AND (event_date = '2022-08-06')
LIMIT 10
           

Stage執行模型基本過程(可能的):

  1. 讀取 tob_apps_all 資料,按照 join key(hash_uid)進行 shuffle,資料分發到每個節點。這是一個Stage。
  2. 讀取 users_unique_all 資料,按照 join key(hash_uid)進行 shuffle,資料分發到每個節點。這是一個 Stage。
  3. 上述兩個表的資料,在每個節點上的資料進行本地 JOIN,然後再按照 join key(device_id) 進行 shuffle。這是一個 Stage。
  4. 讀取 devices_all 資料,按照 join key(device_id)進行 shuffle,這是一個Stage。
  5. 第3步、第4步的資料,相同 join key(device_id) 的資料都在同一個節點上,然後進行本地JOIN,這是一個 Stage。
  6. 彙總資料,傳回 limit 10 的資料。這是一個 Stage。

統計效果如下:

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

查詢優化器

有了上面的 stage 的執行模型,可以靈活調整 SQL 的執行順序,位元組跳動 Clickhouse 團隊自研了查詢優化器,根據優化規則(基于規則和代價預估)對 SQL 的執行計劃進行轉換,一個執行計劃經過優化規則後會變成另外一個執行計劃,能夠準确的選擇出一條效率最高的執行路徑,然後建構 Stage 的 DAG 圖,大幅度降低查詢時間。

下圖描述了整個查詢的執行流程,從 SQL parse 到執行期間所有内容全部進行了重新實作(其中紫色子產品),建構了一套完整的且規範的查詢優化器。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

還是上面的三表 JOIN 的例子,可能的一個執行過程是:

  1. 查詢優化器發現 users_unique_all 表與 tob_apps_all 表的分 shard 規則一樣(基于使用者 ID ),是以就不會先對表按 join key 進行 shuffle,users_unique 與 tob_apps 直接基于本地表 JOIN,然後再按照 join key(device_id)進行 shuffle。這是一個 Stage。
  2. 查詢優化器根據規則或者代價預估決定裝置表 devices_all 是需要 broadcast join 還是 shuffle join。
    1. 如果 broadcast join:在一個節點查到全部的 device 資料,然後分發到其他節點。這是一個 Stage。
    2. 如果 shuffle join:在每個節點對 device 資料按照 join key(device_id) 進行 shuffle。這是一個 Stage。
  3. 彙總資料,傳回 limit 10 的資料。這是一個 Stage。

效果:

可以看到,查詢優化器能優化典型的複雜的 SQL 的執行效率,縮短執行時間。

火山引擎在行為分析場景下的 ClickHouse JOIN 優化

4. 總結

ClickHouse 最為擅長的領域是一個大寬表來進行查詢,多表 JOIN 時Clickhouse 性能表現不佳。作為業内領先的使用者分析與營運平台,火山引擎增長分析 DataFinder 基于海量資料做到了複雜名額能夠秒級查詢。本文介紹了我們是如何優化 Clickhouse JOIN 查詢的。

主要有以下幾個方面:

  1. 減少參與 JOIN 的表以及資料量。
  2. 優先使用本地 JOIN,避免分布式 JOIN 帶來的性能損耗。
  3. 優化本地 JOIN,優先使用記憶體進行 JOIN。
  4. 優化分布式 JOIN 的執行邏輯,依托于位元組跳動對 ClickHouse 的深度定制化。

繼續閱讀