一、Table API和Flink SQL是什麼?
1)Flink對批處理和流處理,提供了統一的上層API;
2)Table API是一套内嵌在Java和Scala語言中的查詢API,它允許以非常直覺的方式組合來自一些關系運算符的查詢;
3)Flink的SQL支援基于實作了SQL标準的Apache Calcite;

1、基本程式結構
1)Table API和SQL的程式結構,與流式處理的程式結構十分類似
2、建立TableEnvironment
1)建立表的執行環境,需要将flink流處理的執行環境傳入;
2)TableEnvironment是flink中內建Table API和SQL的核心概念,所有對表的操作都基于TableEnvironment
—— 注冊Catalog
—— 在Catalog中系統資料庫
—— 執行SQL查詢
—— 注冊使用者自定義函數(UDF)
3、配置TableEnvironment
1)配置老版本planner的流式查詢
2)配置老版本planner的批式查詢
3)配置blink planner的流式查詢
4)配置blink planner的批式查詢
二、表(Table)
1)TableEnvironment可以注冊目錄Catalog,并可以基于Catalog系統資料庫;
2)表(Table)是由一個“辨別符”(identifier)來指定的,由3部分組成:Catalog名、資料庫(database)名和對象名;
3)表可以是正常的,也可以是虛拟的(視圖,View);
4)正常表(Table)一般可以用來描述外部資料,比如檔案、資料庫表或消息隊列的資料,也可以直接從DataStream轉換而來;
5)視圖(View)可以從現有的表中建立,通常是Table API或者SQL查詢的一個結果表;
1、建立表
1)TableEnvironment可以調用.connect()方法,連接配接外部系統,并調用.createTemporaryTable()方法,在Catalog中系統資料庫;
2)可以建立Table來描述檔案資料,它可以從檔案中讀取,或者将資料寫入檔案
2、表的查詢——Table API
1)Table API是內建在Scala和Java語言内的查詢API;
2)Table API基于代表“表”的Table類,并提供一整套操作處理的方法API;這些方法會傳回一個新的Table對象,表示對輸入表應用轉換操作的結果;
3)有些關系型轉換操作,可以由多個方法調用組成,構成鍊式調用結構;
4)Flink的SQL內建,基于實作了SQL标準的Apache Calcite;
5)在Flink中,用正常字元串來定義SQL查詢語句;
6)SQL查詢的結果,也是一個新的Table;
3、将DataStream轉換成表
1)對于一個DataStream,可以直接轉換成Table,進而友善地調用Table API做轉換操作;
2)預設轉換後的Table schema和DataStream中的字段定義一一對應,也可以單獨指定出來;
4、資料類型與Schema的對應
1)DataStream中的資料類型,與表的Schema之間的對應關系,可以有兩種:基于字段名稱,或者基于字段位置;
2)基于名稱(name-based)
3)基于位置(position-based)
5、建立臨時視圖(Temporary View)
1)基于DataStream建立臨時視圖
2)基于Table建立臨時視圖
6、輸出表
1)表的輸出,是通過将資料寫入TableSink來實作的;
2)TableSink是一個通用接口,可以支援不同的檔案格式、存儲資料庫和消息隊列;
3)輸出表最直接的方法,就是通過Table.insertInto()方法将一個Table寫入注冊過的TableSink中
7、輸出到檔案
8、更新模式
1)對于流式查詢,需要聲明如何在表和外部連接配接器之間執行轉換;
2)與外部系統交換的消息類型,由更新模式(Update Mode)指定
》》追加(Append)模式
—— 表隻做插入操作,和外部連接配接器隻交換插入(Insert)消息;
》》撤回(Retract)模式
—— 表和外部連接配接器交換添加(Add)和撤回(Retract)消息;
—— 插入操作(Insert)編碼為Add消息;删除(Delete)編碼為Retract消息;更新(Update)編碼為上一條的Retract和下一條的Add消息;
》》更新插入(Upsert)模式
—— 更新和插入都被編碼為Upsert消息;删除編碼為Delete消息;
9、輸出到Kafka
1)可以建立Table來描述kafka中的資料,作為輸入或輸出的TableSink
10、輸出到ES
1)可以建立Table來描述ES中的資料,作為輸出的TableSink
11、輸出到MySQL
1)可以建立Table來描述MySQL中的資料,作為輸入和輸出
12、将Table轉換成DataStream
1)表可以轉換為DataStream或DataSet,這樣自定義流處理或批處理程式就可以繼續在Table API或SQL查詢的結果上運作了;
2)将表轉換為DataStream或DataSet時,需要指定生成的資料類型,即要将表的每一行轉換成的資料類型;
3)表作為流式查詢的結果,是動态更新的;
4)轉換有兩種轉換模式:追加(Append)模式和撤回(Retract)模式;
》》追加模式(Append Mode)
—— 用于表隻會被插入(Insert)操作更改的場景
》》撤回模式(Retract Mode)
—— 用于任何場景,有些類似于更新模式中Retract模式,它隻有Insert和Delete兩類操作;
—— 得到的資料會增加一個Boolean類型的辨別位(傳回的第一個字段,用它來辨別到底是新增的資料(Insert)),還是被删除的資料(Delete)。
13、檢視執行計劃
1)Table API提供了一種機制來解釋計算表的邏輯和優化查詢計劃;
2)檢視執行計劃,可以通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成,傳回一個字元串,描述三個計劃:
》》優化的邏輯查詢計劃;
》》優化後的邏輯查詢計劃;
》》實際執行計劃;
14、流處理和關系代數的差別
關系代數(表)/SQL | 流處理 | |
處理的資料對象 | 字段元組的有界集合 | 字段元組的無限序列 |
查詢(Query) 對資料的通路 | 可以通路到完整的資料輸入 | 無法通路所有資料, 必須持續“等待”流式輸入 |
查詢終止條件 | 生成固定大小的結果集後終止 | 永不停止,根據持續收到的資料不斷更新查詢結果 |
15、動态表(Dynamic Tables)
1)動态表是Flink對流資料的Table API和SQL支援的核心概念;
2)與表示批處理資料的靜态表不同,動态表是随時間變化的;
》》持續查詢(Continuous Query)
(1)動态表可以像靜态的批處理表一樣進行查詢,查詢一個動态表會産生持續查詢(Continuous Query);
(2)連續查詢永遠不會終止,并會生成另一個動态表;
(3)查詢會不斷更新其動态結果表,以反映其動态輸入表上的更改;
16、動态表和持續查詢
》》流式表查詢的處理過程:
(1)流被轉換為動态表;
(2)對動态表計算連續查詢,生成新的動态表;
(3)生成的動态表被轉換回流;
17、将流轉換成動态表
1)為了處理帶有關系查詢的流,必須先将其轉換為表;
2)從概念上将,流的每個資料記錄,都被解釋為結果表的插入(Insert)修改操作;
18、持續查詢
1)持續查詢會在動态表上做計算處理,并作為結果生成新的動态表;
19、将動态表轉換成DataStream
1)與正常的資料庫表一樣,動态表可以通過插入(Insert)、更新(Update)和删除(Delete)更改,進行持續的修改;
2)将動态表轉換為流或将其寫入外部系統時,需要對這些更改進行編碼;
》》僅追加(Append-only)流
—— 僅通過插入(Insert)更改來修改的動态表,可以直接轉換為僅追加流;
》》撤回(Retract)流
—— 撤回流流是包含兩類消息的流:添加(Add)消息和撤回(Retract)消息;
》》Upsert(更新插入)流
—— Upsert流也包含兩種類型的消息:Upsert消息和删除(Delete)消息;
20、時間特性(Time Attributes)
1)基于時間的操作(比如Table API和SQL中視窗操作),需要定義相關的時間語義和時間資料來源的資訊;
2)Table可以提供一個邏輯上的時間字段,用于在表處理程式中,訓示時間和通路相應的時間戳;
3)時間屬性,可以是每個表schema的一部分。一旦定義了時間屬性,它就可以作為一個字段引用,并且可以在基于時間的操作中使用;
4)時間屬性的行為類似于正常時間戳,可以通路,并且進行計算;
21、定義處理時間(Processing Time)
1)處理時間語義下,允許表處理程式根據機器的本地時間生成結果。它是時間的最簡單概念,它既不需要提取時間戳,也不需要生成Watermark;
》》由DataStream轉換成表時指定
(1)在定義Schema期間,可以使用.proctime,指定字段名定義處理時間字段;
(2)這個proctime屬性隻能通過附加邏輯字段,來擴充實體schema。是以,隻能在schema定義的末尾定義它
》》定義Table Schema時指定
》》在建立表的DDL中定義
2)事件時間語義,允許表處理程式根據每個記錄中包含的時間生成結果。這樣即使在有亂序事件或者延遲事件時,也可以獲得正确的結果。
3)為了處理無序事件,并區分流中的準時和遲到事件;Flink需要從事件資料中,提取時間戳,并用來推進事件時間的進展;
4)定義事件時間,同樣有三種方法:
》由DataStream轉換成表時指定
》定義Table Schema時指定
》在建立表的DDL中定義
》》由DataStream轉換成表時指定
(1)在DataStream轉換成Table,使用.rowtime可以定義事件時間屬性
(2)定義Table Schema時指定
(3)在建立表的DDL中定義
22、視窗
1)時間定義,要配合視窗操作才能發揮作用;
2)在Table API和SQL中,主要有兩種視窗
》》Group Windows(分組視窗)
—— 根據時間或行計數間隔,将行聚合到有限的組(Group)中,并對每個組的資料執行一次聚合函數;
》》Over Windows
—— 針對每個輸入行,計算相鄰行範圍内的聚合;
22.1 Group Windows
(1)Group WIndows是使用window(w:GroupWindow)子句定義的,并且必須由as子句指定一個别名。
(2)為了按視窗對表進行分組,視窗的别名必須在group by子句中,像正常的分組字段一樣引用
(3)Table API提供了一組具有特定語義的預定義Window類,這些類會被轉換為底層DataStream或DataSet的視窗操作;
22.2 滾動視窗(Tumbling windows)
(1)滾動視窗要用Tumble類來定義
22.3 滑動視窗(Sliding windows)
(1)滑動視窗要用slide類來定義
22.4 會話視窗(Session windows)
(1)會話視窗要用Session類來定義
22.5 Over Windows
(1)Over window聚合是标準SQL中已有的(over子句),可以在查詢的SELECT子句中定義;
(2)Over window聚合,會針對每個輸入行,計算相鄰行範圍内的聚合;
(3)Over windows使用window(w:overwindows*)子句定義,并在select()方法中通過别名來引用
4)Table API提供了Over類,來配置Over視窗的屬性;
22.6 無界Over Windows
1)可以在事件時間或處理時間,以及指定為時間間隔,或行計數的範圍内,定義Over windows
2)無界的over window是使用常量指定的
22.7 有界Over Windows
1)有界的over window是用間隔的大小指定的
23、SQL中的Group Windows
1)Group Windows定義在SQL查詢的Group By子句中
》》TUMBLE(time_attr,interval)
(1)定義一個滾動視窗,第一個參數是時間字段,第二個參數是視窗長度;
》》HOP(time_attr,interval)
(1)定義一個滑動視窗,第一個參數是時間字段,第二個參數是視窗滑動步長,第三個是視窗長度;
》》SESSION(time_attr,interval)
(1)定義一個會話視窗,第一個參數是時間字段,第二個參數是視窗間隔;
24、SQL中的Over Windows
1)用Over做視窗聚合時,所有聚合必須在同一視窗上定義,也就是說必須是相同的分區、排序和範圍;
2)目前僅支援在目前行範圍之前的視窗;
3)ORDER BY必須在單一的時間屬性上指定
25、函數(Functions)
1)Flink Table API 和SQL為使用者提供了一組用于資料轉換的内置函數;
2)SQL中支援的很多函數,Table API和SQL都已經做了實作;
26、使用者自定義函數(UDF)
1)使用者定義函數(User-defined Functions,UDF)是一個重要的特性,它們顯著地擴充了查詢的表達能力;
2)在大多數情況下,使用者定義的函數必須先注冊,然後才能在查詢中使用;
3)函數通過調用registerFunction()方法在TableEnvironment中注冊。當使用者定義的函數被注冊時,它被插入到TableEnvironment的函數目錄中,這樣Table API或SQL解析器就可以識别并正确地解釋它;
27、标量函數(Scalar Functions)
1)使用者定義的标量函數,可以将0、1或多個标量值,映射到新的标量值;
2)為了定義标量函數,必須在org.apache.flink.table.functions中擴充基類Scalar Function,并實作(一個或多個)求值(eval)方法;
3)标量函數的行為由求值方法決定,求值方法必須公開聲明并命名為eval
28、表函數(Table Functions)
1)使用者定義的表函數,也可以将0、1或多個标量值作為輸入參數;與标量函數不同的是,它可以傳回任意數量的行行為輸出,而不是單個值;
2)為了定義一個表函數,必須擴充org.apache.flink.table.functions中的基類TableFunction并實作(一個或多個)求值方法;
3)表函數的行為由其求值方法決定,求值方法必須是public的,并命名為eval:
29、聚合函數(Aggregate Functions)
1)使用者自定義聚合函數(User-Defined Aggregate Functions,UDAGGs)可以把一個表中的資料,聚合成一個标量值;
2)使用者定義的聚合函數,是通過繼承AggregateFunction抽象類實作的
3)AggregationFunction要求必須實作的方法:
—— createAccumulator()
—— accumulate()
—— getValue()
4)AggregateFunction的工作原理如下:
—— 首先,它需要一個累加器(Accumulator),用來儲存聚合中間結果的資料結構;可以通過調用createAccumulator()方法建立空累加器;
—— 随後,對每個輸入行調用函數的accumulate()方法來更新累加器;
—— 處理完所有行後,将調用函數的getValue()方法來計算并傳回最終結果;
30、表聚合函數(Table Aggregate Functions)
1)使用者定義的表聚合函數(User-defined Table Aggregate Functions,UDTAGGs),可以把一個表中資料,聚合為具有多行和多列的結果表;
2)使用者定義表聚合函數,是通過內建TableAggregateFunction抽象類來實作的;
3)AggregationFunction要求必須實作的方法:
—— createAccumulator()
—— accumulate()
—— emitValue()
4)TableAggregateFunction的工作原理如下:
—— 首先,它同樣需要一個累加器(Accumulator),它是儲存聚合中間結果的資料結構。通過調用createAccumulator()方法可以建立空累加器;
—— 随後,對每個輸入行調用函數的accumulate()方法來更新累加器;
—— 處理完所有行後,将調用函數的emitValue()方法來計算并傳回最終結果;