天天看點

【kudu pk parquet】runtime filter實踐

已經有好一陣子沒有寫博文了,今天給大家帶來一篇最近一段時間開發相關的文章:在impala和kudu上支援runtime filter。

大家搜尋下實踐者社群,可以發現前面已經有好幾位同學寫了這個主題的博文(都是我們組的^_^),說明這個功能在資料庫領域的重要性,是以,嘿嘿,再敲一遍黑闆:“重點,必考題!”

附上年初測試kudu時候的博文《 【大資料之資料倉庫】kudu性能測試報告分析》作為背景。

背景準備

為了生動、立體的給大家展示runtime filter功能,這裡就以一個具體的sql例子來講解。

  • 表結構:
create table orders
       (
           o_orderkey      bigint,        ->   主鍵,也是分區鍵(分布式資料庫用于資料分片)
           o_custkey       bigint,        ->   外鍵,同customer.c_custkey
           o_orderstatus   string, 
           o_totalprice    double, 
           o_orderdate     string, 
           o_orderpriority string, 
           o_clerk         string, 
           o_shippriority  bigint, 
           o_comment       string
       )
       create table customer
       (
           c_custkey       bigint,        ->   主鍵,也是分區鍵
           c_name          string, 
           c_address       string, 
           c_nationkey     bigint, 
           c_phone         string, 
           c_acctbal       double, 
           c_mktsegment    string, 
           c_comment       string
       )           
  • 測試sql:
select c.*  from orders o join customer c on c.c_custkey = o.o_custkey where o_orderkey = 1125;           

我們用業界的TPC-H工具生成1TB的測試資料,使用上面的sql語句來測試orders和customer兩表關聯。

很多對資料庫熟悉的同學會說,簡單:從orders表裡用“o_orderkey = 1125”條件過濾出o_custkey字段,再用“傳回的o_orderkey值”作為條件到customer表裡過濾出全部字段。

對,完全正确!!

可是,那是受過專業資料庫知識教育訓練的你的大腦的優化器的優化結果(比人工智能還智能,哈哈),但對于資料庫計算引擎,則未必會如你般冰雪聰明。

不信?找你熟悉的資料庫,用上面的表結構和sql語句,僞造幾條資料測試一下^_^

那麼,計算引擎是如何處理的呢?

以MPP的impala為例,簡短講就是先掃描(讀)一張表,比如這裡的orders表,把掃描結果儲存到hash資料結構裡,然後再掃描另一張表,比如這裡的customer表,把掃描的結果到前面的hash資料結構裡找(關聯字段),比對的就是關聯到的結果。

修改前測試

我們以impala計算引擎對接kudu存儲引擎為例,拿修改之前的版本測試:

  • 在impala-shell執行sql:
【kudu pk parquet】runtime filter實踐
  • 檢視profile:
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
       PLAN-ROOT SINK
       |  mem-estimate=0B mem-reservation=0B
       |
       04:EXCHANGE [UNPARTITIONED]
       |  mem-estimate=0B mem-reservation=0B
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       02:HASH JOIN [INNER JOIN, BROADCAST]            ->  以hashJoin的方式兩表關聯
       |  hash predicates: c.c_custkey = o.o_custkey   ->  用“c.c_custkey = o.o_custkey”條件關聯
       |  mem-estimate=9B mem-reservation=136.00MB
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       |--03:EXCHANGE [BROADCAST]
       |  |  mem-estimate=0B mem-reservation=0B
       |  |  tuple-ids=0 row-size=8B cardinality=1
       |  |
       |  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       |  00:SCAN KUDU [kudu_1000g.orders o]             ->  掃描orders表
       |     kudu predicates: o_orderkey = 1125          ->  用“o_orderkey = 1125”條件過濾
       |     mem-estimate=0B mem-reservation=0B
       |     tuple-ids=0 row-size=8B cardinality=1
       |
       01:SCAN KUDU [kudu_1000g.customer c]              ->  掃描customer表
          mem-estimate=0B mem-reservation=0B             ->  沒有過濾條件,傳回全部資料
          tuple-ids=1 row-size=243B cardinality=150000000           

      profile中已經做了一些批注,概括一下就是:

a.用“o_orderkey = 1125”條件掃描orders表,把傳回的結果放入hash資料結構中;

        b.再全表掃描customer表,傳回所有的資料,傳回過程中逐批同前面hash資料結構中的資料進行比對,比對成功的儲存到結果集合中。

  • 檢視plan:
【kudu pk parquet】runtime filter實踐

    顯而易見,plan中顯示的資料更加直覺,并且把耗時長的節點都标記成了紅色。

    我們可以看到,左邊紅色掃描customer表,活生生把全表(總共1.5億條記錄)的全部字段都掃描上來了,磁盤掃描開銷、網絡傳輸開銷,還有大資料集合關聯帶來的CPU計算開銷,是以耗時很長,達到了37秒鐘。

修改後測試

在前面,我們有提到更聰明、更高效的方法,那是否可以實作呢?答案是肯定的,我們确實把掃描orders表的傳回結果應用到了掃描customer表的掃描節點中,作為 動态謂詞下發了。

術語:

  1. 謂詞,就是filter或者過濾器,條件表達式;
  2. 靜态則表示的是來自于sql語句本身,動态即運作過程中産生,也即runtime;
  3. 動态謂詞就是runtime filter。

我們拿修改之後版本測試:

