天天看點

Flink流式處理架構中的Table API和Flink SQL

一、Table  API和Flink  SQL是什麼?

1)Flink對批處理和流處理,提供了統一的上層API;

2)Table  API是一套内嵌在Java和Scala語言中的查詢API,它允許以非常直覺的方式組合來自一些關系運算符的查詢;

3)Flink的SQL支援基于實作了SQL标準的Apache  Calcite;

Flink流式處理架構中的Table API和Flink SQL

1、基本程式結構

1)Table  API和SQL的程式結構,與流式處理的程式結構十分類似

Flink流式處理架構中的Table API和Flink SQL

2、建立TableEnvironment

1)建立表的執行環境,需要将flink流處理的執行環境傳入;

Flink流式處理架構中的Table API和Flink SQL

2)TableEnvironment是flink中內建Table API和SQL的核心概念,所有對表的操作都基于TableEnvironment

      —— 注冊Catalog

      —— 在Catalog中系統資料庫

      —— 執行SQL查詢

      —— 注冊使用者自定義函數(UDF)

3、配置TableEnvironment

1)配置老版本planner的流式查詢

Flink流式處理架構中的Table API和Flink SQL

2)配置老版本planner的批式查詢

Flink流式處理架構中的Table API和Flink SQL

3)配置blink  planner的流式查詢

Flink流式處理架構中的Table API和Flink SQL

 4)配置blink  planner的批式查詢

Flink流式處理架構中的Table API和Flink SQL

二、表(Table)

1)TableEnvironment可以注冊目錄Catalog,并可以基于Catalog系統資料庫;

2)表(Table)是由一個“辨別符”(identifier)來指定的,由3部分組成:Catalog名、資料庫(database)名和對象名;

3)表可以是正常的,也可以是虛拟的(視圖,View);

4)正常表(Table)一般可以用來描述外部資料,比如檔案、資料庫表或消息隊列的資料,也可以直接從DataStream轉換而來;

5)視圖(View)可以從現有的表中建立,通常是Table  API或者SQL查詢的一個結果表;

1、建立表

1)TableEnvironment可以調用.connect()方法,連接配接外部系統,并調用.createTemporaryTable()方法,在Catalog中系統資料庫;

Flink流式處理架構中的Table API和Flink SQL

2)可以建立Table來描述檔案資料,它可以從檔案中讀取,或者将資料寫入檔案

Flink流式處理架構中的Table API和Flink SQL

2、表的查詢——Table  API

1)Table  API是內建在Scala和Java語言内的查詢API;

2)Table API基于代表“表”的Table類,并提供一整套操作處理的方法API;這些方法會傳回一個新的Table對象,表示對輸入表應用轉換操作的結果;

3)有些關系型轉換操作,可以由多個方法調用組成,構成鍊式調用結構;

Flink流式處理架構中的Table API和Flink SQL

4)Flink的SQL內建,基于實作了SQL标準的Apache Calcite;

5)在Flink中,用正常字元串來定義SQL查詢語句;

6)SQL查詢的結果,也是一個新的Table;

Flink流式處理架構中的Table API和Flink SQL

3、将DataStream轉換成表

1)對于一個DataStream,可以直接轉換成Table,進而友善地調用Table API做轉換操作;

Flink流式處理架構中的Table API和Flink SQL

2)預設轉換後的Table schema和DataStream中的字段定義一一對應,也可以單獨指定出來;

Flink流式處理架構中的Table API和Flink SQL

4、資料類型與Schema的對應

1)DataStream中的資料類型,與表的Schema之間的對應關系,可以有兩種:基于字段名稱,或者基于字段位置;

2)基于名稱(name-based)

Flink流式處理架構中的Table API和Flink SQL

3)基于位置(position-based)

Flink流式處理架構中的Table API和Flink SQL

5、建立臨時視圖(Temporary  View)

1)基于DataStream建立臨時視圖

Flink流式處理架構中的Table API和Flink SQL

2)基于Table建立臨時視圖

Flink流式處理架構中的Table API和Flink SQL

6、輸出表

1)表的輸出,是通過将資料寫入TableSink來實作的;

2)TableSink是一個通用接口,可以支援不同的檔案格式、存儲資料庫和消息隊列;

3)輸出表最直接的方法,就是通過Table.insertInto()方法将一個Table寫入注冊過的TableSink中

Flink流式處理架構中的Table API和Flink SQL

7、輸出到檔案

