天天看點

Flink Table API & SQL程式設計指南(1)

本文來自:https://jiamaoxiang.top/2020/05/25/Flink-Table-API-SQL%E7%BC%96%E7%A8%8B%E6%8C%87%E5%8D%97/

概述

Apache Flink提供了兩種頂層的關系型API,分别為Table API和SQL,Flink通過Table API&SQL實作了批流統一。其中Table API是用于Scala和Java的語言內建查詢API,它允許以非常直覺的方式組合關系運算符(例如select,where和join)的查詢。Flink SQL基于Apache Calcite 實作了标準的SQL,使用者可以使用标準的SQL處理資料集。Table API和SQL與Flink的DataStream和DataSet API緊密內建在一起,使用者可以實作互相轉化,比如可以将DataStream或者DataSet注冊為table進行操作資料。值得注意的是,Table API and SQL目前尚未完全完善,還在積極的開發中,是以并不是所有的算子操作都可以通過其實作。

依賴

從Flink1.9開始,Flink為Table & SQL API提供了兩種planner,分别為Blink planner和old planner,其中old planner是在Flink1.9之前的版本使用。主要差別如下:

提示:對于生産環境,目前推薦使用old planner.

  • flink-table-common

    : 通用子產品,包含 Flink Planner 和 Blink Planner 一些共用的代碼
  • flink-table-api-java

    : java語言的Table & SQL API,僅針對table(處于早期的開發階段,不推薦使用)
  • flink-table-api-scala

    : scala語言的Table & SQL API,僅針對table(處于早期的開發階段,不推薦使用)
  • flink-table-api-java-bridge

    : java語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)
  • flink-table-api-scala-bridge

    : scala語言的Table & SQL API,支援DataStream/DataSet API(推薦使用)
  • flink-table-planner

    :planner 和runtime. planner為Flink1,9之前的old planner(推薦使用)
  • flink-table-planner-blink

    : 新的Blink planner.
  • flink-table-runtime-blink

    : 新的Blink runtime.
  • flink-table-uber

    : 将上述的API子產品及old planner打成一個jar包,形如flink-table-*.jar,位與/lib目錄下
  • flink-table-uber-blink

    :将上述的API子產品及Blink 子產品打成一個jar包,形如fflink-table-blink-*.jar,位與/lib目錄下

Blink planner & old planner

Blink planner和old planner有許多不同的特點,具體列舉如下:

  • Blink planner将批處理作業看做是流處理作業的特例。是以,不支援Table 與DataSet之間的轉換,批處理的作業也不會被轉成DataSet程式,而是被轉為DataStream程式。
  • Blink planner不支援 

    BatchTableSource

    ,使用的是有界的StreamTableSource。
  • Blink planner僅支援新的 

    Catalog

    ,不支援

    ExternalCatalog

     (已過時)。
  • 對于FilterableTableSource的實作,兩種Planner是不同的。old planner會謂詞下推到

    PlannerExpression

    (未來會被移除),而Blink planner 會謂詞下推到 

    Expression

    (表示一個産生計算結果的邏輯樹)。
  • 僅僅Blink planner支援key-value形式的配置,即通過Configuration進行參數設定。
  • 關于PlannerConfig的實作,兩種planner有所不同。
  • Blink planner 會将多個sink優化成一個DAG(僅支援TableEnvironment,StreamTableEnvironment不支援),old planner總是将每一個sink優化成一個新的DAG,每一個DAG都是互相獨立的。
  • old planner不支援catalog統計,Blink planner支援catalog統計。

Flink Table & SQL程式的pom依賴

根據使用的語言不同,可以選擇下面的依賴,包括scala版和java版,如下:

<!-- java版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>
<!-- scala版 -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>
           

除此之外,如果需要在本地的IDE中運作Table API & SQL的程式,則需要添加下面的pom依賴:

<!-- Flink 1.9之前的old planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>
<!-- 新的Blink planner -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>
           

