天天看點

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數

文章目錄

  • 概覽
    • 狀态管理
    • 狀态用法
  • 動态表 (Dynamic Table)
    • DataStream 上的關系查詢
    • 動态表 & 連續查詢(Continuous Query)
    • 在流上定義表
      • 連續查詢
      • 更新和追加查詢
      • 查詢限制
    • 表到流的轉換
  • 時間屬性
    • 時間屬性介紹
    • 處理時間
      • 在建立表的 DDL 中定義
      • 在 DataStream 到 Table 轉換時定義
      • 使用 TableSource 定義
    • 事件時間
      • 在 DDL 中定義
      • 在 DataStream 到 Table 轉換時定義
      • 使用 TableSource 定義
  • 時态表(Temporal Tables)
    • 設計初衷
      • 關聯一張版本表
      • 關聯一張普通表
    • 時态表
      • 聲明版本表
      • 聲明版本視圖
      • 聲明普通表
  • 時态表函數
    • 定義時态表函數
    • 時态表函數連接配接

概覽

Flink 的Table API和SQL 支援是用于批處理和流處理的統一 API。這意味着 Table API 和 SQL 查詢具有相同的語義,無論它們的輸入是有界批處理輸入還是無界流輸入。

以下頁面解釋了 Flink 流資料的關系 API 的概念、實際限制和特定于流的配置參數。

狀态管理

以流模式運作的表程式利用 Flink 作為有狀态流處理器的所有功能。

事實上,一個表程式(Table program)可以配置一個 state backend 和多個不同的 checkpoint 選項 以處理對不同狀态大小和容錯需求。這可以對正在運作的 Table API & SQL 管道(pipeline)生成 savepoint,并在這之後用其恢複應用程式的狀态。

狀态用法

由于 Table API & SQL 程式是聲明式的,管道内的狀态會在哪以及如何被使用并不明确。 Planner 會确認是否需要狀态來得到正确的計算結果, 管道會被現有優化規則集優化成盡可能少地使用狀态。

從概念上講,源表永遠不會完全保持狀态。實作者處理邏輯表(即動态表)。它們的狀态要求取決于使用的操作。

形如 SELECT … FROM … WHERE 這種隻包含字段映射或過濾器的查詢的查詢語句通常是無狀态的管道。 然而諸如 join、 聚合或去重操作需要在 Flink 抽象的容錯存儲内儲存中間結果。

動态表 (Dynamic Table)

SQL 和關系代數在設計時并未考慮流資料。是以,在關系代數(和 SQL)之間幾乎沒有概念上的差異。

本文會讨論這種差異,并介紹 Flink 如何在無界資料集上實作與資料庫引擎在有界資料上的處理具有相同的語義。

DataStream 上的關系查詢

下表比較了傳統的關系代數和流處理與輸入資料、執行和輸出結果的關系。

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數

盡管存在這些差異,但是使用關系查詢和 SQL 處理流并不是不可能的。進階關系資料庫系統提供了一個稱為 物化視圖(Materialized Views) 的特性。物化視圖被定義為一條 SQL 查詢,就像正常的虛拟視圖一樣。與虛拟視圖相反,物化視圖緩存查詢的結果,是以在通路視圖時不需要對查詢進行計算。緩存的一個常見難題是防止緩存為過期的結果提供服務。當其定義查詢的基表被修改時,物化視圖将過期。 即時視圖維護(Eager View Maintenance) 是一種一旦更新了物化視圖的基表就立即更新視圖的技術。

如果我們考慮以下問題,那麼即時視圖維護和流上的SQL查詢之間的聯系就會變得顯而易見:

  • 資料庫表是 INSERT、UPDATE 和 DELETE DML 語句的 stream 的結果,通常稱為 changelog stream 。
  • 物化視圖被定義為一條 SQL 查詢。為了更新視圖,查詢不斷地處理視圖的基本關系的changelog 流。
  • 物化視圖是流式 SQL 查詢的結果。

