天天看點

時間屬性

本文為您介紹Flink SQL支援的Event Time和Processing Time資料類型,以及watermark和計算列。

Flink SQL支援兩種時間類型。

  • Event Time:您提供的事件時間(通常是資料的最原始的建立時間),event time一定是您提供在Schema裡的資料。
  • Processing Time:系統對事件進行處理的本地系統時間。

Event Time

EventTime也稱為rowtime。EventTime時間屬性必須在源表DDL中聲明,可以将源表中的某一字段聲明成 EventTime。目前隻支援将 Timestamp 類型(将來會支援 Long 類型)聲明成 rowtime 字段。如果不是 Timestamp 類型,需要借助計算列,基于現有列構造出一個 Timestamp 列。

但由于資料本身會有亂序,加之網絡抖動或其它原因,rowtime 到達的順序和被處理的順序可能是不一緻的(亂序)。是以定義一個rowtime字段,需要顯示地定義一個 Watermark計算方法。

Watremark

Watermark是一種衡量Event Time進展的機制,它是資料本身的一個隐藏屬性,Watermark的定義是source表DDL定義的一部分。Flink提供了如下文法定義Watermark。

WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
           
  • watermarkName

     辨別Watermark的名字,可選。
  • <rowtime_field>

     必須是表中已定義的一列(目前僅支援為

    Timestamp

    類型),含義是基于該列生成Watermark,并且辨別該列為Event Time列,可以在後續query中用來定義視窗。
  • withOffset

     是目前提供的Watermark的生成政策,是根據

    <rowtime_field> - offset

    生成Watermark的值。withOffset的第一個參數必須是

    <rowtime_field>

  • offset

     機關為毫秒,含義為Watermark值與event time值的偏移量。

通常一條記錄中的某個字段就代表了該記錄的發生時間。例如,表中有個rowtime字段,類型為Timestamp,其中某個字段為

1501750584000(2017-08-03 08:56:24.000)

。定義一個基于該rowtime列,政策為偏移4秒的Watermark,示例如下。

WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
           

在這種情況下,這條資料的Watermark時間為 

1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)

。這條資料的Watermark時間含義即:timestamp小于

1501750580000(2017-08-03 08:56:20.000)

的資料,都已經到達了。

說明

  • 在使用Event Time Watermark時的rowtime必須是TIMESTAMP類型。目前支援毫秒級别的、在Unix時間戳裡是13位的TIMESTAMP。如果是其他類型或是在Unix時間戳不是13位,建議使用計算列來做轉換。
  • Event Time和Processing Time的聲明隻能在源表上聲明。

總結:

  1. WaterMark的含義是所有時間t'< t 的事件都已經發生。假如Watermark t已經生效,那麼後續eventTime小于t的記錄都會被丢棄掉(目前Flink的處理是丢棄這些來的更晚的資料,後續支援使用者配置讓更晚的資料也能繼續update)。
  2. 針對亂序的的流,WaterMark至關重要。即使一些事件延遲到達,也不至于過于影響window視窗的計算的正确性。
  3. 并行資料流中,當Operator有多個輸入流時,Operator的event time以最小流event time為準。

以下為一個使用event time聚合的示例。

CREATE TABLE tt_stream(
  a varchar,
  b varchar,
  c timeStamp,
  WATERMARK wk1 FOR c as withOffset(c, 1000)
) with (
  type = 'SLS',
  topic = 'blink_tt2tt_test',
  accessId = '0622174XXXXXXTS',
  accessKey = 'a62cfe86-bXXXXXXXb9fad2618e7b'
);

CREATE TABLE rds_output(
  id varchar,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT
) with (
  type = 'rds',
  url = 'jdbc:mysql://XXXXXXXX3306/test',
  tableName = 'datahub2rds',
  userName = 'XXXXXt',
  password = '1XXXXX'
);

INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(c, INTERVAL '1' SECOND) AS c, 
     CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f, 
     COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a

           

計算列

由于目前Watermark的rowtime列,隻支援Timestamp類型(未來會支援Long類型),如果不是Timestamp類型,就需要借助計算列 ,基于現有列構造出一個Timestamp列。計算列的表達式非常靈活,可以使用任意表達式、内置函數、或是自定義函數,靈活度與 SELECT中的表達式一樣。計算列在Flink SQ中可以像普通字段一樣被使用。

<computed_column_definition> ::= column_name AS computed_column_expression

例如

CREATE TABLE sls_stream(    
 a INT, 
 b BIGINT,
 c VARCHAR, 
 ts AS to_timestamp(c),
 WATERMARK FOR ts AS withOffset(ts, 1000)
) with ( 
 type = 'sls',
 ...
);           

如上示例中所示,源表資料中的字段c包含時間資訊,但是是字元串類型。使用TO_TIMESTAMP内置函數将字元串轉成了Timestamp類型,并用該計算列作為Watermark 的rowtime字段。

Processing Time

processing time是系統産生的,不在您的原始資料中,需要顯式的定義一個processing time列。

filedName as PROCTIME()

這個定義需要在source的DDL中顯式指明,示例如下:

CREATE TABLE tt_stream (
 a varchar, 
 b varchar, 
 c BIGINT,
 d AS PROCTIME()
) with ( 
  type = 'tt',
  ... 
);        

CREATE TABLE rds_output ( 
 id varchar, 
 c TIMESTAMP, 
 f TIMESTAMP, 
 cnt BIGINT 
) with ( 
  type = 'rds',
  ... 
);

INSERT INTO rds_output 
SELECT a AS id,
       SESSION_START(d, INTERVAL '1' SECOND) AS c, 
       SESSION_END(d, INTERVAL '1' SECOND) AS f, 
       COUNT(a) AS cnt 
FROM tt_stream 
GROUP BY SESSION(d, INTERVAL '1' SECOND), a           

本文轉自實時計算——

時間屬性
上一篇: 關鍵字
下一篇: 上線階段