天天看點

Flink SQL中Timestamp使用的坑

flink版本為1.10。

flink sql消費kafka消息,表定義為

CREATE TABLE start_log_source( 
   mid_id VARCHAR,  
   user_id INT,  
   ... 
   app_time TIMESTAMP, -- 13位的時間戳(1587975971431)
   WATERMARK FOR app_time AS app_time- INTERVAL '5' SECOND   -- 在ts上定義5 秒延遲的 watermark
) WITH ( 
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'start_log',  -- kafka topic
	'connector.properties.group.id' = 'start_log_group',
    'connector.startup-mode' = 'earliest-offset',  -- 從起始 offset 開始讀取
    'connector.properties.zookeeper.connect' = '192.168.1.109:2181',  -- zookeeper 位址
    'connector.properties.bootstrap.servers' = '192.168.1.109:9092',  -- kafka broker 位址
    'format.type' = 'json'  -- 資料源格式為 json  
);
           

以上代碼運作時報錯,大概資訊是轉換錯誤,查閱文檔後發現應該是app_time定義有問題。如果app_time類型定義為TIMESTAMP,那它的值應該為2020-04-06T16:26:11類似的格式。

進一步查詢發現watermark的一些使用要求:

WATERMARK 定義了表的事件時間屬性,其形式為 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。

rowtime_column_name 把一個現有的列定義為一個為表标記事件時間的屬性。該列的類型必須為 TIMESTAMP(3),且是 schema 中的頂層列,它也可以是一個計算列。

但是app_time實際資料格式為13位的時間戳,也就是毫秒級的時間戳,TIMESTAMP(3)是秒級時間戳,則必須經過轉換才行。剛開始使用TO_TIMESTAMP内置函數,但是它不支援數值型時間戳值轉換為TIMESTAMP。經過一番折騰,終于成功,SQL如下:

CREATE TABLE start_log_source( 
   mid_id VARCHAR,  
   user_id INT,  
   ... 
   app_time BIGINT, -- 13位的時間戳(1587975971431)
   ts AS TO_TIMESTAMP(FROM_UNIXTIME(app_time / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 定義事件時間
   WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- 在ts上定義5 秒延遲的 watermark
) WITH ( 
    'connector.type' = 'kafka',  -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支援 0.11 以上的版本
    'connector.topic' = 'start_log',  -- kafka topic
	'connector.properties.group.id' = 'start_log_group',
    'connector.startup-mode' = 'earliest-offset',  -- 從起始 offset 開始讀取
    'connector.properties.zookeeper.connect' = '192.168.1.109:2181',  -- zookeeper 位址
    'connector.properties.bootstrap.servers' = '192.168.1.109:9092',  -- kafka broker 位址
    'format.type' = 'json'  -- 資料源格式為 json  
);
           

繼續閱讀