了解了這些要點之後,我們将在下一節中介紹 動态表(Dynamic tables) 的概念。

動态表 & 連續查詢(Continuous Query)

動态表 是 Flink 的支援流資料的 Table API 和 SQL 的核心概念。與表示批處理資料的靜态表不同,動态表是随時間變化的。可以像查詢靜态批處理表一樣查詢它們。查詢動态表将生成一個 連續查詢 。一個連續查詢永遠不會終止,結果會生成一個動态表。查詢不斷更新其(動态)結果表,以反映其(動态)輸入表上的更改。本質上,動态表上的連續查詢非常類似于定義物化視圖的查詢。

需要注意的是,連續查詢的結果在語義上總是等價于以批處理模式在輸入表快照上執行的相同查詢的結果。

下圖顯示了流、動态表和連續查詢之間的關系:

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數
  1. 将流轉換為動态表。
  2. 在動态表上計算一個連續查詢,生成一個新的動态表。
  3. 生成的動态表被轉換回流。

注意: 動态表首先是一個邏輯概念。在查詢執行期間不一定(完全)物化動态表。

在下面,我們将解釋動态表和連續查詢的概念,并使用具有以下模式的單擊事件流:

[
  user:  VARCHAR,   // 使用者名
  cTime: TIMESTAMP, // 通路 URL 的時間
  url:   VARCHAR    // 使用者通路的 URL
]
           

在流上定義表

為了使用關系查詢處理流,必須将其轉換成 Table。從概念上講,流的每條記錄都被解釋為對結果表的 INSERT 操作。本質上我們正在從一個 INSERT-only 的 changelog 流建構表。

下圖顯示了單擊事件流(左側)如何轉換為表(右側)。當插入更多的單擊流記錄時,結果表将不斷增長。

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數

注意: 在流上定義的表在内部沒有物化。

連續查詢

在動态表上計算一個連續查詢,并生成一個新的動态表。與批處理查詢不同,連續查詢從不終止,并根據其輸入表上的更新更新其結果表。在任何時候,連續查詢的結果在語義上與以批處理模式在輸入表快照上執行的相同查詢的結果相同。

在接下來的代碼中,我們将展示 clicks 表上的兩個示例查詢,這個表是在點選事件流上定義的。

第一個查詢是一個簡單的 GROUP-BY COUNT 聚合查詢。它基于 user 字段對 clicks 表進行分組,并統計通路的 URL 的數量。下面的圖顯示了當 clicks 表被附加的行更新時,查詢是如何被評估的。

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數

當查詢開始,clicks 表(左側)是空的。當第一行資料被插入到 clicks 表時,查詢開始計算結果表。第一行資料 [Mary,./home] 插入後,結果表(右側,上部)由一行 [Mary, 1] 組成。當第二行 [Bob, ./cart] 插入到 clicks 表時,查詢會更新結果表并插入了一行新資料 [Bob, 1]。第三行 [Mary, ./prod?id=1] 将産生已計算的結果行的更新,[Mary, 1] 更新成 [Mary, 2]。最後,當第四行資料加入 clicks 表時,查詢将第三行 [Liz, 1] 插入到結果表中。

第二條查詢與第一條類似,但是除了使用者屬性之外,還将 clicks 分組至每小時滾動視窗中,然後計算 url 數量(基于時間的計算,例如基于特定時間屬性的視窗,後面會讨論)。同樣,該圖顯示了不同時間點的輸入和輸出,以可視化動态表的變化特性。