Flink流式處理架構中的Table API和Flink SQL

8、更新模式

1)對于流式查詢,需要聲明如何在表和外部連接配接器之間執行轉換;

2)與外部系統交換的消息類型,由更新模式(Update Mode)指定

》》追加(Append)模式

     —— 表隻做插入操作,和外部連接配接器隻交換插入(Insert)消息;

》》撤回(Retract)模式

      —— 表和外部連接配接器交換添加(Add)和撤回(Retract)消息;

      —— 插入操作(Insert)編碼為Add消息;删除(Delete)編碼為Retract消息;更新(Update)編碼為上一條的Retract和下一條的Add消息;

》》更新插入(Upsert)模式

      —— 更新和插入都被編碼為Upsert消息;删除編碼為Delete消息;

9、輸出到Kafka

1)可以建立Table來描述kafka中的資料,作為輸入或輸出的TableSink

Flink流式處理架構中的Table API和Flink SQL

10、輸出到ES

1)可以建立Table來描述ES中的資料,作為輸出的TableSink

Flink流式處理架構中的Table API和Flink SQL

11、輸出到MySQL

1)可以建立Table來描述MySQL中的資料,作為輸入和輸出

Flink流式處理架構中的Table API和Flink SQL

12、将Table轉換成DataStream

1)表可以轉換為DataStream或DataSet,這樣自定義流處理或批處理程式就可以繼續在Table API或SQL查詢的結果上運作了;

2)将表轉換為DataStream或DataSet時,需要指定生成的資料類型,即要将表的每一行轉換成的資料類型;

3)表作為流式查詢的結果,是動态更新的;

4)轉換有兩種轉換模式:追加(Append)模式和撤回(Retract)模式;

》》追加模式(Append  Mode)

      ——  用于表隻會被插入(Insert)操作更改的場景

Flink流式處理架構中的Table API和Flink SQL

》》撤回模式(Retract  Mode)

      ——  用于任何場景,有些類似于更新模式中Retract模式,它隻有Insert和Delete兩類操作;

      ——  得到的資料會增加一個Boolean類型的辨別位(傳回的第一個字段,用它來辨別到底是新增的資料(Insert)),還是被删除的資料(Delete)。

Flink流式處理架構中的Table API和Flink SQL

13、檢視執行計劃

1)Table API提供了一種機制來解釋計算表的邏輯和優化查詢計劃;

2)檢視執行計劃,可以通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,傳回一個字元串,描述三個計劃:

》》優化的邏輯查詢計劃;

》》優化後的邏輯查詢計劃;

》》實際執行計劃;

Flink流式處理架構中的Table API和Flink SQL

14、流處理和關系代數的差別

關系代數(表)/SQL 流處理
處理的資料對象 字段元組的有界集合 字段元組的無限序列

查詢(Query)

對資料的通路

可以通路到完整的資料輸入

無法通路所有資料,

必須持續“等待”流式輸入

查詢終止條件 生成固定大小的結果集後終止 永不停止,根據持續收到的資料不斷更新查詢結果

15、動态表(Dynamic  Tables)

1)動态表是Flink對流資料的Table API和SQL支援的核心概念;

2)與表示批處理資料的靜态表不同,動态表是随時間變化的;

》》持續查詢(Continuous  Query)

(1)動态表可以像靜态的批處理表一樣進行查詢,查詢一個動态表會産生持續查詢(Continuous  Query);

(2)連續查詢永遠不會終止,并會生成另一個動态表;

(3)查詢會不斷更新其動态結果表,以反映其動态輸入表上的更改;

16、動态表和持續查詢

Flink流式處理架構中的Table API和Flink SQL

》》流式表查詢的處理過程:

(1)流被轉換為動态表;

(2)對動态表計算連續查詢,生成新的動态表;

(3)生成的動态表被轉換回流;

17、将流轉換成動态表

1)為了處理帶有關系查詢的流,必須先将其轉換為表;

2)從概念上将,流的每個資料記錄,都被解釋為結果表的插入(Insert)修改操作;

Flink流式處理架構中的Table API和Flink SQL

18、持續查詢

1)持續查詢會在動态表上做計算處理,并作為結果生成新的動态表;

Flink流式處理架構中的Table API和Flink SQL

19、将動态表轉換成DataStream

1)與正常的資料庫表一樣,動态表可以通過插入(Insert)、更新(Update)和删除(Delete)更改,進行持續的修改;

