已經有好一陣子沒有寫博文了,今天給大家帶來一篇最近一段時間開發相關的文章:在impala和kudu上支援runtime filter。
大家搜尋下實踐者社群,可以發現前面已經有好幾位同學寫了這個主題的博文(都是我們組的^_^),說明這個功能在資料庫領域的重要性,是以,嘿嘿,再敲一遍黑闆:“重點,必考題!”
附上年初測試kudu時候的博文《 【大資料之資料倉庫】kudu性能測試報告分析》作為背景。
背景準備
為了生動、立體的給大家展示runtime filter功能,這裡就以一個具體的sql例子來講解。
- 表結構:
create table orders
(
o_orderkey bigint, -> 主鍵,也是分區鍵(分布式資料庫用于資料分片)
o_custkey bigint, -> 外鍵,同customer.c_custkey
o_orderstatus string,
o_totalprice double,
o_orderdate string,
o_orderpriority string,
o_clerk string,
o_shippriority bigint,
o_comment string
)
create table customer
(
c_custkey bigint, -> 主鍵,也是分區鍵
c_name string,
c_address string,
c_nationkey bigint,
c_phone string,
c_acctbal double,
c_mktsegment string,
c_comment string
)
- 測試sql:
select c.* from orders o join customer c on c.c_custkey = o.o_custkey where o_orderkey = 1125;
我們用業界的TPC-H工具生成1TB的測試資料,使用上面的sql語句來測試orders和customer兩表關聯。
很多對資料庫熟悉的同學會說,簡單:從orders表裡用“o_orderkey = 1125”條件過濾出o_custkey字段,再用“傳回的o_orderkey值”作為條件到customer表裡過濾出全部字段。
對,完全正确!!
可是,那是受過專業資料庫知識教育訓練的你的大腦的優化器的優化結果(比人工智能還智能,哈哈),但對于資料庫計算引擎,則未必會如你般冰雪聰明。
不信?找你熟悉的資料庫,用上面的表結構和sql語句,僞造幾條資料測試一下^_^
那麼,計算引擎是如何處理的呢?
以MPP的impala為例,簡短講就是先掃描(讀)一張表,比如這裡的orders表,把掃描結果儲存到hash資料結構裡,然後再掃描另一張表,比如這裡的customer表,把掃描的結果到前面的hash資料結構裡找(關聯字段),比對的就是關聯到的結果。
修改前測試
我們以impala計算引擎對接kudu存儲引擎為例,拿修改之前的版本測試:
- 在impala-shell執行sql:
- 檢視profile:
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B
|
04:EXCHANGE [UNPARTITIONED]
| mem-estimate=0B mem-reservation=0B
| tuple-ids=1,0 row-size=251B cardinality=2
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
02:HASH JOIN [INNER JOIN, BROADCAST] -> 以hashJoin的方式兩表關聯
| hash predicates: c.c_custkey = o.o_custkey -> 用“c.c_custkey = o.o_custkey”條件關聯
| mem-estimate=9B mem-reservation=136.00MB
| tuple-ids=1,0 row-size=251B cardinality=2
|
|--03:EXCHANGE [BROADCAST]
| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=8B cardinality=1
| |
| F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
| 00:SCAN KUDU [kudu_1000g.orders o] -> 掃描orders表
| kudu predicates: o_orderkey = 1125 -> 用“o_orderkey = 1125”條件過濾
| mem-estimate=0B mem-reservation=0B
| tuple-ids=0 row-size=8B cardinality=1
|
01:SCAN KUDU [kudu_1000g.customer c] -> 掃描customer表
mem-estimate=0B mem-reservation=0B -> 沒有過濾條件,傳回全部資料
tuple-ids=1 row-size=243B cardinality=150000000
profile中已經做了一些批注,概括一下就是:
a.用“o_orderkey = 1125”條件掃描orders表,把傳回的結果放入hash資料結構中;
b.再全表掃描customer表,傳回所有的資料,傳回過程中逐批同前面hash資料結構中的資料進行比對,比對成功的儲存到結果集合中。
- 檢視plan:
顯而易見,plan中顯示的資料更加直覺,并且把耗時長的節點都标記成了紅色。
我們可以看到,左邊紅色掃描customer表,活生生把全表(總共1.5億條記錄)的全部字段都掃描上來了,磁盤掃描開銷、網絡傳輸開銷,還有大資料集合關聯帶來的CPU計算開銷,是以耗時很長,達到了37秒鐘。
修改後測試
在前面,我們有提到更聰明、更高效的方法,那是否可以實作呢?答案是肯定的,我們确實把掃描orders表的傳回結果應用到了掃描customer表的掃描節點中,作為 動态謂詞下發了。
術語:
- 謂詞,就是filter或者過濾器,條件表達式;
- 靜态則表示的是來自于sql語句本身,動态即運作過程中産生,也即runtime;
- 動态謂詞就是runtime filter。
我們拿修改之後版本測試:
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
PLAN-ROOT SINK
| mem-estimate=0B mem-reservation=0B
|
04:EXCHANGE [UNPARTITIONED]
| mem-estimate=0B mem-reservation=0B
| tuple-ids=1,0 row-size=251B cardinality=2
|
F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
02:HASH JOIN [INNER JOIN, BROADCAST] -> 以hashJoin的方式兩表關聯
| hash predicates: c.c_custkey = o.o_custkey -> 用“c.c_custkey = o.o_custkey”條件關聯
| runtime filters: RF000 <- o.o_custkey -> 這裡生成了1個runtime filter
| mem-estimate=9B mem-reservation=136.00MB
| tuple-ids=1,0 row-size=251B cardinality=2
|
|--03:EXCHANGE [BROADCAST]
| | mem-estimate=0B mem-reservation=0B
| | tuple-ids=0 row-size=8B cardinality=1
| |
| F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
| 00:SCAN KUDU [kudu_1000g.orders o] -> 掃描orders表
| kudu predicates: o_orderkey = 1125 -> 用“o_orderkey = 1125”條件過濾
| mem-estimate=0B mem-reservation=0B
| tuple-ids=0 row-size=8B cardinality=1
|
01:SCAN KUDU [kudu_1000g.customer c] -> 掃描customer表
runtime filters: RF000 -> c.c_custkey -> 這裡應用了1個runtime filter
mem-estimate=0B mem-reservation=0B
tuple-ids=1 row-size=243B cardinality=150000000
跟修改前比較,可以發現左邊紅色部分隻傳回了1條記錄。
是以,總結上面的對比可以發現性能:43.08秒對2.04秒,足足提升了20倍!
相關分享
- 對于runtime filter,我們需要明白誰産生和誰使用的關系,前者僅由關聯節點生成,而後者僅由掃描節點使用,兩者都屬于計算引擎。其中掃描節點在使用runtime filter上有兩種方式,一種是把runtime filter直接推送到存儲引擎,離資料最近,理論上效果肯定是最佳的,我們選擇的正是這種方式;還有一種是在掃描節點上過濾,把遠端資料全部讀取過來進行本地過濾,可以減少流入上層關聯節點的資料量,比如parquet就是這種方式。這裡有必要說明下parquet的特殊之處,它可以選擇采用hdfs的short circuit,簡短的了解:作為分布式檔案系統的hdfs,它的資料檔案是以block檔案塊的形式組織起來的,而parquet的資料是放在一個個的block上,在impala和hdfs配對部署的前提下,當impala把需要掃描block檔案塊的計算任務配置設定到block檔案塊所在的impala節點上,那這個impala計算節點就可以直接通過作業系統的檔案系統讀block檔案塊,省去了hdfs分布式檔案系統的中間層傳輸開銷;
- runtime filter的類型可以有很多種:包括min/max(範圍區間,或者大于、小于)、in list(數組)、bloom filter(布隆過濾器)、equality(等值)等,但是在目前的impala裡僅支援bloom filter,這是萬金油,最友善實作,後續我們可以考慮引入其他的類型,降低存儲引擎掃描時候的計算量(節約CPU計算時間)。從kudu官方來看,一直建議使用min/max或者in list的方式進行下推,估計同修改的工作量有關,因為它目前的通信協定是不支援bloom filter這種謂詞下發,而且兩邊(impala和 kudu)的bloom filter算法也是不一樣的;
- 分布式計算引擎,對掃描傳回的資料做重分布(repartition或者shuffle)後,會生成一個統一的runtime filter,這個工作由coordinator集中merge再分發給各個計算節點,并且在左子樹上,隻要關聯字段一樣,它會一直推送到最底層的掃描節點;同一個列,多份runtime filter、多種謂詞,通過merge的方式進行合并,比如bloom filter + range組合,range + range組合等等;
- 通常資料掃描節點在啟動掃描以後,就不會再更新過濾器,也即不會再下發新的謂詞,因為本身這個過程就已經比較複雜。但是我們的修改,可以支援在掃描過程的中間(mid-scan),把新的runtime filter下發下去,并且在kudu存儲引擎層進行直接應用,這對于縮小傳回的資料集非常有幫助;
- 最後一個是關于runtime filter應用于裁剪資料分片,這個意義也比較大,決定着響應時間。可以分兩步:第一步是針對分區鍵,比較容易了解,就是啟動掃描或者掃描的中間,把不需要掃描的資料分片直接跳過,有同學可能會說,關聯鍵不一定是分區鍵哦,是的,這時,我們就需要第二步,針對非分區鍵的索引(俗稱二級索引),實作上可以有多種方案,比如針對分片的min/max或者bitmap等,但是工作量都不小呢:(;
寫在最後
說了半天,很多同學是否被runtime filter、過濾器、謂詞、條件表達式給搞迷糊了?其實是同一個概念,用中間過程産生的資料構造出一個條件,并且應用于下一個階段,對于資料計算效率的提升非常有幫助,這就是runtime filter存在的價值。
本文來自網易雲社群,經作者何李夫授權釋出。
原文位址:【kudu pk parquet】runtime filter實踐
更多網易研發、産品、營運經驗分享請通路網易雲社群。