另外,如果需要實作自定義的格式(比如和kafka互動)或者使用者自定義函數,需要添加如下依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-common</artifactId>
  <version>1.11.3</version>
  <scope>provided</scope>
</dependency>
           

Table API & SQL的程式設計模闆

所有的Table API&SQL的程式(無論是批處理還是流處理)都有着相同的形式,下面将給出通用的程式設計結構形式:

// 建立一個TableEnvironment對象,指定planner、處理模式(batch、streaming)
TableEnvironment tableEnv = ...; 
// 建立一個表
tableEnv.connect(...).createTemporaryTable("table1");
// 注冊一個外部的表
tableEnv.connect(...).createTemporaryTable("outputTable");
// 通過Table API的查詢建立一個Table 對象
Table tapiResult = tableEnv.from("table1").select(...);
// 通過SQL查詢的查詢建立一個Table 對象
Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// 将結果寫入TableSink
tapiResult.insertInto("outputTable");
// 執行
tableEnv.execute("java_job");
           

注意:Table API & SQL的查詢可以互相內建,另外還可以在DataStream或者DataSet中使用Table API & SQL的API,實作DataStreams、 DataSet與Table之間的互相轉換。具體參見 Flink 中 DataStream / DataSet 與 Table 的互相轉換

建立TableEnvironment

TableEnvironment是Table API & SQL程式的一個入口,主要包括如下的功能:

  • 在内部的catalog中注冊Table
  • 注冊catalog
  • 加載可插拔子產品
  • 執行SQL查詢
  • 注冊使用者定義函數
  • DataStream

     、

    DataSet

    與Table之間的互相轉換
  • 持有對

    ExecutionEnvironment

     、

    StreamExecutionEnvironment

    的引用

一個Table必定屬于一個具體的TableEnvironment,不可以将不同TableEnvironment的表放在一起使用(比如join,union等操作)。

TableEnvironment是通過調用 

BatchTableEnvironment.create()

 或者StreamTableEnvironment.create()的靜态方法進行建立的。另外,預設兩個planner的jar包都存在與classpath下,所有需要明确指定使用的planner。

// **********************
// FLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
//或者TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK 批處理查詢
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK 流處理查詢
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// 或者 TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK 批處理查詢
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
           

在catalog中建立表

臨時表與永久表

表可以分為臨時表和永久表兩種,其中永久表需要一個catalog(比如Hive的Metastore)來維護表的中繼資料資訊,一旦永久表被建立,隻要連接配接到該catalog就可以通路該表,隻有顯示删除永久表,該表才可以被删除。臨時表的生命周期是Flink Session,這些表不能夠被其他的Flink Session通路,這些表不屬于任何的catalog或者資料庫,如果與臨時表相對應的資料庫被删除了,該臨時表也不會被删除。

建立表

虛表(Virtual Tables)

一個Table對象相當于SQL中的視圖(虛表),它封裝了一個邏輯執行計劃,可以通過一個catalog建立,具體如下:

// 擷取一個TableEnvironment

TableEnvironment tableEnv = ...; 

// table對象,查詢的結果集 

Table projTable = tableEnv.from("X").select(...); 

// 注冊一個表,名稱為 "projectedTable" 

tableEnv.createTemporaryView("projectedTable", projTable);

外部資料源表(Connector Tables)

可以把外部的資料源注冊成表,比如可以讀取MySQL資料庫資料、Kafka資料等

tableEnvironment
  .connect(...)
  .withFormat(...)
  .withSchema(...)
  .inAppendMode()
  .createTemporaryTable("MyTable")
           

擴充建立表的辨別屬性

表的注冊總是包含三部分辨別屬性:catalog、資料庫、表名。使用者可以在内部設定一個catalog和一個資料庫作為目前的catalog和資料庫,是以對于catalog和資料庫這兩個辨別屬性是可選的,即如果不指定,預設使用的是“defaykt_catalog”和 “default_database”。

具體說明見 Flink官網

