本文根據 《【Flink 1.12】基于 FLINK SQL 的實時資料打寬》中的内容整理而成
1 實時資料打寬
利⽤ Flink SQL 打寬實時資料,共有以下方式:
- 雙流join (Regular join)
- 區間join (Interval join)
- 時态表join (Temporal join)
1.1 雙流join
- 支援 INNER JOIN, LEFT JOIN, RIGHT JOIN,FULL OUTER JOIN
- 文法, 語義 均和傳統批 SQL 一緻
- 左右流都會觸發結果更新
- 狀态持續增長,一般結合 state TTL 使用
SELECT i.*, c.*
FROM impressions as i JOIN clicks as c
ON i.id = c.impression_id

1.2 區間join
- 支援 INNER JOIN, LEFT JOIN, RIGHT JOIN,FULL OUTER JOIN
- 左右流都會觸發結果更新
- State 自動清理,根據時間區間保留資料
- 輸出流保留時間屬性
注意:INTERVAL ‘10’ MINUTE
SELECT i.*, c.*
FROM impressions as i LEFT JOIN clicks as c
ON i.id = c.impression_id AND c.click_time >= i.show_time AND
c.click_time <= i.show_time + INTERVAL '10' MINUTE
1.3 時态表 Join
1.3.1 時态 (Temporal)表 與 臨時(Temporary)表差別
Temporal Table(時态表) | 臨時(Temporary)表 |
---|---|
Temporal adj. 時間的,時态的 | Temporary adj. 暫時的,臨時的 |
時态表是随時間變化的表, 典型 的是版本表:版本表中的每⾏數 據有對應的⽣命周期,可以追蹤 該表在給定時間的内容(版本)。 如定義在數 據庫 changelog上的表,可以 追蹤曆史版本 維表也是時态表,維表中的資料 也是在随時間表化,隻是⼤多數 維表不能追蹤曆史版本 | 臨時的表對象,屬于目前 session(會話),随着 session 結 束⽽消失。 該表不屬于具體 Catalog和 DB,不會持久化。 |
1.3.2 時态表 Join
- 支援 INNER JOIN, LEFT JOIN
- 隻有左流都會觸發結果更新
- 輸出流保留時間屬性
- 時态表join關聯支援如下:
- 關聯 Lookup DB
- 關聯 版本表
- 關聯 Hive 分區表
文法結構
注意: FOR SYSTEM_TIME AS OF
SELECT * FROM fact
LEFT JOIN dim FOR SYSTEM_TIME AS OF fact.{proctime | rowtime}
ON fact.id = dim.id
1.3.3 Temporal Join Lookup DB
- 具備 lookup 的能力的外部系統
- 典型的 connector 有 HBase,JDBC
- 自定義connector需實作 LookupTableSource
- 支援 Async IO 和 Cache 提升lookup效率,在1.13.x中已經支援 HBase Async IO lookup
基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例
1.3.4 Temporal Join 版本表
1.3.4.1 資料庫的changelog
通過CDC讀取資料庫某一個表的changelog 或 消費kafka中某一個表的changelog
在定義版本表時,必須定義如下:
- 定義 版本表的 主鍵,見primary的定義
- 定義版本表的 event time,見watermark的定義
- 定義版本表的資料源,見format的定義
Create TABLE productChangelog (
product STRING,
price DECIMAL(10,4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product) NOTE ENFORCED,
WATERMARK FOR update_time AS update_time)
WITH (
'connector' = 'kafka',
'topic' = 'products',
'properties.bootstrap.servers' = '172.25.*.*:9092',
'properties.group.id' = 'flink_gp_test5',
'scan.startup.mode' = 'group-offsets',
'format' = 'debezium-json',
'properties.zookeeper.connect' = '172.25.*.*:2181/kafka'
);
關聯SQL示例
SELECT orderId,orderTime,productTime,price
FROM orders AS o LEFT JOIN productChangelog FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product = p.product
1.3.4.2 Temporal Join 版本表【versioned view】
将業務表如匯率變化曆史轉化一個版本表
CREATE TABLE ratesHistory (
currency_time TIMESTAMP(3),
currency STRING,
rate DECIMAL(38,10),
WATERMARK FOR currency_time as currency_time
) WITH (
'connector' = 'kafka',
'topic' = 'flink_test_4',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink_gp_test4',
'scan.startup.mode' = 'group-offsets',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
);
CREATE VIEW versionedRates AS
SELECT currency,rate,currency_time FROM (
SELECT * ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum
FROM ratesHistory) WHERE row_num =1 ;
1.3.4.3 Temporal Join 版本表(upsert-kafka)
CREATE TABLE user (
user_id BIGINT,
user_name STRING,
phone_num STRING,
address STRING,
modify_time TIMESTAMP(3) METADATA FROM 'timestamp',
PRIMARY KEY (`user_id`) NOT ENFORCED,
WATERMARK FOR modify_time AS modify_time
) WITH (
'connector' = 'upsert-kafka'
'topic' = 'users',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json');
1.3.5 Temporal Join Hive分區表
1.3.5.1 關聯最新分區
- 此時 Hive 表是 stream 讀,每次讀取一個分區
- streaming-source.partition-include 取值 latest
- 通過 streaming-source.monitor-interval 發現分區
SET 'table.sql-dialect'='hive';
CREATE TABLE shopDim (
shopId STRING,
shopName STRING,
shopLevel STRING
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
'streaming-source.enable' = 'true',
'streaming-source.partition.include' = 'latest',
'streaming-source.partition.order' = 'partition-name',
'streaming-source.monitor-interval' = '1 h');
SET 'table.sql-dialect'='hive';
SELECT o.*,s.*
FROM orders AS o LEFT JOIN shopDim FOR SYSTEM_TIME AS OF o.proctime AS s
ON o.shopId = s.shopId;
1.3.5.2 關聯最新表
- 此時 Hive 表是 batch 讀,每次讀取全部分區
- streaming-source.partition-include 取值all
- 通過 lookup.join.cache.ttl 控制 reload 時間
SET 'table.sql-dialect'='hive';
CREATE TABLE shopDim (
shopId STRING,
shopName STRING,
shopLevel STRING
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
'streaming-source.enable' = 'false',
'streaming-source.partition.include' = 'all',
'streaming-source.monitor-interval' = '12 h');
SET 'table.sql-dialect'='hive';
SELECT o.*,s.*
FROM orders AS o LEFT JOIN shopDim FOR SYSTEM_TIME AS OF o.proctime AS s
ON o.shopId = s.shopId;
2 示例
flink-temporal-join-demo