(十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數

與前面一樣,左邊顯示了輸入表 clicks。查詢每小時持續計算結果并更新結果表。clicks表包含四行帶有時間戳(cTime)的資料,時間戳在 12:00:00 和 12:59:59 之間。查詢從這個輸入計算出兩個結果行(每個 user 一個),并将它們附加到結果表中。對于 13:00:00 和 13:59:59 之間的下一個視窗,clicks 表包含三行,這将導緻另外兩行被追加到結果表。随着時間的推移,更多的行被添加到 click 中,結果表将被更新。

更新和追加查詢

雖然這兩個示例查詢看起來非常相似(都計算分組計數聚合),但它們在一個重要方面不同:

  • 第一個查詢更新先前輸出的結果,即定義結果表的 changelog 流包含 INSERT 和 UPDATE 操作。
  • 第二個查詢隻附加到結果表,即結果表的 changelog 流隻包含 INSERT 操作。

一個查詢是産生一個隻追加的表還是一個更新的表有一些含義:

  • 産生更新更改的查詢通常必須維護更多的狀态(請參閱以下部分)。
  • 将 append-only 的表轉換為流與将已更新的表轉換為流是不同的(參閱表到流的轉換章節)。

查詢限制

許多(但不是全部)語義上有效的查詢可以作為流上的連續查詢進行評估。有些查詢代價太高而無法計算,這可能是由于它們需要維護的狀态大小,也可能是由于計算更新代價太高。

  • 狀态大小: 連續查詢在無界流上計算,通常應該運作數周或數月。是以,連續查詢處理的資料總量可能非常大。必須更新先前輸出的結果的查詢需要維護所有輸出的行,以便能夠更新它們。例如,第一個查詢示例需要存儲每個使用者的 URL 計數,以便能夠增加該計數并在輸入表接收新行時發送新結果。如果隻跟蹤注冊使用者,則要維護的計數數量可能不會太高。但是,如果未注冊的使用者配置設定了一個惟一的使用者名,那麼要維護的計數數量将随着時間增長,并可能最終導緻查詢失敗。
SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
           
  • 計算更新: 有些查詢需要重新計算和更新大量已輸出的結果行,即使隻添加或更新一條輸入記錄。顯然,這樣的查詢不适合作為連續查詢執行。下面的查詢就是一個例子,它根據最後一次單擊的時間為每個使用者計算一個 RANK。一旦 click 表接收到一個新行,使用者的 lastAction 就會更新,并必須計算一個新的排名。然而,由于兩行不能具有相同的排名,是以所有較低排名的行也需要更新。
SELECT user, RANK() OVER (ORDER BY lastAction)
FROM (
  SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
           

查詢配置章節讨論了控制連續查詢執行的參數。一些參數可以用來在維持狀态的大小和獲得結果的準确性之間做取舍。

表到流的轉換

動态表可以像普通資料庫表一樣通過 INSERT、UPDATE 和 DELETE 來不斷修改。它可能是一個隻有一行、不斷更新的表,也可能是一個 insert-only 的表,沒有 UPDATE 和 DELETE 修改,或者介于兩者之間的其他表。

在将動态表轉換為流或将其寫入外部系統時,需要對這些更改進行編碼。Flink的 Table API 和 SQL 支援三種方式來編碼一個動态表的變化:

  • Append-only 流: 僅通過 INSERT 操作修改的動态表可以通過輸出插入的行轉換為流。
  • Retract 流: retract 流包含兩種類型的 message: add messages 和 retract messages 。通過将INSERT 操作編碼為 add message、将 DELETE 操作編碼為 retract message、将 UPDATE 操作編碼為更新(先前)行的 retract message 和更新(新)行的 add message,将動态表轉換為 retract 流。下圖顯示了将動态表轉換為 retract 流的過程。
    (十九)Flink Table API & SQL 程式設計指南 流式概念概覽動态表 (Dynamic Table)時間屬性時态表(Temporal Tables)時态表函數
  • Upsert 流: upsert 流包含兩種類型的 message: upsert messages 和delete messages。轉換為 upsert 流的動态表需要(可能是組合的)唯一鍵。通過将 INSERT 和 UPDATE 操作編碼為 upsert message,将 DELETE 操作編碼為 delete message ,将具有唯一鍵的動态表轉換為流。消費流的算子需要知道唯一鍵的屬性,以便正确地應用 message。與 retract 流的主要差別在于 UPDATE 操作是用單個 message 編碼的,是以效率更高。下圖顯示了将動态表轉換為 upsert 流的過程。

在通用概念中讨論了将動态表轉換為 DataStream 的 API。請注意,在将動态表轉換為 DataStream 時,隻支援 append 流和 retract 流。在 TableSources 和 TableSinks 章節讨論向外部系統輸出動态表的 TableSink 接口。

時間屬性

Flink 可以基于幾種不同的 時間 概念來處理資料。

  • 處理時間 指的是執行具體操作時的機器時間(大家熟知的絕對時間, 例如 Java的 System.currentTimeMillis()) )
  • 事件時間 指的是資料本身攜帶的時間。這個時間是在事件産生時的時間。
  • 攝入時間 指的是資料進入 Flink 的時間;在系統内部,會把它當做事件時間來處理。

本頁面說明了如何在 Flink Table API & SQL 裡面定義時間以及相關的操作。

時間屬性介紹

像視窗(在 Table API 和 SQL )這種基于時間的操作,需要有時間資訊。是以,Table API 中的表就需要提供邏輯時間屬性來表示時間,以及支援時間相關的操作。

每種類型的表都可以有時間屬性,可以在用CREATE TABLE DDL建立表的時候指定、也可以在 DataStream 中指定、也可以在定義 TableSource 時指定。一旦時間屬性定義好,它就可以像普通列一樣使用,也可以在時間相關的操作中使用。

隻要時間屬性沒有被修改,而是簡單地從一個表傳遞到另一個表,它就仍然是一個有效的時間屬性。時間屬性可以像普通的時間戳的列一樣被使用和計算。一旦時間屬性被用在了計算中,它就會被物化,進而變成一個普通的時間戳。普通的時間戳是無法跟 Flink 的時間以及watermark等一起使用的,是以普通的時間戳就無法用在時間相關的操作中。

Table API 程式需要在 streaming environment 中指定時間屬性:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// 或者:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
           

處理時間

處理時間是基于機器的本地時間來處理資料,它是最簡單的一種時間概念,但是它不能提供确定性。它既不需要從資料裡擷取時間,也不需要生成 watermark。

共有三種方法可以定義處理時間。

在建立表的 DDL 中定義

處理時間屬性可以在建立表的 DDL 中用計算列的方式定義,用 PROCTIME() 就可以定義處理時間,函數 PROCTIME() 的傳回類型是 TIMESTAMP_LTZ 。

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME() -- 聲明一個額外的列作為處理時間屬性
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

           

在 DataStream 到 Table 轉換時定義

處理時間屬性可以在 schema 定義的時候用 .proctime 字尾來定義。時間屬性一定不能定義在一個已有字段上,是以它隻能定義在 schema 定義的最後。

DataStream<Tuple2<String, String>> stream = ...;

// 聲明一個額外的字段作為時間屬性字段
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").proctime());

WindowedTable windowedTable = table.window(
        Tumble.over(lit(10).minutes())
            .on($("user_action_time"))
            .as("userActionWindow"));
           

使用 TableSource 定義

處理時間屬性可以在實作了 DefinedProctimeAttribute 的 TableSource 中定義。邏輯的時間屬性會放在 TableSource 已有實體字段的最後

// 定義一個由處理時間屬性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name" , "data"};
		TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// create stream
		DataStream<Row> stream = ...;
		return stream;
	}

	@Override
	public String getProctimeAttribute() {
		// 這個名字的列會被追加到最後,作為第三列
		return "user_action_time";
	}
}