Flink Table API &amp; SQL程式設計指南(1)
TableEnvironment tEnv = ...;
tEnv.useCatalog("default_catalog");//設定catalog
tEnv.useDatabase("default_database");//設定資料庫
Table table = ...;
// 注冊一個名為exampleView的視圖,catalog名為default_catalog
// 資料庫的名為default_database
tableEnv.createTemporaryView("exampleView", table);

// 注冊一個名為exampleView的視圖,catalog的名為default_catalog
// 資料庫的名為other_database
tableEnv.createTemporaryView("other_database.exampleView", table);
 
// 注冊一個名為'View'的視圖,catalog的名稱為default_catalog
// 資料庫的名為default_database,'View'是保留關鍵字,需要使用``(反引号)
tableEnv.createTemporaryView("`View`", table);

// 注冊一個名為example.View的視圖,catalog的名為default_catalog,
// 資料庫名為example
tableEnv.createTemporaryView("`example.View`", table);

// 注冊一個名為'exampleView'的視圖, catalog的名為'other_catalog'
// 資料庫名為other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
           

查詢表

Table API

Table API是一個內建Scala與Java語言的查詢API,與SQL相比,它的查詢不是一個标準的SQL語句,而是由一步一步的操作組成的。如下展示了一個使用Table API實作一個簡單的聚合查詢。

// 擷取TableEnvironment
TableEnvironment tableEnv = ...;
//注冊Orders表

// 查詢注冊的表
Table orders = tableEnv.from("Orders");
// 計算操作
Table revenue = orders
  .filter("cCountry === 'FRANCE'")
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");
           

SQL

Flink SQL依賴于Apache Calcite,其實作了标準的SQL文法,如下案例:

// 擷取TableEnvironment
TableEnvironment tableEnv = ...;

//注冊Orders表

// 計算邏輯同上面的Table API
Table revenue = tableEnv.sqlQuery(
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );

// 注冊"RevenueFrance"外部輸出表
// 計算結果插入"RevenueFrance"表
tableEnv.sqlUpdate(
    "INSERT INTO RevenueFrance " +
    "SELECT cID, cName, SUM(revenue) AS revSum " +
    "FROM Orders " +
    "WHERE cCountry = 'FRANCE' " +
    "GROUP BY cID, cName"
  );
           

輸出表

一個表通過将其寫入到TableSink,然後進行輸出。TableSink是一個通用的支援多種檔案格式(CSV、Parquet, Avro)和多種外部存儲系統(JDBC, Apache HBase, Apache Cassandra, Elasticsearch)以及多種消息對列(Apache Kafka, RabbitMQ)的接口。

批處理的表隻能被寫入到 

BatchTableSink

,流處理的表需要指明 AppendStreamTableSink、RetractStreamTableSink 或者 

UpsertStreamTableSink

// 擷取TableEnvironment
TableEnvironment tableEnv = ...;

// 建立輸出表
final Schema schema = new Schema()
    .field("a", DataTypes.INT())
    .field("b", DataTypes.STRING())
    .field("c", DataTypes.LONG());

tableEnv.connect(new FileSystem("/path/to/file"))
    .withFormat(new Csv().fieldDelimiter('|').deriveSchema())
    .withSchema(schema)
    .createTemporaryTable("CsvSinkTable");

// 計算結果表
Table result = ...
// 輸出結果表到注冊的TableSink
result.insertInto("CsvSinkTable");
           

Table API & SQL底層的轉換與執行

上文提到了Flink提供了兩種planner,分别為old planner和Blink planner,對于不同的planner而言,Table API & SQL底層的執行與轉換是有所不同的。

Old planner

根據是流處理作業還是批處理作業,Table API &SQL會被轉換成DataStream或者DataSet程式。一個查詢在内部表示為一個邏輯查詢計劃,會被轉換為兩個階段:

  • 1.邏輯查詢計劃優化
  • 2.轉換成DataStream或者DataSet程式

上面的兩個階段隻有下面的操作被執行時才會被執行:

  • 當一個表被輸出到TableSink時,比如調用了Table.insertInto()方法
  • 當執行更新查詢時,比如調用TableEnvironment.sqlUpdate()方法
  • 當一個表被轉換為DataStream或者DataSet時

