天天看點

深度解讀 Flink 1.11:流批一體 Hive 數倉如何了解更多 Flink 1.11 新版功能特性?

作者:李勁松、李銳

Flink 1.11 features 已經當機,流批一體在新版中是濃墨重彩的一筆,在此提前對 Flink 1.11 中流批一體方面的改善進行深度解讀,大家可期待正式版本的釋出。

首先恭喜 Table/SQL 的 blink planner 成為預設 Planner,撒花、撒花。

Flink 1.11 中流計算結合 Hive 批處理數倉,給離線數倉帶來 Flink 流處理實時且 Exactly-once 的能力。另外,Flink 1.11 完善了 Flink 自身的 Filesystem connector,大大提高了 Flink 的易用性。

數倉架構

離線數倉

深度解讀 Flink 1.11:流批一體 Hive 數倉如何了解更多 Flink 1.11 新版功能特性?

傳統的離線數倉是由 Hive 加上 HDFS 的方案,Hive 數倉有着成熟和穩定的大資料分析能力,結合排程和上下遊工具,建構一個完整的資料處理分析平台,流程如下:

  • Flume 把資料導入 Hive 數倉
  • 排程工具,排程 ETL 作業進行資料處理
  • 在 Hive 數倉的表上,可以進行靈活的 Ad-hoc 查詢
  • 排程工具,排程聚合作業輸出到BI層的資料庫中

這個流程下的問題是:

  • 導入過程不夠靈活,這應該是一個靈活 SQL 流計算的過程
  • 基于排程作業的級聯計算,實時性太差
  • ETL 不能有流式的增量計算

實時數倉

針對離線數倉的特點,随着實時計算的流行,越來越多的公司引入實時數倉,實時數倉基于 Kafka + Flink streaming,定義全流程的流計算作業,有着秒級甚至毫秒的實時性。

但是,實時數倉的一個問題是曆史資料隻有 3-15 天,無法在其上做 Ad-hoc 的查詢。如果搭建 Lambda 的離線+實時的架構,維護成本、計算存儲成本、一緻性保證、重複的開發會帶來很大的負擔。

Hive 實時化

Flink 1.11 為解決離線數倉的問題,給 Hive 數倉帶來了實時化的能力,加強各環節的實時性的同時,又不會給架構造成太大的負擔。

Hive streaming sink

實時資料導入 Hive 數倉,你是怎麼做的?Flume、Spark Streaming 還是 Flink Datastream?千呼萬喚,Table / SQL 層的 streaming file sink 來啦,Flink 1.11 支援 Filesystem connector [1] 和 Hive connector 的 streaming sink [2]。

深度解讀 Flink 1.11:流批一體 Hive 數倉如何了解更多 Flink 1.11 新版功能特性?

(注:圖中 StreamingFileSink 的 Bucket 概念就是 Table/SQL 中的 Partition)

Table/SQL 層的 streaming sink 不僅:

  • 帶來 Flink streaming 的實時/準實時的能力
  • 支援 Filesystem connector 的全部 formats(csv,json,avro,parquet,orc)
  • 支援 Hive table 的所有 formats
  • 繼承 Datastream StreamingFileSink 的所有特性:Exactly-once、支援HDFS, S3

而且引入了新的機制:Partition commit。

一個合理的數倉的資料導入,它不止包含資料檔案的寫入,也包含了 Partition 的可見性送出。當某個 Partition 完成寫入時,需要通知 Hive metastore 或者在檔案夾内添加 SUCCESS 檔案。Flink 1.11 的 Partition commit 機制可以讓你:

  • Trigger:控制Partition送出的時機,可以根據Watermark加上從Partition中提取的時間來判斷,也可以通過Processing time來判斷。你可以控制:是想先盡快看到沒寫完的Partition;還是保證寫完Partition之後,再讓下遊看到它。
  • Policy:送出政策,内置支援SUCCESS檔案和Metastore的送出,你也可以擴充送出的實作,比如在送出階段觸發Hive的analysis來生成統計資訊,或者進行小檔案的合并等等。