// register table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble
	    .over(lit(10).minutes())
	    .on($("user_action_time"))
	    .as("userActionWindow"));

           

事件時間

事件時間允許程式按照資料中包含的時間來處理,這樣可以在有亂序或者晚到的資料的情況下産生一緻的處理結果。它可以保證從外部存儲讀取資料後産生可以複現(replayable)的結果。

除此之外,事件時間可以讓程式在流式和批式作業中使用同樣的文法。在流式程式中的事件時間屬性,在批式程式中就是一個正常的時間字段。

為了能夠處理亂序的事件,并且區分正常到達和晚到的事件,Flink 需要從事件中擷取事件時間并且産生 watermark(watermarks)。

事件時間屬性也有類似于處理時間的三種定義方式:在DDL中定義、在 DataStream 到 Table 轉換時定義、用 TableSource 定義。

在 DDL 中定義

事件時間屬性可以用 WATERMARK 語句在 CREATE TABLE DDL 中進行定義。WATERMARK 語句在一個已有字段上定義一個 watermark 生成表達式,同時标記這個已有字段為時間屬性字段。更多資訊可以參考:CREATE TABLE DDL

Flink 支援和在 TIMESTAMP 列和 TIMESTAMP_LTZ 列上定義事件時間。如果源資料中的時間戳資料表示為年-月-日-時-分-秒,則通常為不帶時區資訊的字元串值,例如 2020-04-15 20:13:40.564,建議将事件時間屬性定義在 TIMESTAMP 列上:

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- 聲明 user_action_time 是事件時間屬性,并且用 延遲 5 秒的政策來生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
           