一旦執行上述兩個階段,Table API & SQL的操作會被看做是普通的DataStream或者DataSet程式,是以當

StreamExecutionEnvironment.execute()

或者

ExecutionEnvironment.execute()

 被調用時,會執行轉換後的程式。

Blink planner

無論是批處理作業還是流處理作業,如果使用的是Blink planner,底層都會被轉換為DataStream程式。在一個查詢在内部表示為一個邏輯查詢計劃,會被轉換成兩個階段:

  • 1.邏輯查詢計劃優化
  • 2.轉換成DataStream程式

對于

TableEnvironment

 and 

StreamTableEnvironment

而言,一個查詢的轉換是不同的

首先對于TableEnvironment,當TableEnvironment.execute()方法執行時,Table API & SQL的查詢才會被轉換,因為TableEnvironment會将多個sink優化為一個DAG。

對于StreamTableEnvironment,轉換發生的時間與old planner相同。

與DataStream & DataSet API內建

對于Old planner與Blink planner而言,隻要是流處理的操作,都可以與DataStream API內建,僅僅隻有Old planner才可以與DataSet API內建,由于Blink planner的批處理作業會被轉換成DataStream程式,是以不能夠與DataSet API內建。值得注意的是,下面提到的table與DataSet之間的轉換僅适用于Old planner。

Table API & SQL的查詢很容易與DataStream或者DataSet程式內建,并可以将Table API & SQL的查詢嵌入DataStream或者DataSet程式中。DataStream或者DataSet可以轉換成表,反之,表也可以被轉換成DataStream或者DataSet。

從DataStream或者DataSet中注冊臨時表(視圖)

提示:隻能将DataStream或者DataSet轉換為臨時表(視圖),之後可以使用 SQL API 進行查詢操作。

下面示範DataStream的轉換,對于DataSet的轉換類似。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream注冊為一個名為myTable的視圖,其中字段分别為"f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// 将DataStream注冊為一個名為myTable2的視圖,其中字段分别為"myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, "myLong, myString");
           

将DataStream或者DataSet轉化為Table對象

可以直接将DataStream或者DataSet轉換為Table對象,之後可以使用Table API進行查詢操作。下面示範DataStream的轉換,對于DataSet的轉換類似。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream轉換為Table對象,預設的字段為"f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// 将DataStream轉換為Table對象,預設的字段為"myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
           

将表轉換為DataStream或者DataSet

當将Table轉為DataStream或者DataSet時,需要指定DataStream或者DataSet的資料類型。通常最友善的資料類型是row類型,Flink提供了很多的資料類型供使用者選擇,具體包括Row、POJO、樣例類、Tuple和原子類型。

将表轉換為DataStream

一個流處理查詢的結果是動态變化的,是以将表轉為DataStream時需要指定一個更新模式,共有兩種模式:Append Mode和Retract Mode。

  • Append Mode

如果動态表僅隻有Insert操作,即之前輸出的結果不會被更新,則使用該模式。如果更新或删除操作使用追加模式會失敗報錯

  • Retract Mode

始終可以使用此模式。傳回值是boolean類型。它用true或false來标記資料的插入和撤回,傳回true代表資料插入,false代表資料的撤回。

// 擷取StreamTableEnvironment. 
StreamTableEnvironment tableEnv = ...; 
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 将表轉為DataStream,使用Append Mode追加模式,資料類型為Row
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// 将表轉為DataStream,使用Append Mode追加模式,資料類型為定義好的TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toAppendStream(table, tupleType);
// 将表轉為DataStream,使用的模式為Retract Mode撤回模式,類型為Row
// 對于轉換後的DataStream<Tuple2<Boolean, X>>,X表示流的資料類型,
// boolean值表示資料改變的類型,其中INSERT傳回true,DELETE傳回的是false
DataStream<Tuple2<Boolean, Row>> retractStream = 
  tableEnv.toRetractStream(table, Row.class);
           

将表轉換為DataSet

