天天看點

開篇 | 揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

本文為 Apache Flink 新版本重大功能特性解讀之 Flink SQL 系列文章的開篇,Flink SQL 系列文章由其核心貢獻者們分享,涵蓋基礎知識、實踐、調優、内部實作等各個方面,帶你由淺入深地全面了解 Flink SQL。

1. 發展曆程

今年的8月22日 Apache Flink 釋出了1.9.0 版本(下文簡稱1.9),在 Flink 1.9 中,Table 子產品迎來了核心架構的更新,引入了阿裡巴巴Blink團隊貢獻的諸多功能,本文對Table 子產品的架構進行梳理并介紹如何使用 Blink Planner。

Flink 的 Table 子產品 包括 Table API 和 SQL,Table API 是一種類SQL的API,通過Table API,使用者可以像操作表一樣操作資料,非常直覺和友善;SQL作為一種聲明式語言,有着标準的文法和規範,使用者可以不用關心底層實作即可進行資料的處理,非常易于上手,Flink Table API 和 SQL 的實作上有80%左右的代碼是公用的。作為一個流批統一的計算引擎,Flink 的 Runtime 層是統一的,但在 Flink 1.9 之前,Flink API 層 一直分為DataStream API 和 DataSet API, Table API & SQL 位于 DataStream API 和 DataSet API 之上。

開篇 | 揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

Flink 1.8 Table 架構

在 Flink 1.8 架構裡,如果使用者需要同時流計算、批處理的場景下,使用者需要維護兩套業務代碼,開發人員也要維護兩套技術棧,非常不友善。 Flink 社群很早就設想過将批資料看作一個有界流資料,将批處理看作流計算的一個特例,進而實作流批統一,阿裡巴巴的 Blink 團隊在這方面做了大量的工作,已經實作了 Table API & SQL 層的流批統一。 幸運的是,阿裡巴巴已經将 Blink 開源回饋給 Flink 社群。為了實作 Flink 整個體系的流批統一,在結合 Blink 團隊的一些先行經驗的基礎上,Flink 社群的開發人員在多輪讨論後,基本敲定了Flink 未來的技術架構。

開篇 | 揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

Flink 未來架構

在Flink 的未來架構中,DataSet API将被廢除,面向使用者的API隻有 DataStream API 和 Table API & SQL,在實作層,這兩個API共享相同的技術棧,使用統一的 DAG 資料結構來描述作業,使用統一的 StreamOperator 來編寫算子邏輯,以及使用統一的流式分布式執行引擎,實作徹底的流批統一。 這兩個API都提供流計算和批處理的功能,DataStream API 提供了更底層和更靈活的程式設計接口,使用者可以自行描述和編排算子,引擎不會做過多的幹涉和優化;Table API & SQL 則提供了直覺的Table API、标準的SQL支援,引擎會根據使用者的意圖來進行優化,并選擇最優的執行計劃。

2.Flink 1.9 Table 架構

Blink 的 Table 子產品的架構在開源時就已經實作了流批統一,向着 Flink 的未來架構邁進了第一步,走在了 Flink 社群前面。 是以在 Flink 1.9 合入 Blink Table 代碼時,為了保證 Flink Table 已有架構和 Blink Table的架構能夠并存并朝着 Flink 未來架構演進,社群的開發人員圍繞FLIP-32(FLIP 即 Flink Improvement Proposals,專門記錄一些對Flink 做較大修改的提議。FLIP-32是:Restructure flink-table for future contributions) 進行了重構和優化,進而使得 Flink Table 的新架構具備了流批統一的能力,可以說 Flink 1.9 是 Flink 向着流批徹底統一這個未來架構邁出的第一步。

開篇 | 揭秘 Flink 1.9 新架構,Blink Planner 你會用了嗎?

Flink 1.9 Table 架構

在 Flink Table 的新架構中,有兩個查詢處理器:Flink Query Processor 和 Blink Query Processor,分别對應兩個Planner,我們稱之為 Old Planner 和 Blink Planner。查詢處理器是 Planner 的具體實作, 通過parser(解析器)、optimizer(優化器)、codegen(代碼生成技術)等流程将 Table API & SQL作業轉換成 Flink Runtime 可識别的 Transformation DAG (由Transformation組成的有向無環圖,表示作業的轉換邏輯),最終由 Flink Runtime 進行作業的排程和執行。

