導讀:随着 Flink 在流式計算的應用場景逐漸成熟和流行,如果 Flink 能同時把批量計算的應用場景處理好,就能減少使用者在使用 Flink 時開發和維護的成本,并且能夠豐富 Flink 的生态。SQL 是批計算中比較常用的工具,是以 Flink 針對于批計算也以 SQL 為主要接口。本次分享主要介紹 Flink 對批處理的設計與 Hive 的內建。主要分為下面三點展開:
- 設計架構
- 項目進展
- 性能測試
首先和大家分享一下 Flink 批處理的設計架構。
1. 背景

Flink 提升批處理的主要原因是為了減少客戶的維護成本和更新成本和更好的完善 Flink 生态環境。SQL 是批計算場景中一個非常重要的工具,是以希望以 SQL 作為在批計算場景的主要接口,為此我們着重優化了 Flink SQL 的功能。目前 Flink SQL 主要有下面幾點需要優化:
- 需要完整的中繼資料管理體制。
- 缺少對 DDL(資料定義語言 DDL 用來建立資料庫中的各種對象,如表、視圖、索引、同義詞、聚簇等)的支援。
- 與外部系統進行對接不是很友善,尤其是 Hive, 因為 Hive 是大資料領域最早的 SQL 引擎,是以 Hive 的使用者基礎非常廣泛,新的一些 SQL 工具,如 Spark SQL、Impala 都提供了與 Hive 對接的功能,這樣使用者才能更好地将其應用從 Hive 遷移過來,是以與 Hive 對接對 Flink SQL 而言也十分重要。
2. 目标
是以我們要完成以下目标:
- 定義統一的 Catalog 接口,這個是 Flink SQL 更友善與外部對接的前提條件。如果大家用過 Flink 的 TableSource 和 TableSink 來對接外部的系統的表,會發現不管是通過寫程式還是配置 yaml 檔案會跟傳統的 SQL 使用方式會有些不同。是以我們肯定不希望 Hive 的使用者遷移 Flink SQL 需要通過定義 TableSouces 和 TableSink 的方式來與 Hive 進行互動。是以我們提供了一套新的 Catalog 接口以一種更接近傳統 SQL 的方式與 Hive 進行互動。
- 提供基于記憶體和可持久化的實作。基于記憶體就是 Flink 原有的方式,使用者所有的中繼資料的生命周期是跟他的 Session(會話)綁定的,Session(會話)結束之後所有的中繼資料都沒有了。因為要跟 Hive 互動是以肯定還要提供一個持久化的 Catalog。
- 支援 Hive 的互操作。有了 Catalog 之後使用者就可以通過 Catalog 通路 Hive 的中繼資料,提供 Data Connector 讓使用者能通過 Flink 讀寫 Hive 的實際資料,實作 Flink 與 Hive 的互動。
- 支援 Flink 作為 Hive 的計算引擎(長期目标),像 Hive On Spark,Hive On Tez。
3. 全新設計的 Catalog API(FlIP-30)
使用者通過 SQL Client 或者 Table API 送出請求,Flink 會建立 TableEnvironment, TableEnvironment 會建立 CatalogManager 加載并配置 Catalog 執行個體,并且 Catalog 支援多種中繼資料類型 table、database、function、view、partition 等,在 1.9.0 的版本當中 Catalog 會有兩個實作:
- 一個是基于記憶體的 GenericinMemoryCatalog。
- 另一是 HiveCatalog,HiveCatalog 通過 HiveShim 與 Hive Metasotre 互動來操作 Hive 中繼資料,HiveShim 的作用是處理 Hive 在大版本中 Hive Metastore 不相容的問題。
從這種實作的方式可以看出,使用者可以建立多個 Catalog,也可以通路多個 Hive Metastore,來達到跨 Catalog 查詢的操作。
4. 讀寫 Hive 資料
有了中繼資料之後我們就可以實作 Flink SQL 的 Data Connector 來真正的讀寫 Hive 實際資料。Flink SQL 寫入的資料必須要相容 Hive 的資料格式,也就是 Hive 可以正常讀取 Flink 寫入的資料,反過來也是一樣的。為了實作這一點我們大量複用 Hive 原有的 Input/Output Format、SerDe 等 API,一是為了減少代碼備援,二是盡可能的保持相容性。
在 Data Connect 中讀取 Hive 表資料具體實作類為:HiveTableSource、HiveTableInputFormat。寫 Hive 表的具體實作類為:HiveTableSink、HiveTableOutputFormat。
其次和大家分享 Flink 1.9.0 的現狀和 1.10.0 中的新特性還有未來工作。
1. Flink 1.9.0 的現狀
Flink SQL 作為 1.9.0 版本中作為試用功能釋出的,它的功能還不是很完善:
- 支援的資料類型還不全。(1.9.0 中帶參數的資料類型基本上都不支援:如 DECIMAL,CHAR 等)
- 對分區表的支援不完善,隻能讀取分區表,不能寫分區表。
- 不支援表的 INSERT OVERWRITE。
2. Flink 1.10.0 中的新特性
Flink SQL 在 1.10.0 版本裡我們做了比較多的進一步開發,與 Hive 內建的功能更加完整。
- 支援讀寫靜态分區和動态分區表。
- 在表級别和分區級别都支援 INSERT OVERWRITE。
- 支援了更多地資料類型。(除 UNION 類型都支援)
- 支援更多地 DDL。(CREATE TABLE/DATABASE)
- 支援在 Flink 中調用 Hive 的内置函數。(Hive 大約 200 多個内置函數)
- 支援了更多的 Hive 版本。(Hive 的 1.0.0~3.1.1)
- 做了很多性能優化如,Project/Predicate Pushdown,向量的讀取 ORC 資料等。
3. Module 接口
為了能讓使用者調用 Flink SQL 中調用 Hive 的内置函數,我們在 Flink 1.10 當中引入了一個 Module 接口。這個 Module 是為了讓使用者能夠友善的把外部系統的内置函數接入到系統當中。
- 使用方式和 Catalog 類似,使用者可以通過 Table API 或 Yaml 檔案來配置 Module。
- Module 可以同時加載多個,Flink 解析函數的時候通過 Module 的加載順序在多個 Module 中查找函數的解析。也就是如果兩個 Module 包含名字相同的 Function,先加載的 Module 會提供 Function 的定義。
- 目前 Module 有兩個實作,CoreModule 提供了 Flink 原生的内置函數,HiveModule 提供了 Hive 的内置函數。
4. 未來工作
未來的工作主要是先做功能的補全,其中包括:
- View 的支援(有可能在 1.11 中完成)。
- 持續改進 SQL CLI 的易用性,現在支援翻頁顯示查詢結果,後續支援滾動顯示。并支援 Hive 的 -e -f 這種非互動式的使用方式。
- 支援所有的 Hive 常用 DDL,例如 CREATE TABLE AS。
- 相容 Hive 的文法,讓原來在 Hive 上的工程在 Flink 的順滑的遷移過來。
- 支援 SQL CLI 的遠端模式,類似 HiveServer2 的遠端連接配接模式。
- 支援流式的寫入 Hive 資料。
下面是 Flink 在批處理作業下與 HiveMR 對比測試的測試環境和結果。
1. 測試環境
首先我們的測試環境使用了 21 個節點的實體機群,一個 Master 節點和 20 個 Slave 節點。節點的硬體配置是 32 核,64 個線程,256 記憶體,網絡做了端口聚合,每個機器是 12 塊的 HDD 硬碟。
2. 測試工具
測試工具使用了 Hortonworks 的 hive-testbench,github 中一個開源的工具。我們使用這個工具生成了 10TB 的 TPC-DS 測試資料集,然後分别通過 Flink SQL 和 Hive 對該資料集進行 TPC-DS 的測試。
一方面我們對比了 Flink 和 Hive 的性能,另一方面我們驗證了 Flink SQL 能夠很好的通路 Hive 的資料。測試用到了 Hive 版本是 3.1.1,Flink 用到的是 Master 分支代碼。
3. 測試結果
測試結果 Flink SQL 對比 Hive On MapReduce 取得了大約 7 倍的性能提升。這得益于 Flink SQL 所做的一系列優化,比如在排程方面的優化,以及執行計劃的優化等。總體來說如果用的是 Hive On MapReduce,遷移到 Flink SQL 會有很大性能的提升。
附最新性能對比詳情及思路解析:
Flink 1.10 和 Hive 3.0 性能對比作者介紹:
李銳(天離),阿裡巴巴技術專家,Apache Hive PMC 成員,加入阿裡巴巴之前曾就職于 Intel、IBM 等公司,主要參與 Hive、HDFS、Spark 等開源項目。
王剛(喬燃),阿裡巴巴進階開發工程師, Flink Contributor。浙大計算機系畢業後曾任職于蘑菇街資料平台,從事資料交換系統開發。目前在阿裡主要專注在 Flink 與 Hive 生态建設。