// 擷取BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// 包含兩個字段的表(String name, Integer age)
Table table = ...
// 将表轉為DataSet資料類型為Row
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// 将表轉為DataSet,通過TypeInformation定義Tuple2<String, Integer>資料類型
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
  Types.STRING(),
  Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple = 
  tableEnv.toDataSet(table, tupleType);
           

表的Schema與資料類型之間的映射

表的Schema與資料類型之間的映射有兩種方式:分别是基于字段下标位置的映射和基于字段名稱的映射。

基于字段下标位置的映射

該方式是按照字段的順序進行一一映射,使用方式如下:

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 将DataStream轉為表,預設的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream轉為表,選取tuple的第一個元素,指定一個名為"myLong"的字段名
Table table = tableEnv.fromDataStream(stream, "myLong");
// 将DataStream轉為表,為tuple的第一個元素指定名為"myLong",為第二個元素指定myInt的字段名
Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
           

基于字段名稱的映射

基于字段名稱的映射方式支援任意的資料類型包括POJO類型,可以很靈活地定義表Schema映射,所有的字段被映射成一個具體的字段名稱,同時也可以使用”as”為字段起一個别名。其中Tuple元素的第一個元素為f0,第二個元素為f1,以此類推。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
DataStream<Tuple2<Long, Integer>> stream = ...
// 将DataStream轉為表,預設的字段名為"f0"和"f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream轉為表,選擇tuple的第二個元素,指定一個名為"f1"的字段名
Table table = tableEnv.fromDataStream(stream, "f1");
// 将DataStream轉為表,交換字段的順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将DataStream轉為表,交換字段的順序,并為f1起别名為"myInt",為f0起别名為"myLong
Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong");
           

原子類型

Flink将

Integer

Double

String

或者普通的類型稱之為原子類型,一個資料類型為原子類型的DataStream或者DataSet可以被轉成單個字段屬性的表,這個字段的類型與DataStream或者DataSet的資料類型一緻,這個字段的名稱可以進行指定。

//擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// 資料類型為原子類型Long
DataStream<Long> stream = ...
// 将DataStream轉為表,預設的字段名為"f0"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream轉為表,指定字段名為myLong"
Table table = tableEnv.fromDataStream(stream, "myLong");
           

Tuple類型

Tuple類型的DataStream或者DataSet都可以轉為表,可以重新設定表的字段名(即根據tuple元素的位置進行一一映射,轉為表之後,每個元素都有一個别名),如果不為字段指定名稱,則使用預設的名稱(java語言預設的是f0,f1,scala預設的是_1),使用者也可以重新排列字段的順序,并為每個字段起一個别名。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//Tuple2<Long, String>類型的DataStream
DataStream<Tuple2<Long, String>> stream = ...
// 将DataStream轉為表,預設的字段名為 "f0", "f1"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream轉為表,指定字段名為 "myLong", "myString"(按照Tuple元素的順序位置)
Table table = tableEnv.fromDataStream(stream, "myLong, myString");
// 将DataStream轉為表,指定字段名為 "f0", "f1",并且交換順序
Table table = tableEnv.fromDataStream(stream, "f1, f0");
// 将DataStream轉為表,隻選擇Tuple的第二個元素,指定字段名為"f1"
Table table = tableEnv.fromDataStream(stream, "f1");
// 将DataStream轉為表,為Tuple的第二個元素指定别名為myString,為第一個元素指定字段名為myLong
Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'");
           

POJO類型

當将POJO類型的DataStream或者DataSet轉為表時,如果不指定表名,則預設使用的是POJO字段本身的名稱,原始字段名稱的映射需要指定原始字段的名稱,可以為其起一個别名,也可以調換字段的順序,也可以隻選擇部分的字段。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
//資料類型為Person的POJO類型,字段包括"name"和"age"
DataStream<Person> stream = ...
// 将DataStream轉為表,預設的字段名稱為"age", "name"
Table table = tableEnv.fromDataStream(stream);
//  将DataStream轉為表,為"age"字段指定别名myAge, 為"name"字段指定别名myName
Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName");
//  将DataStream轉為表,隻選擇一個name字段
Table table = tableEnv.fromDataStream(stream, "name");
//  将DataStream轉為表,隻選擇一個name字段,并起一個别名myName
Table table = tableEnv.fromDataStream(stream, "name as myName");
           