2)将動态表轉換為流或将其寫入外部系統時,需要對這些更改進行編碼;

》》僅追加(Append-only)流

       ——  僅通過插入(Insert)更改來修改的動态表,可以直接轉換為僅追加流;

》》撤回(Retract)流

       ——  撤回流流是包含兩類消息的流:添加(Add)消息和撤回(Retract)消息;

》》Upsert(更新插入)流

        ——  Upsert流也包含兩種類型的消息:Upsert消息和删除(Delete)消息;

Flink流式處理架構中的Table API和Flink SQL

20、時間特性(Time  Attributes)

1)基于時間的操作(比如Table  API和SQL中視窗操作),需要定義相關的時間語義和時間資料來源的資訊;

2)Table可以提供一個邏輯上的時間字段,用于在表處理程式中,訓示時間和通路相應的時間戳;

3)時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,它就可以作為一個字段引用,并且可以在基于時間的操作中使用;

4)時間屬性的行為類似于正常時間戳,可以通路,并且進行計算;

21、定義處理時間(Processing  Time)

1)處理時間語義下,允許表處理程式根據機器的本地時間生成結果。它是時間的最簡單概念,它既不需要提取時間戳,也不需要生成Watermark;

》》由DataStream轉換成表時指定

(1)在定義Schema期間,可以使用.proctime,指定字段名定義處理時間字段;

(2)這個proctime屬性隻能通過附加邏輯字段,來擴充實體schema。是以,隻能在schema定義的末尾定義它

Flink流式處理架構中的Table API和Flink SQL

》》定義Table  Schema時指定

Flink流式處理架構中的Table API和Flink SQL

》》在建立表的DDL中定義

Flink流式處理架構中的Table API和Flink SQL

2)事件時間語義,允許表處理程式根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正确的結果。

3)為了處理無序事件,并區分流中的準時和遲到事件;Flink需要從事件資料中,提取時間戳,并用來推進事件時間的進展;

4)定義事件時間,同樣有三種方法:

       》由DataStream轉換成表時指定

       》定義Table  Schema時指定

       》在建立表的DDL中定義

》》由DataStream轉換成表時指定

(1)在DataStream轉換成Table,使用.rowtime可以定義事件時間屬性

Flink流式處理架構中的Table API和Flink SQL

(2)定義Table  Schema時指定

Flink流式處理架構中的Table API和Flink SQL

(3)在建立表的DDL中定義

Flink流式處理架構中的Table API和Flink SQL

22、視窗

1)時間定義,要配合視窗操作才能發揮作用;

2)在Table  API和SQL中,主要有兩種視窗

》》Group  Windows(分組視窗)

       ——  根據時間或行計數間隔,将行聚合到有限的組(Group)中,并對每個組的資料執行一次聚合函數;

》》Over Windows

       ——  針對每個輸入行,計算相鄰行範圍内的聚合;

22.1 Group  Windows

(1)Group  WIndows是使用window(w:GroupWindow)子句定義的,并且必須由as子句指定一個别名。

(2)為了按視窗對表進行分組,視窗的别名必須在group by子句中,像正常的分組字段一樣引用

Flink流式處理架構中的Table API和Flink SQL

(3)Table  API提供了一組具有特定語義的預定義Window類,這些類會被轉換為底層DataStream或DataSet的視窗操作;

22.2 滾動視窗(Tumbling  windows)

(1)滾動視窗要用Tumble類來定義

Flink流式處理架構中的Table API和Flink SQL

22.3 滑動視窗(Sliding  windows)

(1)滑動視窗要用slide類來定義

Flink流式處理架構中的Table API和Flink SQL

22.4  會話視窗(Session  windows)

(1)會話視窗要用Session類來定義

Flink流式處理架構中的Table API和Flink SQL

22.5 Over  Windows

(1)Over  window聚合是标準SQL中已有的(over子句),可以在查詢的SELECT子句中定義;

(2)Over window聚合,會針對每個輸入行,計算相鄰行範圍内的聚合;

(3)Over windows使用window(w:overwindows*)子句定義,并在select()方法中通過别名來引用

Flink流式處理架構中的Table API和Flink SQL

4)Table  API提供了Over類,來配置Over視窗的屬性;

22.6  無界Over  Windows

1)可以在事件時間或處理時間,以及指定為時間間隔,或行計數的範圍内,定義Over windows

2)無界的over  window是使用常量指定的

