天天看點

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

聲明:本系列部落格是根據SGG的視訊整理而成,非常适合大家入門學習。

《2021年最新版大資料面試題全面開啟更新》

維表是數倉中的一個概念,維表中的次元屬性是觀察資料的角度,在建設離線數倉的時候,通常是将維表與事實表進行關聯建構星型模型。在實時數倉中,同樣也有維表與事實表的概念,其中事實表通常存儲在kafka中,維表通常存儲在外部裝置中(比如MySQL,HBase)。對于每條流式資料,可以關聯一個外部維表資料源,為實時計算提供資料關聯查詢。維表可能是會不斷變化的,在維表JOIN時,需指明這條記錄關聯維表快照的時刻。需要注意是,目前Flink SQL的維表JOIN僅支援對目前時刻維表快照的關聯(處理時間語義),而不支援事實表rowtime所對應的的維表快照(事件時間語義)。通過本文你可以了解到:

  • 如何使用Flink SQL建立表
  • 如何定義Kafka資料源表
  • 如何定義MySQL資料源表
  • 什麼是Temporal Table Join
  • 維表join的案例

Flink SQL建立表

注意:本文的所有操作都是在Flink SQL cli中進行的

建立表的文法

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
-- 定義表字段
<column_definition>:
  column_name column_type [COMMENT column_comment]
-- 定義計算列
<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]
-- 定義水位線
<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

解釋

COMPUTED COLUMN(計算列)

計算列是一個通過

column_name AS computed_column_expression

生成的虛拟列,産生的計算列不是實體存儲在資料源表中。一個計算列可以通過原有資料源表中的某個字段、運算符及内置函數生成。比如,定義一個消費金額的計算列(cost),可以使用表的價格(price)*數量(quantity)計算得到。

計算列常常被用在定義時間屬性,可以通過PROCTIME()函數定義處理時間屬性,文法為

proc AS PROCTIME()

。除此之外,計算列可以被用作提取事件時間列,因為原始的事件時間可能不是TIMESTAMP(3)類型或者是存在JSON串中。

尖叫提示:
1.在源表上定義計算列,是在讀取資料源之後計算的,計算列需要跟在SELECT查詢語句之後;
2.計算列不能被INSERT語句插入資料,在INSERT語句中,隻能包括實際的目标表的schema,不能包括計算列           

水位線

水位線定義了表的事件時間屬性,其文法為:

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression           

其中

rowtime_column_name

表示表中已經存在的事件時間字段,值得注意的是,該事件時間字段必須是TIMESTAMP(3)類型,即形如

yyyy-MM-dd HH:mm:ss

,如果不是這種形式的資料類型,需要通過定義計算列進行轉換。

watermark_strategy_expression定義了水位線生成的政策,該表達式的傳回資料類型必須是           

TIMESTAMP(3)類型。

Flink提供了許多常用的水位線生成政策:

  • 嚴格單調遞增的水位線:文法為
WATERMARK FOR rowtime_column AS rowtime_column           

即直接使用時間時間戳作為水位線

    • 遞增水位線:文法為

      WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    • 亂序水位線:文法為

      WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit

      -- 比如,允許5秒的亂序

      WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND

分區

根據具體的字段建立分區表,每一個分區會對應一個檔案路徑

WITH 選項

建立Table source或者Table sink需要指定表的屬性,屬性是以key/value的形式配置的,具體參考其相對應的connector

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
尖叫提示:
Note:建立表時指定的表名有三種形式:
(1)catalog_name.db_name.table_name
(2)db_name.table_name
(3)table_name
對于第一種形式:會将表注冊到一個名為‘catalog_name’的catalog以及一個名為'db_name'd的資料庫的中繼資料中;
對于第二種形式:會将表注冊到目前執行環境的catalog以及名為‘db_name’的資料庫的中繼資料中;
對于第三種形式:會将表注冊到目前執行環境的catalog與資料庫的中繼資料中           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

