流計算中一個常見的需求就是為資料流補齊字段。因為資料采集端采集到的資料往往比較有限,在做資料分析之前,就要先将所需的次元資訊補全。比如采集到的交易日志中隻記錄了商品 id,但是在做業務時需要根據店鋪次元或者行業緯度進行聚合,這就需要先将交易日志與商品維表進行關聯,補全所需的次元資訊。這裡所說的維表與資料倉庫中的概念類似,是次元屬性的集合,比如商品維,地點維,使用者維等等。
在流計算中,這是一個典型的 stream-to-table jon 的問題。本文主要講解在 Flink SQL 中是如何解決這個問題的,使用者如何簡單上手使用這個功能。
假設我們有一個 Orders 訂單資料流,希望根據産品 ID 補全流上的産品次元資訊,是以需要跟 Products 次元表進行關聯。Orders 和 Products 的 DDL 聲明語句如下:
如上聲明了一張來自 TT 的 Orders 訂單資料流,和一張存儲于 HBase 中的 Products 産品維表。為了補齊訂單流的産品資訊,需要 JOIN 維表,這裡可以分為 JOIN 目前表和 JOIN 曆史表。
Flink SQL 支援 LEFT JOIN 和 INNER JOIN 的維表關聯。如上文法所示的,維表 JOIN 文法與傳統的 JOIN 文法并無二異。隻是 Products 維表後面需要跟上 <code>FOR SYSTEM_TIME AS OF PROCTIME()</code> 的關鍵字,其含義是每條到達的資料所關聯上的是到達時刻的維表快照,也就是說,當資料到達時,我們會根據資料上的 key 去查詢遠端資料庫,拿到比對的結果後關聯輸出。這裡的 <code>PROCTIME</code> 即 processing time。使用 JOIN 目前維表功能需要注意的是,如果維表插入了一條資料能比對上之前左表的資料時,JOIN的結果流,不會發出更新的資料以彌補之前的未比對。JOIN行為隻發生在處理時間(processing time),即使維表中的資料都被删了,之前JOIN流已經發出的關聯上的資料也不會被撤回或改變。

有時候想關聯上的次元資料,并不是目前時刻的值,而是某個曆史時刻的值。比如,産品的價格一直在發生變化,訂單流希望補全的是下單時的價格,而不是目前的價格,那就是 JOIN 曆史維表。文法上隻需要将上文的 <code>PROCTIME()</code> 改成 <code>o.orderTime</code> 即可。含義是關聯上的是下單時刻的 Products 維表。
Flink 在擷取次元資料時,會根據左流的時間去查對應時刻的快照資料。是以 JOIN 曆史維表需要外部存儲支援多版本存儲,如 HBase,或者存儲的資料中帶有多版本資訊。
注:JOIN 曆史維表功能目前暫未開放
在實際使用的過程中,會遇到許多性能問題。為了解決這些性能問題,我們做了大量的優化,性能得到大幅提升,在雙11的洪峰下表現特别淡定。
我們的優化主要是為了解決兩方面的問題:
1. 提高吞吐。維表的IO請求嚴重阻塞了資料流的計算處理。
2. 降低維表資料庫讀壓力。如 HBase 也隻能承受單機每秒 20 萬的讀請求。
通過 <code>cache='LRU'</code>參數可以開啟 LRU 緩存優化,Blink 會為每個 JoinTable 節點建立一個 LRU 本地緩存。當每個資料進來的時候,先去緩存中查詢,如果存在則直接關聯輸出,減少了一次 IO 請求。如果不存在,再發起資料庫查詢請求(異步或同步方式),請求傳回的結果會先存入緩存中以備下次查詢。
為了防止緩存無限制增長,是以使用的是 LRU 緩存,并且可以通過 <code>cacheSize</code> 調整緩存的大小。為了定期更新維表資料,可以通過 <code>cacheTTLMs</code> 調整緩存的失效時間。<code>cacheTTLMs</code> 是作用于每條緩存資料上的,也就是某條緩存資料在指定 timeout 時間内沒有被通路,則會從緩存中移除。
Async 和 LRU-Cache 能極大提高吞吐率并降低資料庫的讀壓力,但是仍然會有大量的 IO 請求存在,尤其是當 miss key(維表中不存在的 key)很多的時候。如果維表資料不大(通常百萬級以内),那麼其實可以将整個維表緩存到本地。那麼 miss key 永遠不會去請求資料庫。因為本地緩存就是維表的鏡像,緩存中不存在那麼遠端資料庫中也不存在。
ALL cache 可以通過 <code>cache='ALL'</code>參數開啟,通過<code>cacheTTLMs</code>控制緩存的重新整理間隔。Flink SQL 會為 JoinTable 節點起一個異步線程去同步緩存。在 Job 剛啟動時,會先阻塞主資料流,直到緩存資料加載完畢,保證主資料流流過時緩存就已經 ready。在之後的更新緩存的過程中,不會阻塞主資料流。因為異步更新線程會将維表資料加載到臨時緩存中,加載完畢後再與主緩存做原子替換。隻有替換操作是加了鎖的。
因為幾乎沒有 IO 操作,是以使用 cache ALL 的維表 JOIN 性能可以非常高。但是由于記憶體需要能同時容納下兩份維表拷貝,是以需要加大記憶體的配置。
在使用 LRU 緩存時,如果存在大量的 invalid key ,或者資料庫中不存在的 key。由于命中不了緩存,導緻緩存的收益較低,仍然會有大量請求打到資料庫。是以我們将未命中的 key 也加進了緩存,提高了未命中 key 和 invalid key 情況下的緩存命中率。
預設 JoinTable 節點與上遊節點之間的資料是通過 shuffle 傳輸的。這在緩存大小有限、key總量大、熱點不明顯的情況下, 緩存的收益可能較低。這種情況下可以将上遊節點與 JoinTable 節點的資料傳輸改成按 key 分區。這樣通常可以縮小單個節點的 key 個數,提高緩存的命中率。比如一段時間内 JoinTable 節點總共需要處理100萬個key, 節點并發100, 在資料不傾斜時單節點平均隻需處理1萬個key = 100萬/100并發. 如果不做 key 分區, 單節點實際處理的key個數可能遠大于1萬。使用上也非常簡單,在維表的 DDL 參數中加上<code>partitionedJoin='true'</code>即可。
在使用維表 JOIN 時,如果維表資料不大,或者 miss key (維表中不存在的 key)非常多,則可以使用 ALL cache,但是可能需要适當調大節點的記憶體,因為記憶體需要能同時容納下兩份維表拷貝。如果用不了 ALL cache,則可以使用 Async + LRU 來提高節點的吞吐。
使用 SideInput 減少對資料庫的全量讀取
引入 Partitioned-ALL-cache 支援超大維表
使用批量請求提高單次 IO 的吞吐
Multi-Join 優化
另外,Async 極大地提高了吞吐,但是每一次 IO 請求隻取了單 key 的資料,效率比較低。未來計劃使用 Batch Get 來提高每次 IO 請求的吞吐。
目前在 Table API 上已經支援了 Multi-Join 的優化,能極大提高多元表連續 JOIN 時的性能,減少網絡資料的傳輸開銷。未來會在 SQL 上也支援 Multi-Join 的優化。