源資料中的時間戳資料表示為一個紀元 (epoch) 時間,通常是一個 long 值,例如 1618989564564,建議将事件時間屬性定義在 TIMESTAMP_LTZ 列上:

CREATE TABLE user_actions (
 user_name STRING,
 data STRING,
 ts BIGINT,
 time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
 -- declare time_ltz as event time attribute and use 5 seconds delayed watermark strategy
 WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND
) WITH (
 ...
);

SELECT TUMBLE_START(time_ltz, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(time_ltz, INTERVAL '10' MINUTE);
           

在 DataStream 到 Table 轉換時定義

事件時間屬性可以用 .rowtime 字尾在定義 DataStream schema 的時候來定義。時間戳和 watermark 在這之前一定是在 DataStream 上已經定義好了。 在從 DataStream 轉換到 Table 時,由于 DataStream 沒有時區概念,是以 Flink 總是将 rowtime 屬性解析成 TIMESTAMP WITHOUT TIME ZONE 類型,并且将所有事件時間的值都視為 UTC 時區的值。

在從 DataStream 到 Table 轉換時定義事件時間屬性有兩種方式。取決于用 .rowtime 字尾修飾的字段名字是否是已有字段,事件時間字段可以是:

  • 在 schema 的結尾追加一個新的字段
  • 替換一個已經存在的字段。

不管在哪種情況下,事件時間字段都表示 DataStream 中定義的事件的時間戳。

// Option 1:

// 基于 stream 中的事件産生時間戳和 watermark
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 聲明一個額外的邏輯字段作為事件時間屬性
Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"), $("user_action_time").rowtime());


// Option 2:

// 從第一個字段擷取事件時間,并且産生 watermark
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// 第一個字段已經用作事件時間抽取了,不用再用一個新字段來表示事件時間了
Table table = tEnv.fromDataStream(stream, $("user_action_time").rowtime(), $("user_name"), $("data"));

// Usage:

WindowedTable windowedTable = table.window(Tumble
       .over(lit(10).minutes())
       .on($("user_action_time"))
       .as("userActionWindow"));
           

使用 TableSource 定義

事件時間屬性可以在實作了 DefinedRowTimeAttributes 的 TableSource 中定義。getRowtimeAttributeDescriptors() 方法傳回 RowtimeAttributeDescriptor 的清單,包含了描述事件時間屬性的字段名字、如何計算事件時間、以及 watermark 生成政策等資訊。

同時需要確定 getDataStream 傳回的 DataStream 已經定義好了時間屬性。 隻有在定義了 StreamRecordTimestamp 時間戳配置設定器的時候,才認為 DataStream 是有時間戳資訊的。 隻有定義了 PreserveWatermarks watermark 生成政策的 DataStream 的 watermark 才會被保留。反之,則隻有時間字段的值是生效的。

// 定義一個有事件時間屬性的 table source
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

	@Override
	public TypeInformation<Row> getReturnType() {
		String[] names = new String[] {"user_name", "data", "user_action_time"};
		TypeInformation[] types =
		    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
		return Types.ROW(names, types);
	}

	@Override
	public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
		// 構造 DataStream
		// ...
		// 基于 "user_action_time" 定義 watermark
		DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
		return stream;
	}

	@Override
	public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
		// 标記 "user_action_time" 字段是事件時間字段
		// 給 "user_action_time" 構造一個時間屬性描述符
		RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
			"user_action_time",
			new ExistingField("user_action_time"),
			new AscendingTimestamps());
		List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
		return listRowtimeAttrDescr;
	}
}

