作者:陸路,花名世儀,阿裡巴巴計算平台事業部EMR團隊進階開發工程師,大資料領域技術愛好者,對Spark、Hive等有濃厚興趣和一定的了解,目前主要專注于EMR産品中開源計算引擎的優化工作。
背景介紹
TPC-DS 測試集采用星型和雪花型等多元資料模型,包含 7 張事實表和 17 張次元表,以 store channel 為例,事實表和次元表的關聯關系如下所示:

分析 TPC-DS 全部 99 個查詢語句不難發現,絕大部分語句的過濾條件都不是直接作用于事實表,而是通過過濾次元表并将結果集與事實表 join 來間接完成。是以,優化器很難直接利用事實表索引來減少資料掃描量。如何利用好查詢執行時的次元表過濾資訊,并将這些資訊下推至存儲層來完成事實表的過濾,對于性能提升至關重要。
在 2019 年的打榜測試中,我們基于 Spark SQL Catalyst Optimizer 開發的 RuntimeFilter 優化 對于 10TB 資料 99 query 的整體性能達到 35% 左右的提升。簡單來說,RuntimeFilter 包括兩點核心優化:
- 動态分區裁剪:事實表以日期列(date_sk)為分區列建表,當事實表與 date_dim 表 join 時,optimizer 在運作時收集 date_dim 過濾結果集的所有 date_sk 取值,并在掃描事實表前過濾掉所有未命中的分區檔案。
- 非分區列動态過濾:當事實表與次元表的 join 列為非分區列時,optimizer 動态建構和收集次元表結果集中 join 列的 Min-Max Range 或 BloomFilter,并在掃描事實表時下推至存儲層,利用存儲層索引(如 Parquet、ORCFile 的 zone map 索引)來減少掃描資料量。
問題分析
為了進一步挖掘 RuntimeFilter 優化的潛力,我們選取了部分執行時間較長的 query 進行了細緻的性能剖析。這些 query 均包含大于一個事實表和多個次元表的複雜 join。在分析了 RuntimeFilter 對各個 query 的性能提升效果後,我們發現:
- 動态分區裁剪的性能提升效果明顯,但很難有進一步的優化空間
- 非分區列動态過濾對整體提升貢獻相比分區裁剪小很多,主要是因為很多下推至存儲層的過濾條件并沒有達到索引掃描的效果
聰明的同學應該已經發現,隻有 date_dim 這一張次元表和分區列相關,那麼所有與其它次元表的 join 查詢從 RuntimeFilter 優化中受益都較為有限。對于這種情況,我們做了進一步的拆解分析:
- 絕大部分 join 列均為次元表的自增主鍵,且與過濾條件沒有相關性,是以結果集取值常常均勻稀疏地散布在該列的整個取值空間中
- 對于事實表,考慮最常見的 Zone Map 索引方式,由于 load 階段沒有針對非分區列做任何聚集操作(Clustering),每個 zone 的取值一般也稀疏分散在各個列的值域中。
- 相比 BloomFilter,Min-Max Range 的建構開銷和索引查詢開銷要低得多,但由于資訊粒度太粗,索引過濾命中的效果也會差很多
綜合以上幾點考慮,一種可能的優化方向是在 load 階段按照 join 列對事實表進行 Z-Order 排序。但是這種方式會顯著增加 load 階段執行時間,有可能導緻 TPC-DS 評測總分反而下降。同時,由于建表階段優化的複雜性,實際生産環境的推廣使用也會比較受限。
RuntimeFilter Plus
基于上述分析,我們認為依賴過濾條件下推至存儲層這一方式很難再提升查詢性能,嘗試往其它方向進行探索:
- 不依賴存儲層索引
- 不僅優化事實表與次元表 join
最終我們提煉兩個新的運作時過濾優化點:次元表過濾廣播和事實表 join 動态過濾,并在原版 RuntimeFilter 優化的基礎上進行了擴充實作。
次元表過濾廣播
這一優化的思想來源于 Lookahead Information Passing(LIP),在論文
《Looking Ahead Makes Query Plans Robust》中首次提出。其針對的場景如下圖所示:
當事實表(lineorder)連續與多個次元表過濾結果做 multi-join 時,可将所有次元表的過濾資訊下推至 join 之前。該方法與我們的 RuntimeFilter 的主要不同在于下推時考慮了完整的 multi-join tree 而不是局部 binary-join tree。其優化效果是即使 join ordering 為 bad case,無用的事實表資料也能夠被盡早過濾掉,即讓查詢執行更加 robust。
我們參考論文算法實作了第一版過濾下推規則,但并沒有達到預期的性能提升,主要原因在于:
- Spark CBO Join-Reorder 結合我們的遺傳算法優化,已經達到了接近最優的 join ordering 效果
- 前置的 LIP filters 執行性能并沒有明顯優于 Spark BroadcastHashJoin 算子
基于過濾條件可以傳遞至複雜 multi-join tree 的任意節點這一思想去發散思考,我們發現,當 multi-join tree 中存在多個事實表時,可将次元表過濾條件廣播至所有的事實表 scan,進而減少後續事實表 SortMergeJoin 等耗時算子執行時所需處理的資料量。以一個簡化版的 query 64 為例:
with cs_ui as
(select cs_item_sk
,sum(cs_ext_list_price) as sale
from catalog_sales
,catalog_returns
where cs_item_sk = cr_item_sk
and cs_order_number = cr_order_number
group by cs_item_sk)
select i_product_name product_name
,i_item_sk item_sk
,sum(ss_wholesale_cost) s1
from store_sales
,store_returns
,cs_ui
,item
where ss_item_sk = i_item_sk and
ss_item_sk = sr_item_sk and
ss_ticket_number = sr_ticket_number and
ss_item_sk = cs_ui.cs_item_sk and
i_color in ('almond','indian','sienna','blue','floral','rosy') and
i_current_price between 19 and 19 + 10 and
i_current_price between 19 + 1 and 19 + 15
group by i_product_name
,i_item_sk
該查詢的 plan tree 如下圖所示:
考慮未實作次元表過濾廣播的執行流程,store_sales 資料經過 RuntimeFilter 和 BroadcastHashJoin 算子進行過濾,但由于過濾後資料仍然較大,後續的所有 join 都需要走昂貴的 SortMergeJoin 算子。但如果将 LIP filter 下推至 4 張事實表的 scan 算子(無需下推至存儲層),不僅減少了 join 資料量,也減少了 catalog_sales 和 catalog_returns 表 join 後的 group-by aggregation 資料量 。
LIP 實作
在 optimizer 層,我們在原版 RuntimeFilter 的 SyntheticJoinPredicate 規則後插入 PropagateDynamicValueFilter 規則,将合成的動态謂詞廣播至所有合法的 join 子樹中;同時結合原有的謂詞下推邏輯,保證動态謂詞最終傳播到所有相關的 scan 算子上。在算子層,LIP filters 的底層實作可以是 HashMap 或 BloomFilter,針對 TPC-DS 的資料特性,我們選擇 BitMap 作為廣播過濾條件的底層實作。由于 BitMap 本身是精确的(Exact Filter),可以結合主外鍵限制資訊進一步做 semi-join 消除優化。基于主外鍵限制的優化規則将在系列後續文章做詳細介紹。
應用該優化後,query 64 執行時間由 177 秒降低至 63 秒,加速比達到 2.8 倍。
事實表 Join 動态過濾
使用 BloomFilter 來優化大表 join 是一種常見的查詢優化技術,比如在論文
《Building a Hybrid Warehouse: Efficient Joins between Data Storedin HDFS and Enterprise Warehouse》中提出對 join 兩表交替應用 BloomFilter 的 zig-zag join 方法,降低分布式 join 中的資料傳輸總量。對于 TPC-DS 測試集,以 query 93 為例,store_sales 與 store_returns join 後的結果集大小遠小于 store_sales 原始資料量,非常适合應用這一優化。
BloomFilter 的建構和應用都存在較高的計算開銷,對于 selectivity 較大的join,盲目使用這一優化可能反而導緻性能回退。基于靜态 stats 的 join selectivity 估算往往誤差,Spark 現有的 CBO 優化規則難以勝任魯棒的 BloomFilter join 優化決策。是以,我們基于 Spark Adaptive Execution(AE) 運作時重優化機制來實作動态的 BloomFilter join 優化規則。AE 的基本原理是在查詢作業的每個 stage 執行完成後,允許優化器根據運作時采集的 stage stats 資訊重新調整後續的實體執行計劃。目前主要支援三種優化:
(1)reduce stage 并發度調整;
(2)針對 skew 情況的 shuffle 資料均衡分布;
(3)SortMergeJoin 轉換為 BroadcastHashJoin
基于 AE 的優化規則流程如下:
- 根據靜态 stats 判斷 join 的一端的 size 是否可能适合建構 BloomFilter( build side),如果是,則 build side 和 stream side 的 scan stage 會依次串行送出執行;否則這兩個 stage 将并行執行。
- 在 build side 的 scan stage 執行完成後,AE 根據運作時收集的 size 和 join 列 histogram 進行代價估算,并決定最終走 BroadcastHashJoin、BloomFilter-SortMergeJoinJoin 還是原本的 SortMergeJoin。
- 當實體執行計劃為 BloomFilter-SortMergeJoinJoin,優化器會插入一個新的作業并行掃描 build side 的 shuffle 資料來建構 BloomFilter,并下推至 stream side 的 scan stage 中。
BloomFilter 算子實作
為了減少 BloomFilter 帶來的額外開銷,我們重新實作了高效的 BuildBloomFiler 和 Native-InBloomFilter 的算子。在建構階段,使用 RDD aggregate 來合并各個資料分片的 BloomFiler 會導緻 driver 成為資料傳輸和 bitmap 合并計算的性能瓶頸;使用 RDD treeAggregate 實作并行分層合并顯著降低了整體的建構延遲。在過濾階段,Native-InBloomFilter 的算子會被推入 scan 算子中合并執行。該算子直接通路 Spark 列式讀取記憶體格式,按批量資料來調用 SIMD 優化的 native 函數,降低 CPU 執行開銷;同時,我們将原版算法替換為 Blocked BloomFilter 算法實作,該算法通過犧牲少量的 bitmap 存儲空間來換取訪存時更低的 CPU cache miss 率。
應用該優化後,query 93 執行時間由 225 秒降低至 50 秒,加速比達到 4.5 倍。
阿裡巴巴開源大資料技術團隊成立Apache Spark中國技術社群,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學線上提問答疑,隻為營造純粹的Spark氛圍,歡迎釘釘掃碼加入!
對開源大資料和感興趣的同學可以加小編微信(下圖二維碼,備注“進群”)進入技術交流微信群。
Apache Spark技術交流社群公衆号,微信掃一掃關注