本文為您介紹Flink SQL支援的Event Time和Processing Time資料類型,以及watermark和計算列。
Flink SQL支援兩種時間類型。
- Event Time:您提供的事件時間(通常是資料的最原始的建立時間),event time一定是您提供在Schema裡的資料。
- Processing Time:系統對事件進行處理的本地系統時間。
Event Time
EventTime也稱為rowtime。EventTime時間屬性必須在源表DDL中聲明,可以将源表中的某一字段聲明成 EventTime。目前隻支援将 Timestamp 類型(将來會支援 Long 類型)聲明成 rowtime 字段。如果不是 Timestamp 類型,需要借助計算列,基于現有列構造出一個 Timestamp 列。
但由于資料本身會有亂序,加之網絡抖動或其它原因,rowtime 到達的順序和被處理的順序可能是不一緻的(亂序)。是以定義一個rowtime字段,需要顯示地定義一個 Watermark計算方法。
Watremark
Watermark是一種衡量Event Time進展的機制,它是資料本身的一個隐藏屬性,Watermark的定義是source表DDL定義的一部分。Flink提供了如下文法定義Watermark。
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
-
辨別Watermark的名字,可選。watermarkName
-
必須是表中已定義的一列(目前僅支援為<rowtime_field>
類型),含義是基于該列生成Watermark,并且辨別該列為Event Time列,可以在後續query中用來定義視窗。Timestamp
-
是目前提供的Watermark的生成政策,是根據withOffset
生成Watermark的值。withOffset的第一個參數必須是<rowtime_field> - offset
。<rowtime_field>
-
機關為毫秒,含義為Watermark值與event time值的偏移量。offset
通常一條記錄中的某個字段就代表了該記錄的發生時間。例如,表中有個rowtime字段,類型為Timestamp,其中某個字段為
1501750584000(2017-08-03 08:56:24.000)
。定義一個基于該rowtime列,政策為偏移4秒的Watermark,示例如下。
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
在這種情況下,這條資料的Watermark時間為
1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)
。這條資料的Watermark時間含義即:timestamp小于
1501750580000(2017-08-03 08:56:20.000)
的資料,都已經到達了。
說明
- 在使用Event Time Watermark時的rowtime必須是TIMESTAMP類型。目前支援毫秒級别的、在Unix時間戳裡是13位的TIMESTAMP。如果是其他類型或是在Unix時間戳不是13位,建議使用計算列來做轉換。
- Event Time和Processing Time的聲明隻能在源表上聲明。
總結:
- WaterMark的含義是所有時間t'< t 的事件都已經發生。假如Watermark t已經生效,那麼後續eventTime小于t的記錄都會被丢棄掉(目前Flink的處理是丢棄這些來的更晚的資料,後續支援使用者配置讓更晚的資料也能繼續update)。
- 針對亂序的的流,WaterMark至關重要。即使一些事件延遲到達,也不至于過于影響window視窗的計算的正确性。
- 并行資料流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。
以下為一個使用event time聚合的示例。
CREATE TABLE tt_stream(
a varchar,
b varchar,
c timeStamp,
WATERMARK wk1 FOR c as withOffset(c, 1000)
) with (
type = 'SLS',
topic = 'blink_tt2tt_test',
accessId = '0622174XXXXXXTS',
accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
);
CREATE TABLE rds_output(
id varchar,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) with (
type = 'rds',
url = 'jdbc:mysql://XXXXXXXX3306/test',
tableName = 'datahub2rds',
userName = 'XXXXXt',
password = '1XXXXX'
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(c, INTERVAL '1' SECOND) AS c,
CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f,
COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a
計算列
由于目前Watermark的rowtime列,隻支援Timestamp類型(未來會支援Long類型),如果不是Timestamp類型,就需要借助計算列 ,基于現有列構造出一個Timestamp列。計算列的表達式非常靈活,可以使用任意表達式、内置函數、或是自定義函數,靈活度與 SELECT中的表達式一樣。計算列在Flink SQ中可以像普通字段一樣被使用。
<computed_column_definition> ::= column_name AS computed_column_expression
例如
CREATE TABLE sls_stream(
a INT,
b BIGINT,
c VARCHAR,
ts AS to_timestamp(c),
WATERMARK FOR ts AS withOffset(ts, 1000)
) with (
type = 'sls',
...
);
如上示例中所示,源表資料中的字段c包含時間資訊,但是是字元串類型。使用TO_TIMESTAMP内置函數将字元串轉成了Timestamp類型,并用該計算列作為Watermark 的rowtime字段。
Processing Time
processing time是系統産生的,不在您的原始資料中,需要顯式的定義一個processing time列。
filedName as PROCTIME()
這個定義需要在source的DDL中顯式指明,示例如下:
CREATE TABLE tt_stream (
a varchar,
b varchar,
c BIGINT,
d AS PROCTIME()
) with (
type = 'tt',
...
);
CREATE TABLE rds_output (
id varchar,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) with (
type = 'rds',
...
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(d, INTERVAL '1' SECOND) AS c,
SESSION_END(d, INTERVAL '1' SECOND) AS f,
COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a
本文轉自實時計算——
時間屬性