定義Kafka資料表

kafka是建構實時數倉常用的資料儲存設備,使用Flink SQL建立kafka資料源表的文法如下:

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE MyKafkaTable (
  ...
) WITH (
  'connector.type' = 'kafka', -- 連接配接類型       
  'connector.version' = '0.11',-- 必選: 可選的kafka版本有:0.8/0.9/0.10/0.11/universal
  'connector.topic' = 'topic_name', -- 必選: 主題名稱
  'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必選: zk連接配接位址
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必選: Kafka連接配接位址
  'connector.properties.group.id' = 'testGroup', --可選: 消費者組
   -- 可選:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets
  'connector.startup-mode' = 'earliest-offset',                                          
  -- 可選: 當偏移量指定為specific offsets,為指定每個分區指定具體位置
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'connector.sink-partitioner' = '...',  -- 可選: sink分區器,fixed/round-robin/custom
  -- 可選: 當自定義分區器時,指定分區器的類名
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  'format.type' = '...',                 -- 必選: 指定格式,支援csv/json/avro
   -- 指定update-mode,支援append/retract/upsert
  'update-mode' = 'append',

)           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
尖叫提示:
指定具體的偏移量位置:預設是從目前消費者組送出的偏移量開始消費
sink分區:預設是盡可能向更多的分區寫資料(每一個sink并行度執行個體隻向一個分區寫資料),也可以自已分區政策。當使用 round-robin分區器時,可以避免分區不均衡,但是會造成Flink執行個體與kafka broker之間大量的網絡連接配接
一緻性保證:預設sink語義是at-least-once
Kafka 0.10+ 是時間戳:從kafka0.10開始,kafka消息附帶一個時間戳作為消息的中繼資料,表示記錄被寫入kafka主題的時間,這個時間戳可以作為事件時間屬性( rowtime attribute)
**Kafka 0.11+**版本:Flink從1.7開始,支援使用universal版本作為kafka的連接配接器 ,可以相容kafka0.11之後版本           

定義MySQL資料表

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE MySQLTable (
  ...
) WITH (
  'connector.type' = 'jdbc', -- 必選: jdbc方式
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必選: JDBC url
  'connector.table' = 'jdbc_table_name',  -- 必選: 表名
   -- 可選: JDBC driver,如果不配置,會自動通過url提取 
  'connector.driver' = 'com.mysql.jdbc.Driver',                                           
  'connector.username' = 'name', -- 可選: 資料庫使用者名
  'connector.password' = 'password',-- 可選: 資料庫密碼
    -- 可選, 将輸入進行分區的字段名.
  'connector.read.partition.column' = 'column_name',
    -- 可選, 分區數量.
  'connector.read.partition.num' = '50', 
    -- 可選, 第一個分區的最小值.
  'connector.read.partition.lower-bound' = '500',
    -- 可選, 最後一個分區的最大值
  'connector.read.partition.upper-bound' = '1000', 
    -- 可選, 一次提取資料的行數,預設為0,表示忽略此配置
  'connector.read.fetch-size' = '100', 
   -- 可選, lookup緩存資料的最大行數,如果超過改配置,老的資料會被清除
  'connector.lookup.cache.max-rows' = '5000', 
   -- 可選,lookup緩存存活的最大時間,超過該時間舊資料會過時,注意cache.max-rows與cache.ttl必須同時配置
  'connector.lookup.cache.ttl' = '10s', 
   -- 可選, 查詢資料最大重試次數
  'connector.lookup.max-retries' = '3', 
   -- 可選,寫資料最大的flush行數,預設5000,超過改配置,會觸發刷資料 
  'connector.write.flush.max-rows' = '5000', 
   --可選,flush資料的間隔時間,超過該時間,會通過一個異步線程flush資料,預設是0s 
  'connector.write.flush.interval' = '2s', 
  -- 可選, 寫資料失敗最大重試次數
  'connector.write.max-retries' = '3' 
)           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