【kudu pk parquet】runtime filter實踐
F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
       PLAN-ROOT SINK
       |  mem-estimate=0B mem-reservation=0B
       |
       04:EXCHANGE [UNPARTITIONED]
       |  mem-estimate=0B mem-reservation=0B
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       02:HASH JOIN [INNER JOIN, BROADCAST]            ->  以hashJoin的方式兩表關聯
       |  hash predicates: c.c_custkey = o.o_custkey   ->  用“c.c_custkey = o.o_custkey”條件關聯
       |  runtime filters: RF000 <- o.o_custkey        ->  這裡生成了1個runtime filter
       |  mem-estimate=9B mem-reservation=136.00MB
       |  tuple-ids=1,0 row-size=251B cardinality=2
       |
       |--03:EXCHANGE [BROADCAST]
       |  |  mem-estimate=0B mem-reservation=0B
       |  |  tuple-ids=0 row-size=8B cardinality=1
       |  |
       |  F01:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
       |  00:SCAN KUDU [kudu_1000g.orders o]            ->  掃描orders表
       |     kudu predicates: o_orderkey = 1125         ->  用“o_orderkey = 1125”條件過濾
       |     mem-estimate=0B mem-reservation=0B
       |     tuple-ids=0 row-size=8B cardinality=1
       |
       01:SCAN KUDU [kudu_1000g.customer c]             ->  掃描customer表
          runtime filters: RF000 -> c.c_custkey         ->  這裡應用了1個runtime filter
          mem-estimate=0B mem-reservation=0B
          tuple-ids=1 row-size=243B cardinality=150000000           
【kudu pk parquet】runtime filter實踐

    跟修改前比較,可以發現左邊紅色部分隻傳回了1條記錄。

是以,總結上面的對比可以發現性能:43.08秒對2.04秒,足足提升了20倍!

相關分享

  1. 對于runtime filter,我們需要明白誰産生和誰使用的關系,前者僅由關聯節點生成,而後者僅由掃描節點使用,兩者都屬于計算引擎。其中掃描節點在使用runtime filter上有兩種方式,一種是把runtime filter直接推送到存儲引擎,離資料最近,理論上效果肯定是最佳的,我們選擇的正是這種方式;還有一種是在掃描節點上過濾,把遠端資料全部讀取過來進行本地過濾,可以減少流入上層關聯節點的資料量,比如parquet就是這種方式。這裡有必要說明下parquet的特殊之處,它可以選擇采用hdfs的short circuit,簡短的了解:作為分布式檔案系統的hdfs,它的資料檔案是以block檔案塊的形式組織起來的,而parquet的資料是放在一個個的block上,在impala和hdfs配對部署的前提下,當impala把需要掃描block檔案塊的計算任務配置設定到block檔案塊所在的impala節點上,那這個impala計算節點就可以直接通過作業系統的檔案系統讀block檔案塊,省去了hdfs分布式檔案系統的中間層傳輸開銷;
  2. runtime filter的類型可以有很多種:包括min/max(範圍區間,或者大于、小于)、in list(數組)、bloom filter(布隆過濾器)、equality(等值)等,但是在目前的impala裡僅支援bloom filter,這是萬金油,最友善實作,後續我們可以考慮引入其他的類型,降低存儲引擎掃描時候的計算量(節約CPU計算時間)。從kudu官方來看,一直建議使用min/max或者in list的方式進行下推,估計同修改的工作量有關,因為它目前的通信協定是不支援bloom filter這種謂詞下發,而且兩邊(impala和 kudu)的bloom filter算法也是不一樣的;
  3. 分布式計算引擎,對掃描傳回的資料做重分布(repartition或者shuffle)後,會生成一個統一的runtime filter,這個工作由coordinator集中merge再分發給各個計算節點,并且在左子樹上,隻要關聯字段一樣,它會一直推送到最底層的掃描節點;同一個列,多份runtime filter、多種謂詞,通過merge的方式進行合并,比如bloom filter + range組合,range + range組合等等;
  4. 通常資料掃描節點在啟動掃描以後,就不會再更新過濾器,也即不會再下發新的謂詞,因為本身這個過程就已經比較複雜。但是我們的修改,可以支援在掃描過程的中間(mid-scan),把新的runtime filter下發下去,并且在kudu存儲引擎層進行直接應用,這對于縮小傳回的資料集非常有幫助;
  5. 最後一個是關于runtime filter應用于裁剪資料分片,這個意義也比較大,決定着響應時間。可以分兩步:第一步是針對分區鍵,比較容易了解,就是啟動掃描或者掃描的中間,把不需要掃描的資料分片直接跳過,有同學可能會說,關聯鍵不一定是分區鍵哦,是的,這時,我們就需要第二步,針對非分區鍵的索引(俗稱二級索引),實作上可以有多種方案,比如針對分片的min/max或者bitmap等,但是工作量都不小呢:(;

寫在最後

說了半天,很多同學是否被runtime filter、過濾器、謂詞、條件表達式給搞迷糊了?其實是同一個概念,用中間過程産生的資料構造出一個條件,并且應用于下一個階段,對于資料計算效率的提升非常有幫助,這就是runtime filter存在的價值。

本文來自網易雲社群,經作者何李夫授權釋出。

原文位址:【kudu pk parquet】runtime filter實踐

更多網易研發、産品、營運經驗分享請通路網易雲社群。 

繼續閱讀