// register the table source
tEnv.registerTableSource("user_actions", new UserActionSource());

WindowedTable windowedTable = tEnv
	.from("user_actions")
	.window(Tumble.over(lit(10).minutes()).on($("user_action_time")).as("userActionWindow"));

           

時态表(Temporal Tables)

時态表(Temporal Table)是一張随時間變化的表 – 在 Flink 中稱為動态表,時态表中的每條記錄都關聯了一個或多個時間段,所有的 Flink 表都是時态的(動态的)。

時态表包含表的一個或多個有版本的表快照,時态表可以是一張跟蹤所有變更記錄的表(例如資料庫表的 changelog,包含多個表快照),也可以是物化所有變更之後的表(例如資料庫表,隻有最新表快照)。

版本: 時态表可以劃分成一系列帶版本的表快照集合,表快照中的版本代表了快照中所有記錄的有效區間,有效區間的開始時間和結束時間可以通過使用者指定,根據時态表是否可以追蹤自身的曆史版本與否,時态表可以分為 版本表 和 普通表。

版本表: 如果時态表中的記錄可以追蹤和并通路它的曆史版本,這種表我們稱之為版本表,來自資料庫的 changelog 可以定義成版本表。

普通表: 如果時态表中的記錄僅僅可以追蹤并和它的最新版本,這種表我們稱之為普通表,來自資料庫 或 HBase 的表可以定義成普通表。

設計初衷

關聯一張版本表

以訂單流關聯産品表這個場景舉例,orders 表包含了來自 Kafka 的實時訂單流,product_changelog 表來自資料庫表 products 的 changelog , 産品的價格在資料庫表 products 中是随時間實時變化的。

SELECT * FROM product_changelog;

(changelog kind)  update_time  product_id product_name price
================= ===========  ========== ============ ===== 
+(INSERT)         00:01:00     p_001      scooter      11.11
+(INSERT)         00:02:00     p_002      basketball   23.11
-(UPDATE_BEFORE)  12:00:00     p_001      scooter      11.11
+(UPDATE_AFTER)   12:00:00     p_001      scooter      12.99
-(UPDATE_BEFORE)  12:00:00     p_002      basketball   23.11 
+(UPDATE_AFTER)   12:00:00     p_002      basketball   19.99
-(DELETE)         18:00:00     p_001      scooter      12.99 

           

表 product_changelog 表示資料庫表 products不斷增長的 changelog, 比如,産品 scooter 在時間點 00:01:00的初始價格是 11.11, 在 12:00:00 的時候漲價到了 12.99, 在 18:00:00 的時候這條産品價格記錄被删除。

如果我們想輸出 product_changelog 表在 10:00:00 對應的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
00:01:00     p_001      scooter      11.11
00:02:00     p_002      basketball   23.11
           

如果我們想輸出 product_changelog 表在 13:00:00 對應的版本,表的内容如下所示:

