天天看點

基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例

本文根據 《【Flink 1.12】基于 FLINK SQL 的實時資料打寬》中的内容整理而成

1 實時資料打寬

利⽤ Flink SQL 打寬實時資料,共有以下方式:

  • 雙流join (Regular join)
  • 區間join (Interval join)
  • 時态表join (Temporal join)

1.1 雙流join

  • 支援 INNER JOIN, LEFT JOIN, RIGHT JOIN,FULL OUTER JOIN
  • 文法, 語義 均和傳統批 SQL 一緻
  • 左右流都會觸發結果更新
  • 狀态持續增長,一般結合 state TTL 使用
SELECT i.*, c.*
FROM impressions as i JOIN clicks as c
ON i.id = c.impression_id
           
基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例

1.2 區間join

  • 支援 INNER JOIN, LEFT JOIN, RIGHT JOIN,FULL OUTER JOIN
  • 左右流都會觸發結果更新
  • State 自動清理,根據時間區間保留資料
  • 輸出流保留時間屬性

注意:INTERVAL ‘10’ MINUTE

SELECT i.*, c.*
FROM impressions as i LEFT JOIN clicks as c
ON i.id = c.impression_id AND c.click_time >= i.show_time AND
c.click_time <= i.show_time + INTERVAL '10' MINUTE
           
基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例

1.3 時态表 Join

1.3.1 時态 (Temporal)表 與 臨時(Temporary)表差別

Temporal Table(時态表) 臨時(Temporary)表
Temporal adj. 時間的,時态的 Temporary adj. 暫時的,臨時的

時态表是随時間變化的表,

典型 的是版本表:版本表中的每⾏數 據有對應的⽣命周期,可以追蹤 該表在給定時間的内容(版本)。

如定義在數 據庫 changelog上的表,可以 追蹤曆史版本

維表也是時态表,維表中的資料 也是在随時間表化,隻是⼤多數 維表不能追蹤曆史版本

臨時的表對象,屬于目前 session(會話),随着 session 結 束⽽消失。

該表不屬于具體 Catalog和 DB,不會持久化。

1.3.2 時态表 Join

  • 支援 INNER JOIN, LEFT JOIN
  • 隻有左流都會觸發結果更新
  • 輸出流保留時間屬性
  • 時态表join關聯支援如下:
    • 關聯 Lookup DB
    • 關聯 版本表
    • 關聯 Hive 分區表

文法結構

注意: FOR SYSTEM_TIME AS OF

SELECT * FROM fact
LEFT JOIN dim FOR SYSTEM_TIME AS OF fact.{proctime | rowtime}
ON fact.id = dim.id
           

1.3.3 Temporal Join Lookup DB

  • 具備 lookup 的能力的外部系統
  • 典型的 connector 有 HBase,JDBC
  • 自定義connector需實作 LookupTableSource
  • 支援 Async IO 和 Cache 提升lookup效率,在1.13.x中已經支援 HBase Async IO lookup
    基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例

1.3.4 Temporal Join 版本表

1.3.4.1 資料庫的changelog

通過CDC讀取資料庫某一個表的changelog 或 消費kafka中某一個表的changelog

在定義版本表時,必須定義如下:

  • 定義 版本表的 主鍵,見primary的定義
  • 定義版本表的 event time,見watermark的定義
  • 定義版本表的資料源,見format的定義
Create TABLE productChangelog (
  product STRING,
  price DECIMAL(10,4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product) NOTE ENFORCED,
  WATERMARK FOR update_time AS update_time)
WITH (
    'connector' = 'kafka',
    'topic' = 'products',
    'properties.bootstrap.servers' = '172.25.*.*:9092', 
    'properties.group.id' = 'flink_gp_test5',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'debezium-json',
    'properties.zookeeper.connect' = '172.25.*.*:2181/kafka'
  );
           

關聯SQL示例

SELECT orderId,orderTime,productTime,price
FROM orders AS o LEFT JOIN productChangelog FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product = p.product
           

1.3.4.2 Temporal Join 版本表【versioned view】

将業務表如匯率變化曆史轉化一個版本表

CREATE TABLE ratesHistory (
  currency_time TIMESTAMP(3),
  currency STRING,
  rate DECIMAL(38,10),
  WATERMARK FOR currency_time as currency_time
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink_test_4',
    'properties.bootstrap.servers' = 'localhost:9092', 
    'properties.group.id' = 'flink_gp_test4',
    'scan.startup.mode' = 'group-offsets',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
  );

  CREATE VIEW versionedRates AS
    SELECT currency,rate,currency_time FROM (
      SELECT * ROW_NUMBER() OVER (PARTITION BY currency ORDER BY currency_time DESC) AS rowNum
      FROM ratesHistory) WHERE row_num =1 ;
           

1.3.4.3 Temporal Join 版本表(upsert-kafka)

CREATE TABLE user (
  user_id BIGINT,
  user_name STRING,
  phone_num STRING,
  address STRING,
  modify_time TIMESTAMP(3) METADATA FROM 'timestamp',
  PRIMARY KEY (`user_id`) NOT ENFORCED,
  WATERMARK FOR modify_time AS modify_time
) WITH (
  'connector' = 'upsert-kafka'
  'topic' = 'users',
  'properties.bootstrap.servers' = 'localhost:9092', 
  'key.format' = 'json',
  'value.format' = 'json');
    
           

1.3.5 Temporal Join Hive分區表

1.3.5.1 關聯最新分區

  • 此時 Hive 表是 stream 讀,每次讀取一個分區
  • streaming-source.partition-include 取值 latest
  • 通過 streaming-source.monitor-interval 發現分區
SET 'table.sql-dialect'='hive';

CREATE TABLE shopDim (
  shopId STRING,
  shopName STRING,
  shopLevel STRING
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'true',
  'streaming-source.partition.include' = 'latest',
  'streaming-source.partition.order' = 'partition-name',
  'streaming-source.monitor-interval' = '1 h');

SET 'table.sql-dialect'='hive';
SELECT o.*,s.* 
FROM orders AS o LEFT JOIN shopDim FOR SYSTEM_TIME AS OF o.proctime AS s
ON o.shopId = s.shopId;
           

1.3.5.2 關聯最新表

  • 此時 Hive 表是 batch 讀,每次讀取全部分區
  • streaming-source.partition-include 取值all
  • 通過 lookup.join.cache.ttl 控制 reload 時間
SET 'table.sql-dialect'='hive';

CREATE TABLE shopDim (
  shopId STRING,
  shopName STRING,
  shopLevel STRING
) PARTITIONED BY (dt STRING) TBLPROPERTIES (
  'streaming-source.enable' = 'false',
  'streaming-source.partition.include' = 'all',
  'streaming-source.monitor-interval' = '12 h');

SET 'table.sql-dialect'='hive';
SELECT o.*,s.* 
FROM orders AS o LEFT JOIN shopDim FOR SYSTEM_TIME AS OF o.proctime AS s
ON o.shopId = s.shopId;
           

2 示例

flink-temporal-join-demo

基于 FLINK SQL 的實時資料打寬1 實時資料打寬2 示例