一、Table API和SQL的基本使用
想使用Table API和SQL,首先要建立一個TableEnvironment。TableEnvironment對象是Table API和SQL內建的核心,通過TableEnvironment可以實作以下功能。
•通過内部目錄建立表。
•通過外部目錄建立表。
•執行SQL查詢。
•注冊一個使用者自定義的Function。
•把DataStream或者DataSet轉換成Table。
•持有ExecutionEnvironment或者StreamExecutionEnvironment的引用。一個查詢中隻能綁定一個指定的TableEnvironment,TableEnvironment可以通過Table Environment.getTableEnvironment()或者TableConfig來生成。TableConfig可以用來配置TableEnvironment或者自定義查詢優化。如何建立一個TableEnvironment對象?具體實作代碼如下。Java代碼實作如下。[插圖]Scala代碼實作如下。[插圖]通過擷取到的TableEnvironment對象可以建立Table對象,有兩種類型的Table對象:輸入Table(Input Table)和輸出Table(Output Table)。輸入Table可以給Table API和SQL提供查詢資料,輸出Table可以把Table API和SQL的查詢結果發送到外部存儲媒體中。輸入Table可以通過多種資料源注冊。
•已存在的Table對象:通常是Table API和SQL的查詢結果。
•TableSource:通過它可以通路外部資料,比如檔案、資料庫和消息隊列。
•DataStream或DataSet。輸出Table需要使用TableSink注冊。下面示範如何通過TableSource注冊一個Table。
接下來示範如何通過TableSink把資料寫到外部存儲媒體中。
我們知道了如何通過TableSource讀取資料和通過TableSink寫出資料,下面介紹如何查詢Table中的資料。
1.使用Table API
Java代碼實作如下。
2.使用SQL
Java代碼實作如下。
注意:Table API和SQL查詢很容易融合在一起,因為它們都傳回Table對象。
•Table API查詢可以基于SQL查詢結果的Table來進行。
•SQL查詢可以基于Table API查詢的結果來定義。
二、DataStream、DataSet和Table之間的轉換
Table API和SQL查詢可以很容易地和DataStream、DataSet程式內建到一起。通過一個TableEnvironment,可以把DataStream或者DataSet注冊為Table,這樣就可以使用Table API和SQL查詢了。通過TableEnvironment 也可以把Table對象轉換為DataStream或者DataSet,這樣就能夠使用DataStream或者DataSet中的相關API了。
1.把DataStream或者DataSet注冊為Table對象
(1)通過注冊的形式實作。
注意:DataStream程式的表名不能滿足規則_DataStreamTable_[0-9]+,DataSet程式的表名不能滿足規則DataSetTable[0-9]+,這些規則的名字是内部使用的。
(2)通過直接轉化的形式實作。
2.把Table對象轉換為DataStream或者DataSet
當我們想把一個Table對象轉換為DataStream或者DataSet的時候,需要指定DataStream 或者 DataSet中資料的類型。通常較友善的轉換類型是行,如下都是支援的資料類型。
•Row:通過角标映射字段,支援任意數量的字段,并且支援null值和非類型安全的通路。
•POJO:Java中的實體類,這個實體類中的字段名稱需要和Table中的字段名稱保持一緻,支援任意數量的字段,支援null值,類型安全的通路。
•Case Class:通過角标映射字段,不支援null值,類型安全的通路
•Tuple:通過角标映射字段,Scala中限制22個字段,Java中限制25個字段,不支援null值,類型安全的通路。
•Atomic Type:Table必須要有一個字段,不支援null值,類型安全的通路。
(1)把Table轉化為DataStream。
流式查詢的結果Table會被動态地更新,即每個新的記錄到達輸入流時結果就會發生變化。是以,轉換此動态查詢的DataStream需要對表的更新進行編碼。
有幾種模式可以将Table轉換為DataStream。
•Append Mode:這種模式隻适用于當動态表僅由INSERT更改修改時(僅附加),之前添加的資料不會被更新。
•Retract Mode:可以始終使用此模式,它使用一個Boolean辨別來編碼INSERT和DELETE更改。
(2)把Table轉化為DataSet。