一個例子:

-- 結合Hive dialect使用Hive DDL文法
SET table.sql-dialect=hive;
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (
  dt STRING,
  hour STRING
) STORED AS PARQUET TBLPROPERTIES (
  -- 使用partition中抽取時間,加上watermark決定partiton commit的時機
  'sink.partition-commit.trigger'='partition-time',
  -- 配置hour級别的partition時間抽取政策,這個例子中dt字段是yyyy-MM-dd格式的天,hour是0-23的小時,timestamp-pattern定義了如何從這兩個partition字段推出完整的timestamp
  'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
  -- 配置dalay為小時級,當 watermark > partition時間 + 1小時,會commit這個partition
  'sink.partition-commit.delay'='1 h',
  -- partitiion commit的政策是:先更新metastore(addPartition),再寫SUCCESS檔案
  'sink.partition-commit.policy.kind’='metastore,success-file'
)
 
SET table.sql-dialect=default;
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
)
 
-- 可以結合Table Hints動态指定table properties [3]
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;           

Hive streaming source

Hive 數倉中存在大量的 ETL 任務,這些任務往往是通過排程工具來周期性的運作,這樣做主要有兩個問題:

  1. 實時性不強,往往排程最小是小時級。
  2. 流程複雜,元件多,容易出現問題。

針對這些離線的 ETL 作業,Flink 1.11 為此開發了實時化的 Hive 流讀,支援:

  • Partition 表,監控 Partition 的生成,增量讀取新的 Partition。
  • 非 Partition 表,監控檔案夾内新檔案的生成,增量讀取新的檔案。

你甚至可以使用10分鐘級别的分區政策,使用 Flink 的 Hive streaming source 和Hive streaming sink 可以大大提高 Hive 數倉的實時性到準實時分鐘級 4,在實時化的同時,也支援針對 Table 全量的 Ad-hoc 查詢,提高靈活性。

SELECT * FROM hive_table
/*+ OPTIONS('streaming-source.enable'=’true’,
'streaming-source.consume-start-offset'='2020-05-20') */;           

實時資料關聯 Hive 表

在 Flink 與 Hive 內建的功能釋出以後,我們收到最多的使用者回報之一就是希望能夠将 Flink 的實時資料與離線的 Hive 表進行關聯。是以,在 Flink 1.11 中,我們支援将實時表與 Hive 表進行 temporal join [6]。沿用 Flink 官方文檔中的例子,假定 Orders 是實時表,而 LatestRates 是一張 Hive 表,使用者可以通過以下語句進行temporal join:

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency           

與 Hive 表進行 temporal join 目前隻支援 processing time,我們會把 Hive 表的資料緩存到記憶體中,并按照固定的時間間隔去更新緩存的資料。使用者可以通過參數“lookup.join.cache.ttl” 來控制緩存更新的間隔,預設間隔為一個小時。

“lookup.join.cache.ttl” 需要配置到 Hive 表的 property 當中,是以每張表可以有不同的配置。另外,由于需要将整張 Hive 表加載到記憶體中,是以目前隻适用于 Hive 表較小的場景。

Hive 增強

Hive Dialect 文法相容

Flink on Hive 使用者并不能很好的使用 DDL,主要是因為:

  • Flink 1.10 中進一步完善了 DDL,但由于 Flink 與 Hive 在中繼資料語義上的差異,通過 Flink DDL 來操作 Hive 中繼資料的可用性比較差,僅能覆寫很少的應用場景。
  • 使用 Flink 對接 Hive 的使用者經常需要切換到 Hive CLI 來執行 DDL。

針對上述兩個問題,我們提出了 FLIP-123 [7],通過 Hive Dialect 為使用者提供 Hive文法相容。該功能的最終目标,是為使用者提供近似 Hive CLI/Beeline 的使用體驗,讓使用者無需在 Flink 和 Hive 的 CLI 之間進行切換,甚至可以直接遷移部分 Hive 腳本到 Flink 中執行。