update_time  product_id product_name price
===========  ========== ============ ===== 
12:00:00     p_001      scooter      12.99
12:00:00     p_002      basketball   19.99
           

上述例子中,products 表的版本是通過 update_time 和 product_id 進行追蹤的,product_id 對應 product_changelog 表的主鍵,update_time 對應事件時間。

在 Flink 中, 這由版本表表示。

關聯一張普通表

另一方面,某些使用者案列需要連接配接變化的維表,該表是外部資料庫表。

假設 LatestRates 是一個物化的最新匯率表 (比如:一張 HBase 表),LatestRates 總是表示 HBase 表 Rates 的最新内容。

我們在 10:15:00 時查詢到的内容如下所示:

10:15:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      114
Yen       1
           

我們在 11:00:00 時查詢到的内容如下所示:

11:00:00 > SELECT * FROM LatestRates;

currency  rate
========= ====
US Dollar 102
Euro      116
Yen       1
           

在 Flink 中, 這由普通表表示。

時态表

Flink 使用主鍵限制和事件時間來定義一張版本表和版本視圖。

聲明版本表

在 Flink 中,定義了主鍵限制和事件時間屬性的表就是版本表。

-- 定義一張版本表
CREATE TABLE product_changelog (
  product_id STRING,
  product_name STRING,
  product_price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定義主鍵限制
  WATERMARK FOR update_time AS update_time   -- (2) 通過 watermark 定義事件時間              
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);
           

行 (1) 為表 product_changelog 定義了主鍵, 行 (2) 把 update_time 定義為表 product_changelog 的事件時間,是以 product_changelog 是一張版本表。

注意: METADATA FROM ‘value.source.timestamp’ VIRTUAL 文法的意思是從每條 changelog 中抽取 changelog 對應的資料庫表中操作的執行時間,強烈推薦使用資料庫表中操作的 執行時間作為事件時間 ,否則通過時間抽取的版本可能和資料庫中的版本不比對。

聲明版本視圖

Flink 也支援定義版本視圖隻要一個視圖包含主鍵和事件時間便是一個版本視圖。

假設我們有表 RatesHistory 如下所示:

-- 定義一張 append-only 表
CREATE TABLE RatesHistory (
    currency_time TIMESTAMP(3),
    currency STRING,
    rate DECIMAL(38, 10),
    WATERMARK FOR currency_time AS currency_time   -- 定義事件時間
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json'                                -- 普通的 append-only 流
)
           

表 RatesHistory 代表一個兌換日元貨币匯率表(日元匯率為1),該表是不斷增長的 append-only 表。 例如,歐元 兌換 日元 從 09:00:00 到 10:45:00 的匯率為 114。從 10:45:00 到 11:15:00 的匯率為 116。

SELECT * FROM RatesHistory;

currency_time currency  rate
============= ========= ====
09:00:00      US Dollar 102
09:00:00      Euro      114
09:00:00      Yen       1
10:45:00      Euro      116
11:15:00      Euro      119
11:49:00      Pounds    108

           

為了在 RatesHistory 上定義版本表,Flink 支援通過去重查詢定義版本視圖, 去重查詢可以産出一個有序的 changelog 流,去重查詢能夠推斷主鍵并保留原始資料流的事件時間屬性。

CREATE VIEW versioned_rates AS              
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件時間
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
         ORDER BY currency_time DESC) AS rowNum 
      FROM RatesHistory )
WHERE rowNum = 1; 

-- 視圖 `versioned_rates` 将會産出如下的 changelog:

(changelog kind) currency_time currency   rate
================ ============= =========  ====
+(INSERT)        09:00:00      US Dollar  102
+(INSERT)        09:00:00      Euro       114
+(INSERT)        09:00:00      Yen        1
+(UPDATE_AFTER)  10:45:00      Euro       116
+(UPDATE_AFTER)  11:15:00      Euro       119
+(INSERT)        11:49:00      Pounds     108

           