Flink 的查詢處理器針對流計算和批處理作業有不同的分支處理,流計算作業底層的 API 是 DataStream API, 批處理作業底層的 API 是 DataSet API;而 Blink 的查詢處理器則實作流批作業接口的統一,底層的 API 都是Transformation。

3.Flink Planner 與 Blink Planner

Flink Table 的新架構實作了查詢處理器的插件化,社群完整保留原有 Flink Planner (Old Planner),同時又引入了新的 Blink Planner,使用者可以自行選擇使用 Old Planner 還是 Blink Planner。

在模型上,Old Planner 沒有考慮流計算作業和批處理作業的統一,針對流計算作業和批處理作業的實作不盡相同,在底層會分别翻譯到 DataStream API 和 DataSet API 上。而 Blink Planner 将批資料集看作 bounded DataStream (有界流式資料) ,流計算作業和批處理作業最終都會翻譯到 Transformation API 上。 在架構上,Blink Planner 針對批處理和流計算,分别實作了BatchPlanner 和 StreamPlanner ,兩者共用了大部分代碼,共享了很多優化邏輯。 Old Planner 針對批處理和流計算的代碼實作的是完全獨立的兩套體系,基本沒有實作代碼和優化邏輯複用。

除了模型和架構上的優點外,Blink Planner 在阿裡巴巴集團内部的海量業務場景下沉澱了許多實用功能,集中在三個方面:

  • Blink Planner 對代碼生成機制做了改進、對部分算子進行了優化,提供了豐富實用的新功能,如維表 join、Top N、MiniBatch、流式去重、聚合場景的資料傾斜優化等新功能。
  • Blink Planner 的優化政策是基于公共子圖的優化算法,包含了基于成本的優化(CBO)和基于規則的優化(CRO)兩種政策,優化更為全面。同時,Blink Planner 支援從 catalog 中擷取資料源的統計資訊,這對CBO優化非常重要。
  • Blink Planner 提供了更多的内置函數,更标準的 SQL 支援,在 Flink 1.9 版本中已經完整支援 TPC-H ,對高階的 TPC-DS 支援也計劃在下一個版本實作。

整體看來,Blink 查詢處理器在架構上更為先進,功能上也更為完善。出于穩定性的考慮,Flink 1.9 預設依然使用 Flink Planner,使用者如果需要使用 Blink Planner,可以作業中顯式指定。

4.如何啟用 Blink Planner

在IDE環境裡,隻需要引入兩個 Blink Planner 的相關依賴,就可以啟用 Blink Planner。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.9.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.9.0</version>
</dependency>           

對于流計算作業和批處理作業的配置非常類似,隻需要在 EnvironmentSettings 中設定 StreamingMode 或 BatchMode 即可,流計算作業的設定如下:

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

bsTableEnv.sqlUpdate(…);
bsTableEnv.execute();           

批處理作業的設定如下 :

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
bbTableEnv.sqlUpdate(…)
bbTableEnv.execute()           

如果作業需要運作在叢集環境,打包時将 Blink Planner 相關依賴的 scope 設定為 provided,表示這些依賴由叢集環境提供。這是因為 Flink 在編譯打包時, 已經将 Blink Planner 相關的依賴打包,不需要再次引入,避免沖突。

5. 社群長遠計劃

目前,TableAPI & SQL 已經成為 Flink API 的一等公民,社群也将投入更大的精力在這個子產品。在不遠的将來,待 Blink Planner 穩定之後,将會作為預設的 Planner ,而 Old Planner 也将會在合适的時候退出曆史的舞台。目前社群也在努力賦予 DataStream 批處理的能力,進而統一流批技術棧,屆時 DataSet API 也将退出曆史的舞台。

▼ Apache Flink 社群推薦 ▼

Apache Flink 及大資料領域頂級盛會 Flink Forward Asia 2019 重磅開啟,目前正在征集議題,限量早鳥票優惠ing。了解 Flink Forward Asia 2019 的更多資訊,請檢視:

https://developer.aliyun.com/special/ffa2019

首屆 Apache Flink 極客挑戰賽重磅開啟,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點選:

https://tianchi.aliyun.com/markets/tianchi/flink2019