在 Flink 1.11中,Hive Dialect 可以支援大部分常用的 DDL,比如 CREATE/ALTER TABLE、CHANGE/REPLACE COLUMN、ADD/DROP PARTITION 等等。為此,我們為 Hive Dialect 實作了一個獨立的 parser,Flink 會根據使用者指定的 Dialect 決定使用哪個 parser 來解析 SQL 語句。使用者可以通過配置項“ table.sql-dialect ” 來指定使用的 SQL Dialect。它的預設值為 “default”,即 Flink 原生的 Dialect,而将其設定為 “hive” 時就開啟了 Hive Dialect。對于 SQL 使用者,可以在 yaml 檔案中設定“table.sql-dialect” 來指定 session 的初始 Dialect,也可以通過 set 指令來動态調整需要使用的 Dialect,而無需重新開機 session。

Hive Dialect 目前所支援的具體功能可以參考 FLIP-123 或 Flink 的官方文檔。另外,該功能的一些設計原則和使用注意事項如下:

  1. Hive Dialect 隻能用于操作 Hive 表,而不是 Flink 原生的表(如 Kafka、ES 的表),這也意味着 Hive Dialect 需要配合 HiveCatalog 使用。
  2. 使用 Hive Dialect 時,原有的 Flink 的一些文法可能會無法使用(例如 Flink 定義的類型别名),在需要使用 Flink 文法時可以動态切換到預設的 Dialect。
  3. Hive Dialect 的 DDL 文法定義基于 Hive 的官方文檔,而不同 Hive 版本之間文法可能會有輕微的差異,需要使用者進行一定的調整。
  4. Hive Dialect 的文法實作基于 Calcite,而 Calcite 與 Hive 有不同的保留關鍵字。是以,某些在 Hive 中可以直接作為辨別符的關鍵字(如 “default” ),在Hive Dialect 中可能需要用“`”進行轉義。

向量化讀取

Flink 1.10中,Flink 已經支援了 ORC (Hive 2+) 的向量化讀取支援,但是這很局限,為此,Flink 1.11 增加了更多的向量化支援:

  • ORC for Hive 1.x [8]
  • Parquet for Hive 1,2,3 [9]

也就是說已經補全了所有版本的 Parquet 和 ORC 向量化支援,預設是開啟的,提供開關。

簡化 Hive 依賴

Flink 1.10 中,Flink 文檔中列出了所需的 Hive 相關依賴,推薦使用者自行下載下傳。但是這仍然稍顯麻煩,是以在1.11 中,Flink 提供了内置的依賴支援 [10]:

  • flink-sql-connector-hive-1.2.2_2.11-1.11.jar:Hive 1 的依賴版本。
  • flink-sql-connector-hive-2.2.0_2.11-1.11.jar:Hive 2.0 - 2.2 的依賴版本。
  • flink-sql-connector-hive-2.3.6_2.11-1.11.jar:Hive 2.3 的依賴版本。
  • flink-sql-connector-hive-3.1.2_2.11-1.11.jar:Hive 3 的依賴版本。

現在,你隻需要單獨下一個包,再搞定 HADOOP_CLASSPATH,即可運作 Flink on Hive。

Flink 增強

除了 Hive 相關的 features,Flink 1.11 也完成了大量其它關于流批一體的增強。

Flink Filesystem connector

Flink table 在長久以來隻支援一個 csv 的 file system table,而且它還不支援Partition,行為上在某些方面也有些不符合大資料計算的直覺。

在 Flink 1.11,重構了整個 Filesystem connector 的實作 [1]:

  • 結合 Partition,現在,Filesystem connector 支援 SQL 中 Partition 的所有語義,支援 Partition 的 DDL,支援 Partition Pruning,支援靜态/動态 Partition 的插入,支援 overwrite 的插入。
  • 支援各種 Formats:
    • CSV
    • JSON
    • Aparch AVRO
    • Apache Parquet
    • Apache ORC.
  • 支援 Batch 的讀寫。
  • 支援 Streaming sink,也支援上述 Hive 支援的 Partition commit,支援寫Success 檔案。

例子:

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hour STRING
) PARTITIONED BY (dt, hour) WITH (
  ’connector’=’filesystem’,
  ’path’=’...’,
  ’format’=’parquet’,
  'partition.time-extractor.timestamp-pattern'=’$dt $hour:00:00’,
  'sink.partition-commit.delay'='1 h',
  ‘sink.partition-commit.policy.kind’='success-file')
)

-- stream environment or batch environment
INSERT INTO TABLE fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- 通過 Partition 查詢
SELECT * FROM fs_table WHERE dt=’2020-05-20’ and hour=’12’;           

引入 Max Slot

Yarn perJob 或者 session 模式在 1.11 之前是無限擴張的,沒有辦法限制它的資源使用,隻能用 Yarn queue 等方式來限制。但是傳統的批作業其實都是大并發,運作在局限的資源上,一部分一部分階段性的運作,為此,Flink 1.11 引入 Max Slot 的配置[11],限制 Yarn application 的資源使用。

slotmanager.number-of-slots.max           

定義 Flink 叢集配置設定的最大 Slot 數。此配置選項用于限制批處理工作負載的資源消耗。不建議為流作業配置此選項,如果沒有足夠的 Slot,則流作業可能會失敗。

結 語

Flink 1.11 也是一個大版本,社群做了大量的 Features 和 Improvements,Flink 的大目标是幫助業務建構流批一體的數倉,提供完善、順滑、高性能的一體式數倉。希望大家多多參與社群,積極回報問題和想法,甚至參與社群的讨論和開發,一起把 Flink 做得越來越好!

參考資料:

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

[2]

https://issues.apache.org/jira/browse/FLINK-14255

[3]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL

[4]

https://issues.apache.org/jira/browse/FLINK-17434

[5]

https://issues.apache.org/jira/browse/FLINK-17435

[6]

https://issues.apache.org/jira/browse/FLINK-17387

[7]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-123%3A+DDL+and+DML+compatibility+for+Hive+connector

[8]

https://issues.apache.org/jira/browse/FLINK-14802

[9]

https://issues.apache.org/jira/browse/FLINK-16450

[10]

https://issues.apache.org/jira/browse/FLINK-16455

[11]

https://issues.apache.org/jira/browse/FLINK-16605

作者介紹:

李勁松(之信),Apache Flink Committer, 阿裡巴巴技術專家,長期專注于流批一體的計算與數倉架構。

李銳(天離),Apache Hive PMC,阿裡巴巴技術專家,加入阿裡巴巴之前曾就職于 Intel、IBM 等公司,主要參與 Hive、HDFS、Spark 等開源項目。

如何了解更多 Flink 1.11 新版功能特性?

機會來了 !

6月14日,阿裡巴巴計算平台事業部與阿裡雲開發者社群共同舉辦的大資料+AI Meetup 系列第一季即将重磅開啟,此次 Meetup 邀請了來自阿裡巴巴、Databricks、快手、網易雲音樂的7位技術專家,集中解讀大資料目前熱門話題!

其中,Apache Flink Committer,阿裡巴巴技術專家李勁松(之信)将現場分享《Flink 1.11 Table&SQL 深度解讀》,還有快手春晚項目的獨家實踐、網易雲音樂 Flink + Kafka 的生産落地等。點選「閱讀原文」即可預約報名~

▼ 活動亮點 ▼

超豪華嘉賓陣容!多位資深技術專家線上分享對行業趨勢的洞察!

極豐富幹貨分享!集結大資料熱門議題,一次看完:資料處理、數倉、資料湖、AI 等技術實踐與生産應用落地。

多種獎品拿到手軟!直播間已準備超多精美禮品,現場送送送!預約直播并參與互動即有機會領走哦。

點選連結即可預約報名:

https://developer.aliyun.com/live/2894
深度解讀 Flink 1.11:流批一體 Hive 數倉如何了解更多 Flink 1.11 新版功能特性?

獎品詳情:

深度解讀 Flink 1.11:流批一體 Hive 數倉如何了解更多 Flink 1.11 新版功能特性?