天天看點

Flink SQL 如何實作資料流的 Join?

雲栖号: https://yqh.aliyun.com 第一手的上雲資訊,不同行業精選的上雲企業案例庫,基于衆多成功案例萃取而成的最佳實踐,助力您上雲決策!

無論在 OLAP 還是 OLTP 領域,Join 都是業務常會涉及到且優化規則比較複雜的 SQL 語句。對于離線計算而言,經過資料庫領域多年的積累,Join 語義以及實作已經十分成熟,然而對于近年來剛興起的 Streaming SQL 來說 Join 卻處于剛起步的狀态。

其中最為關鍵的問題在于 Join 的實作依賴于緩存整個資料集,而 Streaming SQL Join 的對象卻是無限的資料流,記憶體壓力和計算效率在長期運作來說都是不可避免的問題。下文将結合 SQL 的發展解析 Flink SQL 是如何解決這些問題并實作兩個資料流的 Join。

離線 Batch SQL Join 的實作

傳統的離線 Batch SQL (面向有界資料集的 SQL)有三種基礎的實作方式,分别是 Nested-loop Join、Sort-Merge Join 和 Hash Join。

  • Nested-loop Join 最為簡單直接,将兩個資料集加載到記憶體,并用内嵌周遊的方式來逐個比較兩個資料集内的元素是否符合 Join 條件。Nested-loop Join 雖然時間效率以及空間效率都是最低的,但勝在比較靈活适用範圍廣,是以其變體 BNL 常被傳統資料庫用作為 Join 的預設基礎選項。
  • Sort-Merge Join 顧名思義,分為兩個 Sort 和 Merge 階段。首先将兩個資料集進行分别排序,然後對兩個有序資料集分别進行周遊和比對,類似于歸并排序的合并。值得注意的是,Sort-Merge 隻适用于 Equi-Join(Join 條件均使用等于作為比較算子)。Sort-Merge Join 要求對兩個資料集進行排序,成本很高,通常作為輸入本就是有序資料集的情況下的優化方案。
  • Hash Join 同樣分為兩個階段,首先将一個資料集轉換為 Hash Table,然後周遊另外一個資料集元素并與 Hash Table 内的元素進行比對。第一階段和第一個資料集分别稱為 build 階段和 build table,第二個階段和第二個資料集分别稱為 probe 階段和 probe table。Hash Join 效率較高但對空間要求較大,通常是作為 Join 其中一個表為适合放入記憶體的小表的情況下的優化方案。和 Sort-Merge Join 類似,Hash Join 也隻适用于 Equi-Join。

實時 Streaming SQL Join

相對于離線的 Join,實時 Streaming SQL(面向無界資料集的 SQL)無法緩存所有資料,是以 Sort-Merge Join 要求的對資料集進行排序基本是無法做到的,而 Nested-loop Join 和 Hash Join 經過一定的改良則可以滿足實時 SQL 的要求。