Flink流式處理架構中的Table API和Flink SQL

22.7  有界Over Windows

1)有界的over  window是用間隔的大小指定的

Flink流式處理架構中的Table API和Flink SQL

23、SQL中的Group Windows

1)Group Windows定義在SQL查詢的Group By子句中

》》TUMBLE(time_attr,interval)

(1)定義一個滾動視窗,第一個參數是時間字段,第二個參數是視窗長度;

》》HOP(time_attr,interval)

(1)定義一個滑動視窗,第一個參數是時間字段,第二個參數是視窗滑動步長,第三個是視窗長度;

》》SESSION(time_attr,interval)

(1)定義一個會話視窗,第一個參數是時間字段,第二個參數是視窗間隔;

24、SQL中的Over  Windows

1)用Over做視窗聚合時,所有聚合必須在同一視窗上定義,也就是說必須是相同的分區、排序和範圍;

2)目前僅支援在目前行範圍之前的視窗;

3)ORDER  BY必須在單一的時間屬性上指定

Flink流式處理架構中的Table API和Flink SQL

25、函數(Functions)

1)Flink  Table  API 和SQL為使用者提供了一組用于資料轉換的内置函數;

2)SQL中支援的很多函數,Table  API和SQL都已經做了實作;

Flink流式處理架構中的Table API和Flink SQL
Flink流式處理架構中的Table API和Flink SQL

26、使用者自定義函數(UDF)

1)使用者定義函數(User-defined  Functions,UDF)是一個重要的特性,它們顯著地擴充了查詢的表達能力;

2)在大多數情況下,使用者定義的函數必須先注冊,然後才能在查詢中使用;

3)函數通過調用registerFunction()方法在TableEnvironment中注冊。當使用者定義的函數被注冊時,它被插入到TableEnvironment的函數目錄中,這樣Table  API或SQL解析器就可以識别并正确地解釋它;

27、标量函數(Scalar  Functions)

1)使用者定義的标量函數,可以将0、1或多個标量值,映射到新的标量值;

2)為了定義标量函數,必須在org.apache.flink.table.functions中擴充基類Scalar  Function,并實作(一個或多個)求值(eval)方法;

3)标量函數的行為由求值方法決定,求值方法必須公開聲明并命名為eval

Flink流式處理架構中的Table API和Flink SQL

28、表函數(Table  Functions)

1)使用者定義的表函數,也可以将0、1或多個标量值作為輸入參數;與标量函數不同的是,它可以傳回任意數量的行行為輸出,而不是單個值;

2)為了定義一個表函數,必須擴充org.apache.flink.table.functions中的基類TableFunction并實作(一個或多個)求值方法;

3)表函數的行為由其求值方法決定,求值方法必須是public的,并命名為eval:

Flink流式處理架構中的Table API和Flink SQL

29、聚合函數(Aggregate  Functions)

1)使用者自定義聚合函數(User-Defined  Aggregate  Functions,UDAGGs)可以把一個表中的資料,聚合成一個标量值;

2)使用者定義的聚合函數,是通過繼承AggregateFunction抽象類實作的

Flink流式處理架構中的Table API和Flink SQL

3)AggregationFunction要求必須實作的方法:

      ——  createAccumulator()

      ——   accumulate()

      ——   getValue()

4)AggregateFunction的工作原理如下:

       ——  首先,它需要一個累加器(Accumulator),用來儲存聚合中間結果的資料結構;可以通過調用createAccumulator()方法建立空累加器;

       ——   随後,對每個輸入行調用函數的accumulate()方法來更新累加器;

       ——    處理完所有行後,将調用函數的getValue()方法來計算并傳回最終結果;

30、表聚合函數(Table  Aggregate  Functions)

1)使用者定義的表聚合函數(User-defined  Table  Aggregate  Functions,UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表;

2)使用者定義表聚合函數,是通過內建TableAggregateFunction抽象類來實作的;

Flink流式處理架構中的Table API和Flink SQL

3)AggregationFunction要求必須實作的方法:

      ——  createAccumulator()

      ——  accumulate()

      ——  emitValue()

4)TableAggregateFunction的工作原理如下:

      —— 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。通過調用createAccumulator()方法可以建立空累加器;

      ——  随後,對每個輸入行調用函數的accumulate()方法來更新累加器;

      ——   處理完所有行後,将調用函數的emitValue()方法來計算并傳回最終結果;

繼續閱讀