實時計算的源表是指流式資料存儲。流式資料存儲驅動實時計算的運作,是以每個實時計算子作業必須提供至少一個流式資料存儲。
文法
-
CREATE TABLE tableName
-
(columnName dataType [, columnName dataType ]*)
-
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
示例
-
create table datahub_stream(
-
name varchar,
-
age BIGINT,
-
birthday BIGINT
-
) with (
-
type='datahub',
-
endPoint='http://dh-et2.aliyun-inc.com',
-
project='blink_xxx',
-
topic='test_xxx',
-
accessId='0i70Rxxxxx',
-
accessKey='yF60EwUxxxx',
-
startTime='2017-07-21 00:00:00'
-
);
Watermark定義
Watermark是一種衡量Event Time進展的機制,它是資料本身的一個隐藏屬性。Watermark的定義是資料原表DDL定義的一部分。Flink提供如下文法定義:
-
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
-
辨別了這個 watermark 的名字,可選。watermarkName
-
必須是表中已定義的一列(目前僅支援為<rowtime_field>
類型),含義是基于該列生成 watermark,并且辨別該列為 Event Time 列,可以在後續 query 中用來定義視窗。Timestamp
-
是目前提供的watermark的生成政策,是根據withOffset
生成watermark的值。withOffset的第一個參數必須是<rowtime_field> - offset
。<rowtime_field>
-
機關為毫秒,含義為watermark值與event time值的偏移量。offset
通常一條記錄中的某個字段就代表了該記錄的發生時間。例如,表中有個rowtime字段,類型為Timestamp,其中某個值為
1501750584000(2017-08-03 08:56:24.000)
,如果您需要定義一個基于該rowtime列的watermark,且watermark政策為偏移4秒,需要如下定義。
-
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
這條資料的watermark時間為
1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)
。這條資料中timestamp小于
1501750580000(2017-08-03 08:56:20.000)
的資料,都已經到達了。
計算列
概念
計算列是虛拟列,并非實際存儲在表中。計算列的表達式可以使用其他列中的資料來計算其所屬列的值,可以使用表達式、内置函數、或是自定義函數。靈活度與SELECT中的表達式一樣。計算列在Flink中可以像普通字段一樣被使用。
用途
目前watermark的rowtime列隻支援Timestamp類型(未來會支援Long類型),watermark隻能定義在源表DDL中,如果您的源表中沒有 Timestamp類型的列,需要從其他類型的字段轉換而來,可以使用計算列來轉換。
-
<computed_column_definition> ::= column_name AS computed_column_expression
-
#如果datahub的TIME字段是微秒級别的(16位Unix時間戳),可以用計算列來轉換。
-
CREATE TABLE sls_stream(
-
a INT,
-
b BIGINT,
-
TIME BIGINT,
-
ts AS TO_TIMESTAMP(TIME/1000),
-
WATERMARK FOR ts AS withOffset(ts, 1000)
-
) with (
-
type = 'DATAHUB',
-
...
-
);
如上示例中所示,源表資料中的字段
TIME
包含時間資訊,為BIGINT類型。用計算列的功能将字段
TIME
轉換成了Timestamp類型的
ts
字段,并将
ts
字段作為watermark的rowtime字段。
本文轉自實時計算——
資料源表概述