行 (1) 保留了事件時間作為視圖 versioned_rates 的事件時間,行 (2) 使得視圖 versioned_rates 有了主鍵, 是以視圖 versioned_rates 是一個版本視圖。

視圖中的去重 query 會被 Flink 優化并高效地産出 changelog stream, 産出的 changelog 保留了主鍵限制和事件時間。

如果我們想輸出 versioned_rates 表在 11:00:00 對應的版本,表的内容如下所示:

currency_time currency   rate  
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       116

           

如果我們想輸出 versioned_rates 表在 12:00:00 對應的版本,表的内容如下所示:

currency_time currency   rate  
============= ========== ====
09:00:00      US Dollar  102
09:00:00      Yen        1
10:45:00      Euro       119
11:49:00      Pounds     108

           

聲明普通表

普通表的聲明和 Flink 建表 DDL 一緻,參考 create table 頁面擷取更多如何建表的資訊。

-- 用 DDL 定義一張 HBase 表,然後我們可以在 SQL 中将其當作一張時态表使用
-- 'currency' 列是 HBase 表中的 rowKey
 CREATE TABLE LatestRates (   
     currency STRING,   
     fam1 ROW<rate DOUBLE>   
 ) WITH (   
    'connector' = 'hbase-1.4',   
    'table-name' = 'rates',   
    'zookeeper.quorum' = 'localhost:2181'   
 );

           

注意: 理論上講任意都能用作時态表并在基于處理時間的時态表 Join 中使用,但目前支援作為時态表的普通表必須實作接口 LookupableTableSource。接口 LookupableTableSource 的執行個體隻能作為時态表用于基于處理時間的時态 Join 。

通過 LookupableTableSource 定義的表意味着該表具備了在運作時通過一個或多個 key 去查詢外部存儲系統的能力,目前支援在 基于處理時間的時态表 join 中使用的表包括 JDBC, HBase 和 Hive。

在基于處理時間的時态表 Join 中支援任意表作為時态表會在不遠的将來支援。

時态表函數

時态表函數提供對特定時間點時态表版本的通路。為了通路時态表中的資料,必須傳遞一個時間屬性,該屬性确定将傳回的表的版本。Flink 使用表函數的 SQL 文法來提供一種表達方式。

與版本化表不同,時态表函數隻能在僅附加流之上定義——它不支援更改日志輸入。此外,不能在純 SQL DDL 中定義時态表函數。

定義時态表函數

可以使用Table API在僅附加流之上定義時态表函數。該表注冊了一個或多個關鍵列,以及用于版本控制的時間屬性。

假設我們有一個僅附加的匯率表,我們希望将其注冊為時态表函數。

SELECT * FROM currency_rates;

update_time   currency   rate
============= =========  ====
09:00:00      Yen        102
09:00:00      Euro       114
09:00:00      USD        1
11:15:00      Euro       119
11:49:00      Pounds     108
           

使用 Table API,我們可以使用currencykey 和update_time版本控制時間屬性來注冊這個流。

TemporalTableFunction rates = tEnv
    .from("currency_rates").
    .createTemporalTableFunction("update_time", "currency");
 
tEnv.registerFunction("rates", rates); 
           

時态表函數連接配接

一旦定義,時态表函數将用作标準表函數。僅附加表(左側輸入/探測側)可以與臨時表(右側輸入/建構側)連接配接,即随時間變化并跟蹤其變化的表,以檢索鍵的值,就像它在特定的時間點。

orders考慮一個以不同貨币跟蹤客戶訂單的僅附加表。

SELECT * FROM orders;

order_time amount currency
========== ====== =========
10:15        2    Euro
10:30        1    USD
10:32       50    Yen
10:52        3    Euro
11:04        5    USD

           

鑒于這些表格,我們希望将訂單轉換為通用貨币——美元。

SELECT
  SUM(amount * rate) AS amount
FROM
  orders,
  LATERAL TABLE (rates(order_time))
WHERE
  rates.currency = orders.currency