我們通過例子來看基本的 Nested Join 在實時 Streaming SQL 的基礎實作(案例及圖來自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。

Flink SQL 如何實作資料流的 Join?

圖1. Join-in-continuous-query-1

Table A 有 1、42 兩個元素,Table B 有 42 一個元素,是以此時的 Join 結果會輸出 42。

Flink SQL 如何實作資料流的 Join?

圖2. Join-in-continuous-query-2

接着 Table B 依次接受到三個新的元素,分别是 7、3、1。因為 1 比對到 Table A 的元素,是以結果表再輸出一個元素 1。

Flink SQL 如何實作資料流的 Join?

圖3. Join-in-continuous-query-3

随後 Table A 出現新的輸入 2、3、6,3 比對到 Table B 的元素,是以再輸出 3 到結果表。

可以看到在 Nested-Loop Join 中我們需要儲存兩個輸入表的内容,而随着時間的增長 Table A 和 Table B 需要儲存的曆史資料無止境地增長,導緻很不合理的記憶體磁盤資源占用,而且單個元素的比對效率也會越來越低。類似的問題也存在于 Hash Join 中。

那麼有沒有可能設定一個緩存剔除政策,将不必要的曆史資料及時清理呢?答案是肯定的,關鍵在于緩存剔除政策如何實作,這也是 Flink SQL 提供的三種 Join 的主要差別。

Flink SQL 的 Join

  • Regular Join

Regular Join 是最為基礎的沒有緩存剔除政策的 Join。Regular Join 中兩個表的輸入和更新都會對全局可見,影響之後所有的 Join 結果。舉例,在一個如下的 Join 查詢裡,Orders 表的新紀錄會和 Product 表所有曆史紀錄以及未來的紀錄進行比對。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id           

因為曆史資料不會被清理,是以 Regular Join 允許對輸入表進行任意種類的更新操作(insert、update、delete)。然而因為資源問題 Regular Join 通常是不可持續的,一般隻用做有界資料流的 Join。

  • Time-Windowed Join

Time-Windowed Join 利用視窗給兩個輸入表設定一個 Join 的時間界限,超出時間範圍的資料則對 JOIN 不可見并可以被清理掉。值得注意的是,這裡涉及到的一個問題是時間的語義,時間可以指計算發生的系統時間(即 Processing Time),也可以指從資料本身的時間字段提取的 Event Time。如果是 Processing Time,Flink 根據系統時間自動劃分 Join 的時間視窗并定時清理資料;如果是 Event Time,Flink 配置設定 Event Time 視窗并依據 Watermark 來清理資料。

以更常用的 Event Time Windowed Join 為例,一個将 Orders 訂單表和 Shipments 運輸單表依據訂單時間和運輸時間 Join 的查詢如下:

SELECT *
FROM 
  Orders o, 
  Shipments s
WHERE 
  o.id = s.orderId AND
  s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR           

這個查詢會為 Orders 表設定了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的時間下界(圖4)。

Flink SQL 如何實作資料流的 Join?

圖4. Time-Windowed Join 的時間下界 - Orders 表

并為 Shipmenets 表設定了 s.shiptime >= o.ordertime 的時間下界(圖5)。

Flink SQL 如何實作資料流的 Join?

圖5. Time-Windowed Join 的時間下界 - Shipment 表

是以兩個輸入表都隻需要緩存在時間下界以上的資料,将空間占用維持在合理的範圍。

不過雖然底層實作上沒有問題,但如何通過 SQL 文法定義時間仍是難點。盡管在實時計算領域 Event Time、Processing Time、Watermark 這些概念已經成為業界共識,但在 SQL 領域對時間資料類型的支援仍比較弱[4]。是以,定義 Watermark 和時間語義都需要通過程式設計 API 的方式完成,比如從 DataStream 轉換至 Table ,不能單純靠 SQL 完成。這方面的支援 Flink 社群計劃通過拓展 SQL 方言來完成,感興趣的讀者可以通過 FLIP-66[7] 來追蹤進度。

  • Temporal Table Join

雖然 Timed-Windowed Join 解決了資源問題,但也限制了使用場景: Join 兩個輸入流都必須有時間下界,超過之後則不可通路。這對于很多 Join 維表的業務來說是不适用的,因為很多情況下維表并沒有時間界限。針對這個問題,Flink 提供了 Temporal Table Join 來滿足使用者需求。

Temporal Table Join 類似于 Hash Join,将輸入分為 Build Table 和 Probe Table。前者一般是緯度表的 changelog,後者一般是業務資料流,典型情況下後者的資料量應該遠大于前者。在 Temporal Table Join 中,Build Table 是一個基于 append-only 資料流的帶時間版本的視圖,是以又稱為 Temporal Table。Temporal Table 要求定義一個主鍵和用于版本化的字段(通常就是 Event Time 時間字段),以反映記錄在不同時間的内容。

比如典型的一個例子是對商業訂單金額進行匯率轉換。假設有一個 Orders 流記錄訂單金額,需要和 RatesHistory 匯率流進行 Join。RatesHistory 代表不同貨币轉為日元的匯率,每當匯率有變化時就會有一條更新記錄。兩個表在某一時間節點内容如下:

Flink SQL 如何實作資料流的 Join?

圖6. Temporal Table Join Example]

我們将 RatesHistory 注冊為一個名為 Rates 的 Temporal Table,設定主鍵為 currency,版本字段為 time。

Flink SQL 如何實作資料流的 Join?

圖7. Temporal Table Registration]

此後給 Rates 指定時間版本,Rates 則會基于 RatesHistory 來計算符合時間版本的匯率轉換内容。

Flink SQL 如何實作資料流的 Join?

圖8. Temporal Table Content]

在 Rates 的幫助下,我們可以将業務邏輯用以下的查詢來表達:

SELECT 
  o.amount * r.rate
FROM
  Orders o,
  LATERAL Table(Rates(o.time)) r
WHERE
  o.currency = r.currency           

值得注意的是,不同于在 Regular Join 和 Time-Windowed Join 中兩個表是平等的,任意一個表的新記錄都可以與另一表的曆史記錄進行比對,在 Temporal Table Join 中,Temoparal Table 的更新對另一表在該時間節點以前的記錄是不可見的。這意味着我們隻需要儲存 Build Side 的記錄直到 Watermark 超過記錄的版本字段。因為 Probe Side 的輸入理論上不會再有早于 Watermark 的記錄,這些版本的資料可以安全地被清理掉。

總結

實時領域 Streaming SQL 中的 Join 與離線 Batch SQL 中的 Join 最大不同點在于無法緩存完整資料集,而是要給緩存設定基于時間的清理條件以限制 Join 涉及的資料範圍。根據清理政策的不同,Flink SQL 分别提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 來應對不同業務場景。

另外,盡管在實時計算領域 Join 可以靈活地用底層程式設計 API 來實作,但在 Streaming SQL 中 Join 的發展仍處于比較初級的階段,其中關鍵點在于如何将時間屬性合适地融入 SQL 中,這點 ISO SQL 委員會制定的 SQL 标準并沒有給出完整的答案。或者從另外一個角度來講,作為 Streaming SQL 最早的開拓者之一,Flink 社群很适合探索出一套合理的 SQL 文法反過來貢獻給 ISO。

參考

作者介紹:

林小鉑,網易遊戲進階開發工程師,負責遊戲資料中心實時平台的開發及運維工作,目前專注于 Apache Flink 的開發及應用。探究問題本來就是一種樂趣。

原文連結

雲栖号線上課堂,每天都有産品技術專家分享

立即加入圈子:

https://c.tb.cn/F3.Z8gvnK 與專家面對面,及時了解課程最新動态!

原文釋出時間:2019-12-18

本文作者:林小鉑

本文來自:“

阿裡雲雲栖社群

”,了解相關資訊可以關注“