本文來自:https://jiamaoxiang.top/2020/05/25/Flink-Table-API-SQL%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97/
概述
Apache Flink提供了兩種頂層的關系型API,分别為Table API和SQL,Flink通過Table API&SQL實作了批流統一。其中Table API是用于Scala和Java的語言內建查詢API,它允許以非常直覺的方式組合關系運算符(例如select,where和join)的查詢。Flink SQL基于Apache Calcite 實作了标準的SQL,使用者可以使用标準的SQL處理資料集。Table API和SQL與Flink的DataStream和DataSet API緊密內建在一起,使用者可以實作互相轉化,比如可以将DataStream或者DataSet注冊為table進行操作資料。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發中,是以并不是所有的算子操作都可以通過其實作。
依賴
從Flink1.9開始,Flink為Table & SQL API提供了兩種planner,分别為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要差別如下:
提示:對于生産環境,目前推薦使用old planner.
-
: 通用子產品,包含 Flink Planner 和 Blink Planner 一些共用的代碼flink-table-common
-
: java語言的Table & SQL API,僅針對table(處于早期的開發階段,不推薦使用)flink-table-api-java
-
: scala語言的Table & SQL API,僅針對table(處于早期的開發階段,不推薦使用)flink-table-api-scala
-
: java語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)flink-table-api-java-bridge
-
: scala語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)flink-table-api-scala-bridge
-
:planner 和runtime. planner為Flink1,9之前的old planner(推薦使用)flink-table-planner
-
: 新的Blink planner.flink-table-planner-blink
-
: 新的Blink runtime.flink-table-runtime-blink
-
: 将上述的API子產品及old planner打成一個jar包,形如flink-table-*.jar,位與/lib目錄下flink-table-uber
-
:将上述的API子產品及Blink 子產品打成一個jar包,形如fflink-table-blink-*.jar,位與/lib目錄下flink-table-uber-blink
Blink planner & old planner
Blink planner和old planner有許多不同的特點,具體列舉如下:
- Blink planner将批處理作業看做是流處理作業的特例。是以,不支援Table 與DataSet之間的轉換,批處理的作業也不會被轉成DataSet程式,而是被轉為DataStream程式。
- Blink planner不支援
,使用的是有界的StreamTableSource。BatchTableSource
- Blink planner僅支援新的
,不支援Catalog
(已過時)。ExternalCatalog
- 對于FilterableTableSource的實作,兩種Planner是不同的。old planner會謂詞下推到
(未來會被移除),而Blink planner 會謂詞下推到PlannerExpression
(表示一個産生計算結果的邏輯樹)。Expression
- 僅僅Blink planner支援key-value形式的配置,即通過Configuration進行參數設定。
- 關于PlannerConfig的實作,兩種planner有所不同。
- Blink planner 會将多個sink優化成一個DAG(僅支援TableEnvironment,StreamTableEnvironment不支援),old planner總是将每一個sink優化成一個新的DAG,每一個DAG都是互相獨立的。
- old planner不支援catalog統計,Blink planner支援catalog統計。
Flink Table & SQL程式的pom依賴
根據使用的語言不同,可以選擇下面的依賴,包括scala版和java版,如下:
<!-- java版 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.11.3</version> <scope>provided</scope> </dependency> <!-- scala版 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> <version>1.11.3</version> <scope>provided</scope> </dependency>
除此之外,如果需要在本地的IDE中運作Table API & SQL的程式,則需要添加下面的pom依賴:
<!-- Flink 1.9之前的old planner --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.11</artifactId> <version>1.11.3</version> <scope>provided</scope> </dependency> <!-- 新的Blink planner --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.11</artifactId> <version>1.11.3</version> <scope>provided</scope> </dependency>
另外,如果需要實作自定義的格式(比如和kafka互動)或者使用者自定義函數,需要添加如下依賴:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.3</version> <scope>provided</scope> </dependency>
Table API & SQL的程式設計模闆
所有的Table API&SQL的程式(無論是批處理還是流處理)都有着相同的形式,下面将給出通用的程式設計結構形式:
// 建立一個TableEnvironment對象,指定planner、處理模式(batch、streaming) TableEnvironment tableEnv = ...; // 建立一個表 tableEnv.connect(...).createTemporaryTable("table1"); // 注冊一個外部的表 tableEnv.connect(...).createTemporaryTable("outputTable"); // 通過Table API的查詢建立一個Table 對象 Table tapiResult = tableEnv.from("table1").select(...); // 通過SQL查詢的查詢建立一個Table 對象 Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table1 ... "); // 将結果寫入TableSink tapiResult.insertInto("outputTable"); // 執行 tableEnv.execute("java_job");
注意:Table API & SQL的查詢可以互相內建,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實作DataStreams、 DataSet與Table之間的互相轉換。具體參見 Flink 中 DataStream / DataSet 與 Table 的互相轉換
建立TableEnvironment
TableEnvironment是Table API & SQL程式的一個入口,主要包括如下的功能:
- 在内部的catalog中注冊Table
- 注冊catalog
- 加載可插拔子產品
- 執行SQL查詢
- 注冊使用者定義函數
-
、DataStream
與Table之間的互相轉換DataSet
- 持有對
、ExecutionEnvironment
的引用StreamExecutionEnvironment
一個Table必定屬于一個具體的TableEnvironment,不可以将不同TableEnvironment的表放在一起使用(比如join,union等操作)。
TableEnvironment是通過調用
BatchTableEnvironment.create()
或者StreamTableEnvironment.create()的靜态方法進行建立的。另外,預設兩個planner的jar包都存在與classpath下,所有需要明确指定使用的planner。
// ********************** // FLINK 流處理查詢 // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment; EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings); //或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); // ****************** // FLINK 批處理查詢 // ****************** import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.java.BatchTableEnvironment; ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv); // ********************** // BLINK 流處理查詢 // ********************** import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.java.StreamTableEnvironment; StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); // 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings); // ****************** // BLINK 批處理查詢 // ****************** import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在catalog中建立表
臨時表與永久表
表可以分為臨時表和永久表兩種,其中永久表需要一個catalog(比如Hive的Metastore)來維護表的中繼資料資訊,一旦永久表被建立,隻要連接配接到該catalog就可以通路該表,隻有顯示删除永久表,該表才可以被删除。臨時表的生命周期是Flink Session,這些表不能夠被其他的Flink Session通路,這些表不屬于任何的catalog或者資料庫,如果與臨時表相對應的資料庫被删除了,該臨時表也不會被删除。
建立表
虛表(Virtual Tables)
一個Table對象相當于SQL中的視圖(虛表),它封裝了一個邏輯執行計劃,可以通過一個catalog建立,具體如下:
// 擷取一個TableEnvironment
TableEnvironment tableEnv = ...;
// table對象,查詢的結果集
Table projTable = tableEnv.from("X").select(...);
// 注冊一個表,名稱為 "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
外部資料源表(Connector Tables)
可以把外部的資料源注冊成表,比如可以讀取MySQL資料庫資料、Kafka資料等
tableEnvironment .connect(...) .withFormat(...) .withSchema(...) .inAppendMode() .createTemporaryTable("MyTable")
擴充建立表的辨別屬性
表的注冊總是包含三部分辨別屬性:catalog、資料庫、表名。使用者可以在内部設定一個catalog和一個資料庫作為目前的catalog和資料庫,是以對于catalog和資料庫這兩個辨別屬性是可選的,即如果不指定,預設使用的是“defaykt_catalog”和 “default_database”。
具體說明見 Flink官網
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIwczX0xiRGZkRGZ0Xy9GbvNGL2EzXlpXazxydFRUT5hjRaBHdyglasJjWoFjMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLyADNwMDNyATMyETMwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
TableEnvironment tEnv = ...; tEnv.useCatalog("default_catalog");//設定catalog tEnv.useDatabase("default_database");//設定資料庫 Table table = ...; // 注冊一個名為exampleView的視圖,catalog名為default_catalog // 資料庫的名為default_database tableEnv.createTemporaryView("exampleView", table); // 注冊一個名為exampleView的視圖,catalog的名為default_catalog // 資料庫的名為other_database tableEnv.createTemporaryView("other_database.exampleView", table); // 注冊一個名為'View'的視圖,catalog的名稱為default_catalog // 資料庫的名為default_database,'View'是保留關鍵字,需要使用``(反引号) tableEnv.createTemporaryView("`View`", table); // 注冊一個名為example.View的視圖,catalog的名為default_catalog, // 資料庫名為example tableEnv.createTemporaryView("`example.View`", table); // 注冊一個名為'exampleView'的視圖, catalog的名為'other_catalog' // 資料庫名為other_database' tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查詢表
Table API
Table API是一個內建Scala與Java語言的查詢API,與SQL相比,它的查詢不是一個标準的SQL語句,而是由一步一步的操作組成的。如下展示了一個使用Table API實作一個簡單的聚合查詢。
// 擷取TableEnvironment TableEnvironment tableEnv = ...; //注冊Orders表 // 查詢注冊的表 Table orders = tableEnv.from("Orders"); // 計算操作 Table revenue = orders .filter("cCountry === 'FRANCE'") .groupBy("cID, cName") .select("cID, cName, revenue.sum AS revSum");
SQL
Flink SQL依賴于Apache Calcite,其實作了标準的SQL文法,如下案例:
// 擷取TableEnvironment TableEnvironment tableEnv = ...; //注冊Orders表 // 計算邏輯同上面的Table API Table revenue = tableEnv.sqlQuery( "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" ); // 注冊"RevenueFrance"外部輸出表 // 計算結果插入"RevenueFrance"表 tableEnv.sqlUpdate( "INSERT INTO RevenueFrance " + "SELECT cID, cName, SUM(revenue) AS revSum " + "FROM Orders " + "WHERE cCountry = 'FRANCE' " + "GROUP BY cID, cName" );
輸出表
一個表通過将其寫入到TableSink,然後進行輸出。TableSink是一個通用的支援多種檔案格式(CSV、Parquet, Avro)和多種外部存儲系統(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對列(Apache Kafka, RabbitMQ)的接口。
批處理的表隻能被寫入到
BatchTableSink
,流處理的表需要指明 AppendStreamTableSink、RetractStreamTableSink 或者
UpsertStreamTableSink
// 擷取TableEnvironment TableEnvironment tableEnv = ...; // 建立輸出表 final Schema schema = new Schema() .field("a", DataTypes.INT()) .field("b", DataTypes.STRING()) .field("c", DataTypes.LONG()); tableEnv.connect(new FileSystem("/path/to/file")) .withFormat(new Csv().fieldDelimiter('|').deriveSchema()) .withSchema(schema) .createTemporaryTable("CsvSinkTable"); // 計算結果表 Table result = ... // 輸出結果表到注冊的TableSink result.insertInto("CsvSinkTable");
Table API & SQL底層的轉換與執行
上文提到了Flink提供了兩種planner,分别為old planner和Blink planner,對于不同的planner而言,Table API & SQL底層的執行與轉換是有所不同的。
Old planner
根據是流處理作業還是批處理作業,Table API &SQL會被轉換成DataStream或者DataSet程式。一個查詢在内部表示為一個邏輯查詢計劃,會被轉換為兩個階段:
- 1.邏輯查詢計劃優化
- 2.轉換成DataStream或者DataSet程式
上面的兩個階段隻有下面的操作被執行時才會被執行:
- 當一個表被輸出到TableSink時,比如調用了Table.insertInto()方法
- 當執行更新查詢時,比如調用TableEnvironment.sqlUpdate()方法
- 當一個表被轉換為DataStream或者DataSet時
一旦執行上述兩個階段,Table API & SQL的操作會被看做是普通的DataStream或者DataSet程式,是以當
StreamExecutionEnvironment.execute()
或者
ExecutionEnvironment.execute()
被調用時,會執行轉換後的程式。
Blink planner
無論是批處理作業還是流處理作業,如果使用的是Blink planner,底層都會被轉換為DataStream程式。在一個查詢在内部表示為一個邏輯查詢計劃,會被轉換成兩個階段:
- 1.邏輯查詢計劃優化
- 2.轉換成DataStream程式
對于
TableEnvironment
and
StreamTableEnvironment
而言,一個查詢的轉換是不同的
首先對于TableEnvironment,當TableEnvironment.execute()方法執行時,Table API & SQL的查詢才會被轉換,因為TableEnvironment會将多個sink優化為一個DAG。
對于StreamTableEnvironment,轉換發生的時間與old planner相同。
與DataStream & DataSet API內建
對于Old planner與Blink planner而言,隻要是流處理的操作,都可以與DataStream API內建,僅僅隻有Old planner才可以與DataSet API內建,由于Blink planner的批處理作業會被轉換成DataStream程式,是以不能夠與DataSet API內建。值得注意的是,下面提到的table與DataSet之間的轉換僅适用于Old planner。
Table API & SQL的查詢很容易與DataStream或者DataSet程式內建,并可以将Table API & SQL的查詢嵌入DataStream或者DataSet程式中。DataStream或者DataSet可以轉換成表,反之,表也可以被轉換成DataStream或者DataSet。
從DataStream或者DataSet中注冊臨時表(視圖)
提示:隻能将DataStream或者DataSet轉換為臨時表(視圖),之後可以使用 SQL API 進行查詢操作。
下面示範DataStream的轉換,對于DataSet的轉換類似。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, String>> stream = ... // 将DataStream注冊為一個名為myTable的視圖,其中字段分别為"f0", "f1" tableEnv.createTemporaryView("myTable", stream); // 将DataStream注冊為一個名為myTable2的視圖,其中字段分别為"myLong", "myString" tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
将DataStream或者DataSet轉化為Table對象
可以直接将DataStream或者DataSet轉換為Table對象,之後可以使用Table API進行查詢操作。下面示範DataStream的轉換,對于DataSet的轉換類似。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, String>> stream = ... // 将DataStream轉換為Table對象,預設的字段為"f0", "f1" Table table1 = tableEnv.fromDataStream(stream); // 将DataStream轉換為Table對象,預設的字段為"myLong", "myString" Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
将表轉換為DataStream或者DataSet
當将Table轉為DataStream或者DataSet時,需要指定DataStream或者DataSet的資料類型。通常最友善的資料類型是row類型,Flink提供了很多的資料類型供使用者選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。
将表轉換為DataStream
一個流處理查詢的結果是動态變化的,是以将表轉為DataStream時需要指定一個更新模式,共有兩種模式:Append Mode和Retract Mode。
- Append Mode
如果動态表僅隻有Insert操作,即之前輸出的結果不會被更新,則使用該模式。如果更新或删除操作使用追加模式會失敗報錯
- Retract Mode
始終可以使用此模式。傳回值是boolean類型。它用true或false來标記資料的插入和撤回,傳回true代表資料插入,false代表資料的撤回。
// 擷取StreamTableEnvironment. StreamTableEnvironment tableEnv = ...; // 包含兩個字段的表(String name, Integer age) Table table = ... // 将表轉為DataStream,使用Append Mode追加模式,資料類型為Row DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); // 将表轉為DataStream,使用Append Mode追加模式,資料類型為定義好的TypeInformation TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataStream<Tuple2<String, Integer>> dsTuple = tableEnv.toAppendStream(table, tupleType); // 将表轉為DataStream,使用的模式為Retract Mode撤回模式,類型為Row // 對于轉換後的DataStream<Tuple2<Boolean, X>>,X表示流的資料類型, // boolean值表示資料改變的類型,其中INSERT傳回true,DELETE傳回的是false DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
将表轉換為DataSet
// 擷取BatchTableEnvironment BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env); // 包含兩個字段的表(String name, Integer age) Table table = ... // 将表轉為DataSet資料類型為Row DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class); // 将表轉為DataSet,通過TypeInformation定義Tuple2<String, Integer>資料類型 TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( Types.STRING(), Types.INT()); DataSet<Tuple2<String, Integer>> dsTuple = tableEnv.toDataSet(table, tupleType);
表的Schema與資料類型之間的映射
表的Schema與資料類型之間的映射有兩種方式:分别是基于字段下标位置的映射和基于字段名稱的映射。
基于字段下标位置的映射
該方式是按照字段的順序進行一一映射,使用方式如下:
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, Integer>> stream = ... // 将DataStream轉為表,預設的字段名為"f0"和"f1" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,選取tuple的第一個元素,指定一個名為"myLong"的字段名 Table table = tableEnv.fromDataStream(stream, "myLong"); // 将DataStream轉為表,為tuple的第一個元素指定名為"myLong",為第二個元素指定myInt的字段名 Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
基于字段名稱的映射
基于字段名稱的映射方式支援任意的資料類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個具體的字段名稱,同時也可以使用”as”為字段起一個别名。其中Tuple元素的第一個元素為f0,第二個元素為f1,以此類推。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; DataStream<Tuple2<Long, Integer>> stream = ... // 将DataStream轉為表,預設的字段名為"f0"和"f1" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,選擇tuple的第二個元素,指定一個名為"f1"的字段名 Table table = tableEnv.fromDataStream(stream, "f1"); // 将DataStream轉為表,交換字段的順序 Table table = tableEnv.fromDataStream(stream, "f1, f0"); // 将DataStream轉為表,交換字段的順序,并為f1起别名為"myInt",為f0起别名為"myLong Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
原子類型
Flink将
Integer
,
Double
,
String
或者普通的類型稱之為原子類型,一個資料類型為原子類型的DataStream或者DataSet可以被轉成單個字段屬性的表,這個字段的類型與DataStream或者DataSet的資料類型一緻,這個字段的名稱可以進行指定。
//擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; // 資料類型為原子類型Long DataStream<Long> stream = ... // 将DataStream轉為表,預設的字段名為"f0" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,指定字段名為myLong" Table table = tableEnv.fromDataStream(stream, "myLong");
Tuple類型
Tuple類型的DataStream或者DataSet都可以轉為表,可以重新設定表的字段名(即根據tuple元素的位置進行一一映射,轉為表之後,每個元素都有一個别名),如果不為字段指定名稱,則使用預設的名稱(java語言預設的是f0,f1,scala預設的是_1),使用者也可以重新排列字段的順序,并為每個字段起一個别名。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; //Tuple2<Long, String>類型的DataStream DataStream<Tuple2<Long, String>> stream = ... // 将DataStream轉為表,預設的字段名為 "f0", "f1" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置) Table table = tableEnv.fromDataStream(stream, "myLong, myString"); // 将DataStream轉為表,指定字段名為 "f0", "f1",并且交換順序 Table table = tableEnv.fromDataStream(stream, "f1, f0"); // 将DataStream轉為表,隻選擇Tuple的第二個元素,指定字段名為"f1" Table table = tableEnv.fromDataStream(stream, "f1"); // 将DataStream轉為表,為Tuple的第二個元素指定别名為myString,為第一個元素指定字段名為myLong Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
POJO類型
當将POJO類型的DataStream或者DataSet轉為表時,如果不指定表名,則預設使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個别名,也可以調換字段的順序,也可以隻選擇部分的字段。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; //資料類型為Person的POJO類型,字段包括"name"和"age" DataStream<Person> stream = ... // 将DataStream轉為表,預設的字段名稱為"age", "name" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,為"age"字段指定别名myAge, 為"name"字段指定别名myName Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName"); // 将DataStream轉為表,隻選擇一個name字段 Table table = tableEnv.fromDataStream(stream, "name"); // 将DataStream轉為表,隻選擇一個name字段,并起一個别名myName Table table = tableEnv.fromDataStream(stream, "name as myName");
Row類型
Row類型的DataStream或者DataSet轉為表的過程中,可以根據字段的位置或者字段名稱進行映射,同時也可以為字段起一個别名,或者隻選擇部分字段。
// 擷取StreamTableEnvironment StreamTableEnvironment tableEnv = ...; // Row類型的DataStream,通過RowTypeInfo指定兩個字段"name"和"age" DataStream<Row> stream = ... // 将DataStream轉為表,預設的字段名為原始字段名"name"和"age" Table table = tableEnv.fromDataStream(stream); // 将DataStream轉為表,根據位置映射,為第一個字段指定myName别名,為第二個字段指定myAge别名 Table table = tableEnv.fromDataStream(stream, "myName, myAge"); // 将DataStream轉為表,根據字段名映射,為name字段起别名myName,為age字段起别名myAge Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); // 将DataStream轉為表,根據字段名映射,隻選擇name字段 Table table = tableEnv.fromDataStream(stream, "name"); // 将DataStream轉為表,根據字段名映射,隻選擇name字段,并起一個别名"myName" Table table = tableEnv.fromDataStream(stream, "name as myName");
查詢優化
Old planner
Apache Flink利用Apache Calcite來優化和轉換查詢。目前執行的優化包括投影和過濾器下推,去相關子查詢以及其他類型的查詢重寫。Old Planner目前不支援優化JOIN的順序,而是按照查詢中定義的順序執行它們。
通過提供一個
CalciteConfig
對象,可以調整在不同階段應用的優化規則集。這可通過調用
CalciteConfig.createBuilder()
方法來進行建立,并通過調用
tableEnv.getConfig.setPlannerConfig(calciteConfig)
方法将該對象傳遞給TableEnvironment。
Blink planner
Apache Flink利用并擴充了Apache Calcite來執行複雜的查詢優化。這包括一系列基于規則和基于成本的優化(cost_based),例如:
- 基于Apache Calcite的取相關子查詢
- 投影裁剪
- 分區裁剪
- 過濾器謂詞下推
- 過濾器下推
- 子計劃重複資料删除以避免重複計算
- 特殊的子查詢重寫,包括兩個部分:
- 将IN和EXISTS轉換為左半聯接( left semi-join)
- 将NOT IN和NOT EXISTS轉換為left anti-join
- 調整join的順序,需要啟用
table.optimizer.join-reorder-enabled
注意: IN / EXISTS / NOT IN / NOT EXISTS目前僅在子查詢重寫的結合條件下受支援。
查詢優化器不僅基于計劃,而且還可以基于資料源的統計資訊以及每個操作的細粒度開銷(例如io,cpu,網絡和記憶體),進而做出更加明智且合理的優化決策。
進階使用者可以通過
CalciteConfig
對象提供自定義優化規則,通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig),将參數傳遞給TableEnvironment。
檢視執行計劃
SQL語言支援通過explain來檢視某條SQL的執行計劃,Flink Table API也可以通過調用explain()方法來檢視具體的執行計劃。該方法傳回一個字元串用來描述三個部分計劃,分别為:
- 關系查詢的抽象文法樹,即未優化的邏輯查詢計劃,
- 優化的邏輯查詢計劃
- 實際執行計劃
小結
本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然後介紹了建構Flink Table API & SQL程式所需要的依賴,接着介紹了Flink的兩種planner,還介紹了如何系統資料庫以及DataStream、DataSet與表的互相轉換,最後介紹了Flink的兩種planner對應的查詢優化并給出了一個檢視執行計劃的案例。