Temporal Table Join

使用文法

SELECT column-names
FROM table1  [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1           

注意:目前,僅支援INNER JOIN與LEFT JOIN。在join的時候需要使用 

FOR SYSTEM_TIME AS OF

 ,其中table1.proctime表示table1的proctime處理時間屬性(計算列)。使用

FOR SYSTEM_TIME AS OF table1.proctime

表示當左邊表的記錄與右邊的維表join時,隻比對目前處理時間維表所對應的的快照資料。

樣例

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency           

使用說明

  • 僅支援Blink planner
  • 僅支援SQL,目前不支援Table API
  • 目前不支援基于事件時間(event time)的temporal table join
  • 維表可能會不斷變化,JOIN行為發生後,維表中的資料發生了變化(新增、更新或删除),則已關聯的維表資料不會被同步變化
  • 維表和維表不能進行JOIN
  • 維表必須指定主鍵。維表JOIN時,ON的條件必須包含所有主鍵的等值條件

維表Join案例

背景

Kafka中有一份使用者行為資料,包括pv,buy,cart,fav行為;MySQL中有一份省份區域的維表資料。現将兩種表進行JOIN,統計每個區域的購買行為數量。

步驟

維表存儲在MySQL中,如下建立維表資料源:

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE dim_province (
    province_id BIGINT,  -- 省份id
    province_name  VARCHAR, -- 省份名稱
 region_name VARCHAR -- 區域名稱
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
    'connector.table' = 'dim_province',
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123qwe',
    'connector.lookup.cache.max-rows' = '5000',
    'connector.lookup.cache.ttl' = '10min'
);           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

事實表存儲在kafka中,資料為使用者點選行為,格式為JSON,具體資料樣例如下:

{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}           

建立kafka資料源表,如下:

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE user_behavior (
    user_id BIGINT, -- 使用者id
    item_id BIGINT, -- 商品id
    cat_id BIGINT,  -- 品類id
    action STRING,  -- 使用者行為
 province INT,   -- 使用者所在的省份
 ts     BIGINT,  -- 使用者行為發生的時間戳
    proctime as PROCTIME(),   -- 通過計算列産生一個處理時間列
 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時間
    WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND  -- 在eventTime上定義watermark
) WITH (
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka主題
    'connector.startup-mode' = 'earliest-offset',  -- 偏移量,從起始 offset 開始讀取
 'connector.properties.group.id' = 'group1', -- 消費者組
    'connector.properties.zookeeper.connect' = 'kms-2:2181,kms-3:2181,kms-4:2181',  -- zookeeper 位址
    'connector.properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092',  -- kafka broker 位址
    'format.type' = 'json'  -- 資料源格式為 json
);           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

建立MySQL的結果表,表示區域銷量

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE TABLE region_sales_sink (
    region_name STRING,  -- 區域名稱
    buy_cnt BIGINT  -- 銷量
) WITH (

    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
    'connector.table' = 'top_region', -- MySQL中的待插入資料的表
    'connector.driver' = 'com.mysql.jdbc.Driver',
    'connector.username' = 'root',
    'connector.password' = '123qwe',
    'connector.write.flush.interval' = '1s'
);           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

使用者行為資料與省份維表資料join

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
CREATE VIEW user_behavior_detail AS
SELECT
  u.user_id, 
  u.item_id,
  u.cat_id,
  u.action,  
  p.province_name,
  p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province = p.province_id;           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

計算區域的銷量,并将計算結果寫入MySQL

Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
INSERT INTO region_sales_sink
SELECT 
  region_name,
  COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE action = 'buy'
GROUP BY region_name;           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

結果檢視:

Flink SQL> select * from  region_sales_sink; -- 在Flink SQL cli中檢視           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join
mysql> select * from top_region; -- 檢視MySQL的資料           
Flink實戰(八十三):flink-sql使用(十)維表join(五)Flink SQL之維表join之Temporal Table Join

總結

繼續閱讀