Row類型

Row類型的DataStream或者DataSet轉為表的過程中,可以根據字段的位置或者字段名稱進行映射,同時也可以為字段起一個别名,或者隻選擇部分字段。

// 擷取StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; 
// Row類型的DataStream,通過RowTypeInfo指定兩個字段"name"和"age"
DataStream<Row> stream = ...
// 将DataStream轉為表,預設的字段名為原始字段名"name"和"age"
Table table = tableEnv.fromDataStream(stream);
// 将DataStream轉為表,根據位置映射,為第一個字段指定myName别名,為第二個字段指定myAge别名
Table table = tableEnv.fromDataStream(stream, "myName, myAge");
// 将DataStream轉為表,根據字段名映射,為name字段起别名myName,為age字段起别名myAge
Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge");
// 将DataStream轉為表,根據字段名映射,隻選擇name字段
Table table = tableEnv.fromDataStream(stream, "name");
// 将DataStream轉為表,根據字段名映射,隻選擇name字段,并起一個别名"myName"
Table table = tableEnv.fromDataStream(stream, "name as myName");
           

查詢優化

Old planner

Apache Flink利用Apache Calcite來優化和轉換查詢。目前執行的優化包括投影和過濾器下推,去相關子查詢以及其他類型的查詢重寫。Old Planner目前不支援優化JOIN的順序,而是按照查詢中定義的順序執行它們。

通過提供一個

CalciteConfig

對象,可以調整在不同階段應用的優化規則集。這可通過調用

CalciteConfig.createBuilder()

方法來進行建立,并通過調用

tableEnv.getConfig.setPlannerConfig(calciteConfig)

方法将該對象傳遞給TableEnvironment。

Blink planner

Apache Flink利用并擴充了Apache Calcite來執行複雜的查詢優化。這包括一系列基于規則和基于成本的優化(cost_based),例如:

  • 基于Apache Calcite的取相關子查詢
  • 投影裁剪
  • 分區裁剪
  • 過濾器謂詞下推
  • 過濾器下推
  • 子計劃重複資料删除以避免重複計算
  • 特殊的子查詢重寫,包括兩個部分:
    • 将IN和EXISTS轉換為左半聯接( left semi-join)
    • 将NOT IN和NOT EXISTS轉換為left anti-join
  • 調整join的順序,需要啟用 

    table.optimizer.join-reorder-enabled

注意: IN / EXISTS / NOT IN / NOT EXISTS目前僅在子查詢重寫的結合條件下受支援。

查詢優化器不僅基于計劃,而且還可以基于資料源的統計資訊以及每個操作的細粒度開銷(例如io,cpu,網絡和記憶體),進而做出更加明智且合理的優化決策。

進階使用者可以通過

CalciteConfig

對象提供自定義優化規則,通過調用tableEnv.getConfig.setPlannerConfig(calciteConfig),将參數傳遞給TableEnvironment。

檢視執行計劃

SQL語言支援通過explain來檢視某條SQL的執行計劃,Flink Table API也可以通過調用explain()方法來檢視具體的執行計劃。該方法傳回一個字元串用來描述三個部分計劃,分别為:

  1. 關系查詢的抽象文法樹,即未優化的邏輯查詢計劃,
  2. 優化的邏輯查詢計劃
  3. 實際執行計劃

小結

本文主要介紹了Flink TableAPI &SQL,首先介紹了Flink Table API &SQL的基本概念 ,然後介紹了建構Flink Table API & SQL程式所需要的依賴,接着介紹了Flink的兩種planner,還介紹了如何系統資料庫以及DataStream、DataSet與表的互相轉換,最後介紹了Flink的兩種planner對應的查詢優化并給出了一個檢視執行計劃的案例。