官網連結:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/common.html#register-a-datastream-or-dataset-as-table
Table API & SQL概念和通用API
Apache Flink具有兩個關系API-Table API和SQL-用于統一流和批處理。Table API是用于Scala和Java的語言內建查詢API,它允許以非常直覺的方式組合來自關系運算符(例如選擇,過濾和聯接)的查詢。Flink的SQL支援基于實作SQL标準的Apache Calcite。無論輸入是批處理輸入
DataSet
還是流輸入
DataStream
,在兩個接口中指定的查詢都具有相同的語義并指定相同的結果。
Table API和SQL接口以及Flink的DataStream和DataSet API緊密內建在一起。您可以輕松地在所有API和基于API的庫之間切換。例如,您可以使用CEP庫從DataStream中提取模式,然後使用Table API來分析模式,或者您可以使用SQL查詢來掃描,過濾和聚合批處理表,然後在預處理的程式上運作Gelly圖算法資料。
注意,Table API和SQL尚未完成功能,正在積極開發中。[Table API,SQL]和[stream,batch]輸入的每種組合都不支援所有操作。
Table API & SQL程式結構
用于批處理和流式傳輸的所有Table API和SQL程式都遵循相同的模式。以下代碼示例顯示了Table API和SQL程式的通用結構。
// step1 : 為特定的執行計劃批處理或流建立一個TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// step2 : 系統資料庫
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)
// step 3 : 注冊輸出表
tableEnv.registerTableSink("outputTable", ...);
// step 4 : 查詢API
val tapiResult = tableEnv.scan("table1").select(...)
// create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// step 5 : 将Table API結果表發送到TableSink,與SQL結果相同
tapiResult.insertInto("outputTable")
// step 6 : 執行程式
tableEnv.execute("scala_job")
注意:Table API和SQL查詢可以輕松地與DataStream或DataSet程式內建并嵌入其中。請參閱與DataStream和DataSet API內建,以了解如何将DataStream和DataSet轉換為Tables,反之亦然。
建立一個TableEnvironment
TableEnvironment是Table API和SQL內建的中心概念。它負責:
- 在内部Catalog中注冊Table
- 注冊外部Catalog
- 執行SQL查詢
- 注冊使用者定義的(scalar, table, or aggregation)函數
- 将DataStream或DataSet轉換為Table
- 持有對ExecutionEnvironment或StreamExecutionEnvironment的引用
Table始終綁定到特定的TableEnvironment。不可能在同一查詢中組合不同TableEnvironments的表,例如,将它們join或union。
通過調用帶有StreamExecutionEnvironment或ExecutionEnvironment和可選TableConfig的靜态BatchTableEnvironment.create()或StreamTableEnvironment.create()方法來建立TableEnvironment。TableConfig可用于配置TableEnvironment或自定義查詢優化和轉換過程(請參閱查詢優化)。
確定選擇與您的程式設計語言比對的特定計劃器BatchTableEnvironment / StreamTableEnvironment。如果兩個計劃程式jar都在類路徑上(預設行為),則應明确設定要在目前程式中使用的計劃程式。
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)
// or val fsTableEnv = TableEnvironment.create(fsSettings)
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
val fbEnv = ExecutionEnvironment.getExecutionEnvironment
val fbTableEnv = BatchTableEnvironment.create(fbEnv)
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
// or val bsTableEnv = TableEnvironment.create(bsSettings)
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
val bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)
注意:如果lib目錄中隻有一個planner jar,則可以使用useAnyPlanner來建立特定的EnvironmentSettings。
在Catalog系統資料庫
Catalog:所有對資料庫和表的中繼資料資訊都存放在Flink CataLog内部目錄結構中,其存放了flink内部所有與Table相關的中繼資料資訊,包括表結構資訊/資料源資訊等。
TableEnvironment維護按名稱注冊的表的Catalog。表有兩種類型,輸入表和輸出表。可以在Table API和SQL查詢中引用輸入表并提供輸入資料。輸出表可用于将表API或SQL查詢的結果發送到外部系統。
輸入表可以從各種來源進行注冊:
- 現有的Table對象,通常是Table API或SQL查詢的結果。
- 一個TableSource,用于通路外部資料,例如檔案,資料庫或消息傳遞系統。
- DataStream(僅适用于流作業)或DataSet(僅适用于從舊計劃程式轉換的批處理作業)程式中的DataStream或DataSet。
可以使用TableSink注冊輸出表。
注冊Table
在TableEnvironment中注冊一個Table,如下所示:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// table is the result of a simple projection query
val projTable: Table = tableEnv.scan("X").select(...)
// register the Table projTable as table "projectedTable"
tableEnv.registerTable("projectedTable", projTable)
注意:已注冊Table的處理方式與關系資料庫系統中已知的
VIEW
相似,即定義表的查詢未進行優化,但當另一個查詢引用已系統資料庫時将内聯。如果多個查詢引用同一個系統資料庫,則将為每個引用查詢内聯該表并執行多次,即将不會共享系統資料庫的結果。
注冊TableSource
通過
TableSource
,可以通路存儲在存儲系統中的外部資料,例如資料庫(MySQL,HBase等),具有特定編碼的檔案(CSV,Apache [Parquet,Avro,ORC]等)或消息傳遞系統(Apache Kafka,RabbitMQ等)。
Flink旨在為常見的資料格式和存儲系統提供
TableSources
。請檢視“[Table Sources and Sinks]”頁面,以擷取受支援的表源的清單以及如何建構自定義表源的說明。
TableSource在TableEnvironment中注冊如下:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create a TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
// register the TableSource as table "CsvTable"
tableEnv.registerTableSource("CsvTable", csvSource)
注意:用于Blink執行計劃程式的TableEnvironment僅接受StreamTableSource,LookupableTableSource和InputFormatTableSource,并且用于批處理Blink計劃程式的StreamTableSource必須是有界的。
注冊TableSink
注冊的TableSink可用于将Table API或SQL查詢的結果發送到外部存儲系統,例如資料庫,KV存儲,消息隊列或檔案系統(采用不同的編碼,例如CSV,Apache [Parquet,Avro,ORC],…)。
Flink旨在為常見的資料格式和存儲系統提供TableSink。請參閱有關“表源和接收器”頁面的文檔,以擷取有關可用接收器的詳細資訊以及如何實作自定義TableSink的說明。
TableSink在TableEnvironment中注冊如下:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
// register the TableSink as table "CsvSinkTable"
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注冊外部Catalog
外部catalog可以提供有關外部資料庫和表的資訊,例如它們的名稱,架構,統計資訊,以及有關如何通路存儲在外部資料庫,表或檔案中的資料的資訊。
可以通過實作
ExternalCatalog
接口來建立外部目錄,并在
TableEnvironment中
對其進行注冊,如下所示:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create an external catalog
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// register the ExternalCatalog catalog
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
在TableEnvironment中注冊後,可以通過指定表的完整路徑(例如catalog.database.table)從Table API或SQL查詢中通路在ExternalCatalog中定義的所有表。
目前,Flink提供了一個InMemoryExternalCatalog用于示範和測試。但是,也可以使用ExternalCatalog接口将HCatalog或Metastore之類的目錄連接配接到Table API。
注意:blink執行計劃不支援外部目錄。
Query a Table
Table API
Table API是用于Scala和Java的語言內建查詢API。與SQL相比,查詢未指定為字元串,而是以宿主語言逐漸構成。
該API基于Table類,Table類代表一個表(流式或批處理),并提供應用關系操作的方法。這些方法傳回一個新的Table對象,該對象表示對輸入Table應用關系操作的結果。某些關系操作由多個方法調用組成,例如table.groupBy(…)、select();其中groupBy(…)指定表的分組,并select(…)在分組的投影表。
Table API文檔描述了流表和批處理表支援的所有Table API操作。
以下示例顯示了一個簡單的Table API聚合查詢:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
val orders = tableEnv.scan("Orders")
// compute revenue for all customers from France
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName)
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// emit or convert Table
// execute query
注意:Scala Table API使用Scala符号,該符号以一個勾号(’)開頭來引用表的屬性。Table API使用Scala隐式。確定導入org.apache.flink.api.scala.和org.apache.flink.table.api.scala._以便使用Scala隐式轉換。
SQL API
Flink的SQL內建基于實作SQL标準的Apache Calcite。SQL查詢被指定為正常字元串。SQL文檔描述了Flink對流表和批處理表的SQL支援。
下面的示例示範如何指定查詢并以Table的形式傳回結果:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// emit or convert Table
// execute query
下面的示例示範如何指定将查詢結果插入已系統資料庫的更新查詢:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// execute query
混合表API和SQL
表API和SQL查詢可以輕松混合,因為它們都傳回Table對象:
- 可以在SQL查詢傳回的Table對象上定義Table API查詢。
- 通過在TableEnvironment中注冊結果表并在SQL查詢的FROM子句中引用它,可以對Table API查詢的結果定義SQL查詢。
送出Table
通過将表寫入TableSink來送出Table。TableSink是通用接口,用于支援各種檔案格式(例如CSV,Apache Parquet,Apache Avro),存儲系統(例如JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息傳遞系統(例如Apache Kafka,RabbitMQ)。
批處理表隻能寫入BatchTableSink,而流式表則需要AppendStreamTableSink,RetractStreamTableSink或UpsertStreamTableSink。
請參閱有關Table Sources & Sinks的文檔,以擷取有關可用接收器的詳細資訊以及有關如何實作自定義TableSink的說明。
Table.insertInto(String tableName)方法将Table送出到已注冊的TableSink。該方法通過名稱從目錄中查找TableSink,并驗證Table的schema與TableSink的schema是否相同。
以下示例顯示如何發出表:
// get a TableEnvironment
val tableEnv = ... // see "Create a TableEnvironment" section
// create a TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// register the TableSink with a specific schema
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
// compute a result Table using Table API operators and/or SQL queries
val result: Table = ...
// emit the result Table to the registered TableSink
result.insertInto("CsvSinkTable")
// execute the program
翻譯和執行查詢
對于兩個執行計劃來說,翻譯和執行查詢的行為是不同的。
根據Table API和SQL查詢的輸入是流輸入還是批處理輸入,它們将翻譯為DataStream或DataSet程式。查詢在内部表示為邏輯查詢計劃,并分為兩個階段:
- 優化邏輯計劃
- 轉換為DataStream或DataSet程式
在以下情況下,将翻譯Table API或SQL查詢:
- Table被發送到TableSink,即當調用Table.insertInto() 時。
- 指定SQL更新查詢,即在調用TableEnvironment.sqlUpdate() 時。
- 将Table轉換為DataStream或DataSet(請參閱Integration with DataStream and DataSet API)。
翻譯後,将像正常DataStream或DataSet程式一樣處理Table API或SQL查詢,并在調用StreamExecutionEnvironment.execute() 或ExecutionEnvironment.execute() 時執行。
與DataStream和DataSet API內建
流上的兩個執行計劃都可以與DataStream API內建。隻有舊的執行計劃程式才能與DataSet API內建,Blink與Batch執行計劃程式不能與兩者結合。
注意:下面讨論的DataSet API僅與批量使用的舊計劃程式有關。
Table API和SQL查詢可以輕松地與DataStream和DataSet程式內建并嵌入其中。例如,可以查詢外部表(例如從RDBMS),進行一些預處理,例如過濾,投影,聚合或與中繼資料聯接,然後進一步使用DataStream或DataSet API(以及在這些API之上建構的任何庫,例如CEP或Gelly)。相反,也可以将Table API或SQL查詢應用于DataStream或DataSet程式的結果。
可以通過将DataStream或DataSet轉換為Table來實作這種互動,反之亦然。
Scala的隐式轉換
Scala Table API具有對DataSet,DataStream和Table類的隐式轉換。通過為Scala DataStream API導入org.apache.flink.table.api.scala.包以及org.apache.flink.api.scala._包,可以啟用這些轉換。
将DataStream或DataSet注冊為Table
可以在TableEnvironment中将DataStream或DataSet注冊為表。結果表的模式取決于已注冊的DataStream或DataSet的資料類型。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
注意:DataStream表的名稱不得與^ DataStreamTable [0-9] +模式比對,并且DataSet表的名稱不得與^ DataSetTable [0-9] +模式比對。這些模式僅供内部使用。
将DataStream或DataSet轉換為Table
除了在TableEnvironment中注冊DataStream或DataSet之外,還可以将其直接轉換為Table。如果要在Table API查詢中使用Table,這将很友善。
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
将Table轉換為DataStream或DataSet
将Table轉換為DataStream或DataSet時,您需要指定結果DataStream或DataSet的資料類型,即要将表的行轉換為的資料類型。最友善的轉換類型通常是Row。以下清單概述了不同選項的功能:
- ROW:字段按位置,任意數量的字段進行映射,支援空值,沒有類型安全的通路。
- POJO:字段按名稱映射(POJO字段必須命名為Table字段),任意數量的字段,支援空值,類型安全通路。
- Case Class:字段按位置映射,不支援空值,類型安全通路。
- Tuple:按位置映射字段,限制為22(Scala)或25(Java)字段,不支援空值,類型安全通路。
- Atomic Type:表必須具有單個字段,不支援空值,類型安全通路。
将Table轉換為DataStream
流式查詢結果産生的Table将動态更新,即随着新記錄到達查詢的輸入流中而不斷變化。是以,将這種動态查詢轉換成的DataStream需要對表的更新進行編碼。
有兩種模式可以将Table轉換為DataStream:
1. Append Mode:僅當動态表僅通過INSERT更改進行修改時才可以使用此模式,即它僅是追加操作,并且以前發出的結果從不更新。
2. Retract Mode:始終可以使用此模式。它使用布爾标志對INSERT和DELETE更改進行編碼。
// get TableEnvironment.
// registration of a DataSet is equivalent
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into an append DataStream of Row
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// convert the Table into an append DataStream of Tuple2[String, Int]
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream[(Boolean, X)].
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
注意:有關動态表及其屬性的詳細讨論,請參見“Dynamic Tables”文檔。
将Table轉換為DataSet
将Tabble轉換為DataSet,如下所示:
// get TableEnvironment
// registration of a DataSet is equivalent
val tableEnv = BatchTableEnvironment.create(env)
// Table with two fields (String name, Integer age)
val table: Table = ...
// convert the Table into a DataSet of Row
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// convert the Table into a DataSet of Tuple2[String, Int]
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
資料類型到Table的映射
Flink的DataStream和DataSet API支援多種類型。Tuples(内置Scala和Flink Java元組),POJO,Scala case class和Flink的Row類型等複合類型允許嵌套的資料結構具有多個字段,可以在Table表達式中進行通路。其他類型被視為原子類型。在下面,我們描述Table API如何将這些類型轉換為内部行表示形式,并顯示将DataStream轉換為Table的示例。
資料類型到Table模式的映射可以通過兩種方式發生:基于字段位置或基于字段名稱。
基于位置的映射
基于位置的映射可用于在保持字段順序的同時為字段賦予更有意義的名稱。此映射可用于具有定義的字段順序的複合資料類型以及原子類型。元組,行和案例類等複合資料類型具有這樣的字段順序。但是,必須根據字段名稱映射POJO的字段。可以将字段投影出來,但不能使用别名as重命名。
在定義基于位置的映射時,輸入資料類型中不得存在指定的名稱,否則API會假定映射應基于字段名稱進行。如果未指定任何字段名稱,則使用複合類型的預設字段名稱和字段順序,或者原子類型使用f0。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "myLong" only
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
// convert DataStream into Table with field names "myLong" and "myInt"
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)
基于名稱的映射
基于名稱的映射可用于任何資料類型,包括POJO。這是定義表模式映射的最靈活的方法。映射中的所有字段均按名稱引用,并且可以使用别名as重命名。字段可以重新排序和投影。
如果未指定任何字段名稱,則使用複合類型的預設字段名稱和字段順序,或者原子類型使用f0。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, Int)] = ...
// convert DataStream into Table with default field names "_1" and "_2"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field "_2" only
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with swapped fields
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with swapped fields and field names "myInt" and "myLong"
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong)
原子類型
Flink将基元(Integer, Double, String)或通用類型(無法分析和分解的類型)視為原子類型。原子類型的DataStream或DataSet轉換為具有單個屬性的表。從原子類型推斷出屬性的類型,并且可以指定屬性的名稱。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[Long] = ...
// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuples(Scala和Java)和Case Classes(僅Scala)
Flink支援Scala的内置元組,并為Java提供了自己的元組類。兩種元組的DataStreams和DataSet都可以轉換為表。可以通過提供所有字段的名稱來重命名字段(根據位置進行映射)。如果未指定任何字段名稱,則使用預設字段名稱。如果引用了原始字段名稱(Flink元組為f0,f1,…,Scala元組為_1,_2,…),則API會假定映射是基于名稱的,而不是基于位置的。基于名稱的映射允許使用别名(as)對字段和投影進行重新排序。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
POJO (Java and Scala)
Flink支援POJO作為複合類型。确定POJO的規則在此處記錄。
在不指定字段名稱的情況下将POJO DataStream或DataSet轉換為Table時,将使用原始POJO字段的名稱。名稱映射需要原始名稱,并且不能按職位進行映射。可以使用别名(使用as關鍵字)對字段進行重命名,重新排序和投影。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Row
Row資料類型支援任意數量的字段和具有空值的字段。可以通過RowTypeInfo或在将Row DataStream或DataSet轉換為Table時指定字段名稱。行類型支援按位置和名稱映射字段。可以通過提供所有字段的名稱(基于位置的映射)來重命名字段,也可以為投影/排序/重新命名(基于名稱的映射)單獨選擇字段。
// get a TableEnvironment
val tableEnv: StreamTableEnvironment = ... // see "Create a TableEnvironment" section
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
查詢優化
Apache Flink利用Apache Calcite來優化和翻譯查詢。目前執行的優化包括projection和filter push-down,子查詢去相關以及其他類型的查詢重寫。Old Planner尚未優化聯接的順序,而是按照查詢中定義的順序執行它們(FROM子句中的表順序和/或WHERE子句中的連接配接謂詞順序)。
通過提供CalciteConfig對象,可以調整在不同階段應用的優化規則集。可以通過建構器調用CalciteConfig.createBuilder() 來建立此屬性,并通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig)将其提供給TableEnvironment。
Explaining a Table
Table API提供了一種機制來解釋計算Table的邏輯和優化查詢計劃。這是通過TableEnvironment.explain(table)方法或TableEnvironment.explain()方法完成的。explain(table)傳回給定Table的計劃。explain()傳回多接收器計劃的結果,主要用于Blink計劃器。它傳回一個描述三個計劃的字元串:
- 關系查詢的抽象文法樹,即未優化的邏輯查詢計劃,
- 優化的邏輯查詢計劃,
- 以及實際執行計劃。
以下代碼顯示了一個示例以及使用explain(table)給定Table的相應輸出的執行計劃:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
val table = table1
.where('word.like("F%"))
.unionAll(table2)
val explanation: String = tEnv.explain(table)
println(explanation)
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
DataStreamScan(id=[1], fields=[count, word])
DataStreamScan(id=[2], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Data Source
content : collect elements with CollectionInputFormat
Stage 3 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
Stage 4 : Operator
content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
ship_strategy : FORWARD
Stage 5 : Operator
content : from: (count, word)
ship_strategy : REBALANCE
以下代碼顯示了一個示例以及使用explain() 的多sink相應輸出的執行計劃:
val settings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
val tEnv = TableEnvironment.create(settings)
val fieldNames = Array("count", "word")
val fieldTypes = Array[TypeInformation[_]](Types.INT, Types.STRING)
tEnv.registerTableSource("MySource1", new CsvTableSource("/source/path1", fieldNames, fieldTypes))
tEnv.registerTableSource("MySource2", new CsvTableSource("/source/path2",fieldNames, fieldTypes))
tEnv.registerTableSink("MySink1", new CsvTableSink("/sink/path1").configure(fieldNames, fieldTypes))
tEnv.registerTableSink("MySink2", new CsvTableSink("/sink/path2").configure(fieldNames, fieldTypes))
val table1 = tEnv.scan("MySource1").where("LIKE(word, 'F%')")
table1.insertInto("MySink1")
val table2 = table1.unionAll(tEnv.scan("MySource2"))
table2.insertInto("MySink2")
val explanation = tEnv.explain(false)
println(explanation)
多彙計劃的結果是
== Abstract Syntax Tree ==
LogicalSink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalSink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
Sink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])
Sink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
ship_strategy : FORWARD
Stage 5 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Stage 9 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 10 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 12 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
Stage 14 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD