本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、内部實作等各個方面,帶你由淺入深地全面了解 Flink SQL。
1. 發展曆程
今年的8月22日 Apache Flink 釋出了1.9.0 版本(下文簡稱1.9),在 Flink 1.9 中,Table 子產品迎來了核心架構的更新,引入了阿裡巴巴Blink團隊貢獻的諸多功能,本文對Table 子產品的架構進行梳理并介紹如何使用 Blink Planner。
Flink 的 Table 子產品 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,使用者可以像操作表一樣操作資料,非常直覺和友善;SQL作為一種聲明式語言,有着标準的文法和規範,使用者可以不用關心底層實作即可進行資料的處理,非常易于上手,Flink Table API 和 SQL 的實作上有80%左右的代碼是公用的。作為一個流批統一的計算引擎,Flink 的 Runtime 層是統一的,但在 Flink 1.9 之前,Flink API 層 一直分為DataStream API 和 DataSet API, Table API & SQL 位于 DataStream API 和 DataSet API 之上。
Flink 1.8 Table 架構
在 Flink 1.8 架構裡,如果使用者需要同時流計算、批處理的場景下,使用者需要維護兩套業務代碼,開發人員也要維護兩套技術棧,非常不友善。 Flink 社群很早就設想過将批資料看作一個有界流資料,将批處理看作流計算的一個特例,進而實作流批統一,阿裡巴巴的 Blink 團隊在這方面做了大量的工作,已經實作了 Table API & SQL 層的流批統一。 幸運的是,阿裡巴巴已經将 Blink 開源回饋給 Flink 社群。為了實作 Flink 整個體系的流批統一,在結合 Blink 團隊的一些先行經驗的基礎上,Flink 社群的開發人員在多輪讨論後,基本敲定了Flink 未來的技術架構。
Flink 未來架構
在Flink 的未來架構中,DataSet API将被廢除,面向使用者的API隻有 DataStream API 和 Table API & SQL,在實作層,這兩個API共享相同的技術棧,使用統一的 DAG 資料結構來描述作業,使用統一的 StreamOperator 來編寫算子邏輯,以及使用統一的流式分布式執行引擎,實作徹底的流批統一。 這兩個API都提供流計算和批處理的功能,DataStream API 提供了更底層和更靈活的程式設計接口,使用者可以自行描述和編排算子,引擎不會做過多的幹涉和優化;Table API & SQL 則提供了直覺的Table API、标準的SQL支援,引擎會根據使用者的意圖來進行優化,并選擇最優的執行計劃。
2.Flink 1.9 Table 架構
Blink 的 Table 子產品的架構在開源時就已經實作了流批統一,向着 Flink 的未來架構邁進了第一步,走在了 Flink 社群前面。 是以在 Flink 1.9 合入 Blink Table 代碼時,為了保證 Flink Table 已有架構和 Blink Table的架構能夠并存并朝着 Flink 未來架構演進,社群的開發人員圍繞FLIP-32(FLIP 即 Flink Improvement Proposals,專門記錄一些對Flink 做較大修改的提議。FLIP-32是:Restructure flink-table for future contributions) 進行了重構和優化,進而使得 Flink Table 的新架構具備了流批統一的能力,可以說 Flink 1.9 是 Flink 向着流批徹底統一這個未來架構邁出的第一步。
Flink 1.9 Table 架構
在 Flink Table 的新架構中,有兩個查詢處理器:Flink Query Processor 和 Blink Query Processor,分别對應兩個Planner,我們稱之為 Old Planner 和 Blink Planner。查詢處理器是 Planner 的具體實作, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程将 Table API & SQL作業轉換成 Flink Runtime 可識别的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的排程和執行。
Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API;而 Blink 的查詢處理器則實作流批作業接口的統一,底層的 API 都是Transformation。
3.Flink Planner 與 Blink Planner
Flink Table 的新架構實作了查詢處理器的插件化,社群完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,使用者可以自行選擇使用 Old Planner 還是 Blink Planner。
在模型上,Old Planner 沒有考慮流計算作業和批處理作業的統一,針對流計算作業和批處理作業的實作不盡相同,在底層會分别翻譯到 DataStream API 和 DataSet API 上。而 Blink Planner 将批資料集看作 bounded DataStream (有界流式資料) ,流計算作業和批處理作業最終都會翻譯到 Transformation API 上。 在架構上,Blink Planner 針對批處理和流計算,分别實作了BatchPlanner 和 StreamPlanner ,兩者共用了大部分代碼,共享了很多優化邏輯。 Old Planner 針對批處理和流計算的代碼實作的是完全獨立的兩套體系,基本沒有實作代碼和優化邏輯複用。
除了模型和架構上的優點外,Blink Planner 在阿裡巴巴集團内部的海量業務場景下沉澱了許多實用功能,集中在三個方面:
- Blink Planner 對代碼生成機制做了改進、對部分算子進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的資料傾斜優化等新功能。
- Blink Planner 的優化政策是基于公共子圖的優化算法,包含了基于成本的優化(CBO)和基于規則的優化(CRO)兩種政策,優化更為全面。同時,Blink Planner 支援從 catalog 中擷取資料源的統計資訊,這對CBO優化非常重要。
- Blink Planner 提供了更多的内置函數,更标準的 SQL 支援,在 Flink 1.9 版本中已經完整支援 TPC-H ,對高階的 TPC-DS 支援也計劃在下一個版本實作。
整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出于穩定性的考慮,Flink 1.9 預設依然使用 Flink Planner,使用者如果需要使用 Blink Planner,可以作業中顯式指定。
4.如何啟用 Blink Planner
在IDE環境裡,隻需要引入兩個 Blink Planner 的相關依賴,就可以啟用 Blink Planner。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.0</version>
</dependency>
對于流計算作業和批處理作業的配置非常類似,隻需要在 EnvironmentSettings 中設定 StreamingMode 或 BatchMode 即可,流計算作業的設定如下:
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
bsTableEnv.sqlUpdate(…);
bsTableEnv.execute();
批處理作業的設定如下 :
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
bbTableEnv.sqlUpdate(…)
bbTableEnv.execute()
如果作業需要運作在叢集環境,打包時将 Blink Planner 相關依賴的 scope 設定為 provided,表示這些依賴由叢集環境提供。這是因為 Flink 在編譯打包時, 已經将 Blink Planner 相關的依賴打包,不需要再次引入,避免沖突。
5. 社群長遠計劃
目前,TableAPI & SQL 已經成為 Flink API 的一等公民,社群也将投入更大的精力在這個子產品。在不遠的将來,待 Blink Planner 穩定之後,将會作為預設的 Planner ,而 Old Planner 也将會在合适的時候退出曆史的舞台。目前社群也在努力賦予 DataStream 批處理的能力,進而統一流批技術棧,屆時 DataSet API 也将退出曆史的舞台。
▼ Apache Flink 社群推薦 ▼
Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,目前正在征集議題,限量早鳥票優惠ing。了解 Flink Forward Asia 2019 的更多資訊,請檢視:
https://developer.aliyun.com/special/ffa2019首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:
https://tianchi.aliyun.com/markets/tianchi/flink2019