天天看點

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式釋出啦!

來源 | Apache Flink 官方部落格

翻譯 | 高赟(雲骞)

Apache Flink 社群很榮幸的宣布 Flink 1.11.0 版本正式釋出!超過 200 名貢獻者參與了 Flink 1.11.0 的開發,送出了超過 1300 個修複或優化。這些修改極大的提高了 Flink 的可用性,并且增強了各個 API 棧的功能。其中一些比較重要的修改包括:

  1. 核心引擎部分引入了非對齊的 Checkpoint 機制。這一機制是對 Flink 容錯機制的一個重要改進,它可以提高嚴重反壓作業的 Checkpoint 速度。
  2. 實作了一套新的 Source 接口。通過統一流和批作業 Source 的運作機制,提供常用的内部實作如事件時間處理,watermark 生成和空閑并發檢測,這套新的 Source 接口可以極大的降低實作新的 Source 時的開發複雜度。
  3. Flink SQL 引入了對 CDC(Change Data Capture,變動資料捕獲)的支援,它使 Flink 可以友善的通過像 Debezium 這類工具來翻譯和消費資料庫的變動日志。Table API 和 SQL 也擴充了檔案系統連接配接器對更多使用者場景和格式的支援,進而可以支援将流式資料從 Kafka 寫入 Hive 等場景。
  4. PyFlink 優化了多個部分的性能,包括對向量化的使用者自定義函數(Python UDF)的支援。這些改動使 Flink Python 接口可以與常用的 Python 庫(如 Pandas 和 NumPy)進行互操作,進而使 Flink 更适合資料處理與機器學習的場景。

Flink 1.11.0 的二進制釋出包和源代碼可以在 Flink 官網的下載下傳頁面獲得,對應的 PyFlink 釋出包可以在 PyPI 網站下載下傳。詳情可以參閱釋出說明,釋出功能更新與更新後的文檔。

我們希望您下載下傳試用這一版本後,可以通過 Flink 郵件清單和 JIRA 網站和我們分享您的回報意見。

▼ GitHub 下載下傳位址 ▼

https://flink.apache.org/downloads.html#apache-flink-1110

新的功能和優化

非對齊的 Checkpoints(Beta 版本)

當 Flink 發起一次 Checkpoint 時, Checkpoint Barrier 會從整個拓撲的 Source 出發一直流動到 Sink。對于超過一個輸入的算子,來自各個輸入的 Barrier 首先需要對齊,然後這個算子才能進行 state 的快照操作以及将 Barrier 釋出給後續的算子。一般情況下對齊可以在幾毫秒内完成,但是當反壓時,對齊可能成為一個瓶頸:

  1. Checkpoint Barrier 在有反壓的輸入通道中傳播的速度非常慢(需要等待前面的資料處理完成),這将會阻塞對其它輸入通道的資料處理并最終進一步反壓上遊的算子。
  2. Checkpoint Barrier 傳播慢還會導緻 Checkpoint 時間過長甚至逾時,在最壞的情況下,這可能導緻整個作業進度無法更新。

為了提高 Checkpoint 在反壓情況下的性能,Flink 社群在 1.11.0 版本中初步實作了非對齊的 Checkpoint 機制(FLIP-76)。與對齊的 Checkpoint(圖1)相比,這種方式下算子不需要等待來自各個輸入通道的 Barrier 對齊,相反,這種方式允許 Barrier 越過前面的待處理的資料(即在輸出和輸入 Buffer 中的資料)并且直接觸發 Checkpoint 的同步階段。這一過程如圖2所示。

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式釋出啦!

圖1. 對齊的Checkpoint

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式釋出啦!

圖2. 非對齊的Checkpoint

由于被越過的傳播中的資料必須作為快照的一部分被持久化,非對齊的 Checkpoint 機制會增加 Checkpoint 的大小。但是,好的方面是它可以極大的減少 Checkpoint 需要的時間,是以即使在非穩定的環境中,使用者也可以看到更多的作業進度。這是由于非對齊的 Checkpoint 可以減少 Recovery 的負載。關于非對齊的 Checkpoint 更詳細的資訊以及未來的開發計劃,可以分别參考相關文檔和 FLINK-14551。

和其它 Beta 版本的特性一樣,我們非常期待和感謝您試用之後和社群分享您的感受。

注意:開啟這一特征需要通過 Chekpoint 選項配置 enableUnalignedCheckpoints 參數。需要注意的是,非對齊的 Checkpoint 隻有在 CheckpointMode 被設定為 CheckpointMode.EXACTLY_ONCE 的時候才有效。

統一的 Watermark 生成器

目前 Flink 的 Watermark 生成(也叫做配置設定)依賴于兩個接口:AssignerWithPunctuatedWatermarks 與 AssignerWithPeriodicWatermarks,這兩個接口與記錄時間戳提取的關系也比較混亂,進而使 Flink 難以實作一些使用者急需的功能,如支援空閑檢測;此外,這還會導緻代碼重複且難以維護。通過 FLIP-126,現有的 watermark 生成接口被統一為一個單獨的接口,即 WatermarkGenerator,并且它和 TimestampAssigner 獨立。

這一修改使使用者可以更好的控制 watermark 的發送邏輯,并且簡化實作支援watermark 生成和時間戳提取的 Source 的難度(可以參考新的 Source 接口)。基于這一接口,Flink 1.11 中還提供了許多内置的 Watermark 生成政策(例如 forBoundedOutOfOrderness, forMonotonousTimestamps),并且使用者可以使用自己的實作。

■ 支援 Watermark 空閑檢測

WatermarkStrategy.withIdleness()方法允許使用者在配置的時間内(即逾時時間内)沒有記錄到達時将一個流标記為空閑,進而進一步支援 Flink 正确處理多個并發之間的事件時間傾斜的問題,并且避免了空閑的并發延遲整個系統的事件時間。通過将 Kafka 連接配接器遷移至新的接口(FLINK-17669),使用者可以受益于針對單個并發的空閑檢測。

注意:這一 FLIP 的修改目前不會影響現有程式,但是我們推薦使用者後續盡量使用新的 Watermark 生成接口,避免後續版本禁用之前的 Watermark 生成器帶來的影響。

新的 Source 接口(Beta)

1.11 以編寫一個生産可用的 Flink Source 連接配接器并不是一個簡單的任務,它需要使用者對 Flink 内部實作有一定的了解,并且需要在連接配接器中自行實作事件時間提取、Watermark 生成和空閑檢測等功能。針對這一問題,Flink 1.11 引入了一套新的Source 接口 FLIP-27 來解決上述問題,并且同時解決了需要為批作業和流作業編寫兩套 Source 實作的問題。

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式釋出啦!

通過将分區發現和實作消費每一個分區的資料分成不同的元件(即 SplitEnumerator 和 SourceReader),新的 Source 接口允許将不同的分區發現政策和分區消費的具體實作任意組合。

例如,現有的 Kafka 連接配接器提供了多種不同的分區發現政策,這些政策的實作和其實代碼的實作耦合在一起。如果遷移到新的接口,Kafka Source 将可以使用相同的分區消費的實作(即 SourceReader),并且針對不同的分區發現政策編寫單獨的 SplitEnumerator 的實作。

■ 流批統一

使用新版 Source 接口的 Source 連接配接器将可以同時用于有限資料(批)作業和無限資料(流)作業。這兩種場景僅有一個很小的差別:在有限資料的情況下,分區發現政策将傳回一個固定大小的分區并且每一個分區的資料都是有限的;在無限資料的情況下,要麼每個分區的資料量是無限的,要麼分區發現政策可以不斷的産生新的分區。

■ 内置的 Watermark 和事務時間處理

在新版 Source 接口中,TimestampAssigner 和 WatermarkGenerator 将透明的作為分區消費具體實作(SourceReader)的一部分,是以使用者不需要實作任何時間戳提取和 Watermark 生成的代碼。

注意:現有的 Source 連接配接器尚未基于新的 Source 接口重新實作,這将在後續版本中逐漸完成。如果想要基于新的 Source 接口實作自己的 Source,可以參考 Data Source 文檔和 Source 開發的一些建議。

Application 部署模式

在1.11之前,Flink 的作業有兩種部署模式,其中 Session 模式是将作業送出到一個長期運作的 Flink Session 叢集,Job 模式是為每個作業啟動一個專門的 Flink 作業叢集。這兩種模式下使用者作業的 main 方法都是用戶端執行的,但是這種方式存在一定的問題:如果用戶端是更大程式的一部分的話,生成 JobGraph 容易成為系統的瓶頸;其次,這種方式也不能很好的适應像 Docker 和 K8s 這樣的容器環境。

Flink 1.11 引入了一種新的部署模式,即 Application 模式(FLIP-85)。這種模式下使用者程式的 main 方法将在叢集中而不是用戶端運作。這樣,作業送出就會變得非常簡單:使用者将程式邏輯和依賴打包進一人可執行的 jar 包裡,叢集的入口程式(ApplicationClusterEntryPoint)負責調用其中的 main 方法來生成 JobGraph。

Flink 1.11 已經可以支援基于 K8s 的 Application 模式(FLINK-10934)。

其它功能修改

■ 統一 JM 的記憶體配置(FLIP-116)

在1.10中,Flink 統一了 TM 端的記憶體管理和配置,相應的在1.11中,Flink 進一步對JM 端的記憶體配置進行了修改,使它的選項和配置方式與 FLIP-49 中引入的 TM 端的配置方式保持一緻。這一修改影響所有的部署類型,包括 standalone,Yarn,Mesos 和新引入的 K8s。

注意:複用之前的 Flink 配置将會得到不同的 JVM 參數,進而可能影響性能甚至導緻異常。如果想要更新到 1.11 的話,請一定要參考遷移文檔。

■ Web UI 功能增強

在1.11中,社群對 Flink Web UI 進行了一系列的優化。首要的修改是優化了 TM 和 JM 的日志展示(FLIP-103),其次,Flink Web UI 還引入了列印所有線程清單的工具(FLINK-14816)。在後續的版本中,Web UI 還将進一步優化,包括更好的反壓檢測,更靈活和可配置的異常展示以及對 Task 出錯曆史的展示。

■ 統一 Docker 鏡像

1.11 将所有 Docker 相關的資源都統一整理到了 apache/flink-docker項目中,并且擴充了入口腳本進而允許使用者在不同模式下使用預設的 docker 鏡像,避免了許多情況下使用者自己建立鏡像的麻煩。關于如何在不同環境和部署模式下使用和定制 Flink 官方 Docker 鏡像,請參考詳細文檔。

Table API/SQL:支援 CDC(Change Data Capture)

CDC 是資料庫中一種常用的模式,它捕獲資料庫送出的修改并且将這些修改廣播給其它的下遊消費者。CDC 可以用于像同步多個資料存儲和避免雙寫導緻的問題等場景。長期以來 Flink 的使用者都希望能夠将 CDC 資料通過 Table API/SQL 導入到作業中,而 Flink 1.11 實作了這一點。

為了能夠在 Table API / SQL 中使用 CDC,Flink 1.11 更新了 Table Source 與 Sink 的接口來支援 changelog 模式(參考新的 Table Source 與 Sink 接口)并且支援了 Debezium 與 Canal 格式(FLIP-105)。這一改動使動态 Table Source 不再隻支援 append-only 的操作,而且可以導入外部的修改日志(插入事件)将它們翻譯為對應的修改操作(插入,修改和删除)并将這些操作與操作的類型發送到後續的流中。

官宣 | 千呼萬喚,Apache Flink 1.11.0 正式釋出啦!

為了消費 CDC 資料,使用者需要在使用 SQL DDL 建立表時指指定“format=debezium-json“或者“format=canal-json”:

CREATE TABLE my_table (
  ...
) WITH (
  'connector'='...', -- e.g. 'kafka'
  'format'='debezium-json',
  'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
  'debezium-json.ignore-parse-errors'='true' -- default: false
);           

Flink 1.11 僅支援 Kafka 作為修改日志的資料源以及 JSON 編碼格式的修改日志;後續 Flink 将進一步支援 Avro(Debezium)和 Protobuf(Canal)格式。Flink 還計劃在未來支援 UDF MySQL 的 Binlog 以及 Kafka 的 Compact Topic 作為資料源,并且将對修改日志的支援擴充到批作業。

注意:目前有一個已知的 BUG(FLINK-18461)會導緻使用修改日志的 Source 無法寫入到 Upsert Sink 中(例如,MySQL,HBase,ElasticSearch)。這個問題會在下一個版本(即 1.11.1)中修複。

Table API/SQL:支援 JDBC Catalog 和 Postgres Catalog

Flink 1.11 支援了一種通用的 JDBC Catalog 接口(FLIP-93),這一接口允許 Table API/SQL 的使用者自動的從通過 JDBC 連接配接的關系資料庫中導出表結構。這一功能避免了之前使用者需要手動複制表結構以及進行類型映射的麻煩,并且允許 Flink 在編譯時而不是運作時對表結構進行檢查。

首先在1.11中實作的是 Postgres Catalog。

Table API/SQL:支援 Avro,ORC 和 Parquet 格式的檔案系統連接配接器

為了提高使用者使用 Flink 進行端到端的流式 ETL 的體驗,Flink 1.11 在 Table API/SQL 中引入了新的檔案系統連接配接器。它基于 Flink 自己的檔案系統抽象和 StreamingFileSink 來實作,進而保證和 DataStream API 有相同的能力和一緻的行為。

這也意味着 Table API/SQL 的使用者可以使用 StreamingFileSink 現在已經支援的檔案格式,例如 (Avro) Parquet,以及在這1.11中新增加的檔案格式,例如 Avro 和 ORC。

CREATE TABLE my_table (
  column_name1 INT,
  column_name2 STRING,
  ...
  part_name1 INT,
  part_name2 STRING
) PARTITIONED BY (part_name1, part_name2) WITH (
  'connector' = 'filesystem',         
  'path' = 'file:///path/to/file,
  'format' = '...',  -- supported formats: Avro, ORC, Parquet, CSV, JSON         
  ...
);           

新的全能的檔案系統連接配接器可以透明的支援流作業和批作業,提供 Exactly-once 語義并且提供了完整的分區的支援,進而相對于之前的 Connector 極大的擴充了可以支援的場景。例如,使用者可以容易的實作将流式資料從 Kafka 寫入 Hive 的場景。

後續的檔案系統連接配接器的優化可以參考 FLINK-17778。

Table API/SQL:支援 Python UDF

在1.11之前 Table API/SQL 的使用者隻能通過 Java 或 Scala 來實作 UDF。在1.11中,Flink 擴充了 Python 語言的應用範圍,除了 PyFlink 外,Flink 1.11 還在 SQL DDL 文法(FLIP-106)和 SQL Client(FLIP-114)中支援了 Python UDF。使用者還可以在系統 Catalog 中通過 SQL DDL 或者 Java Catalog API 來注冊 Python UDF,這樣這些 UDF 可以在作業中共享。

其它的 Table API/SQL 優化

■ Hive Connect 相容 Hive DDL 和 DML(FLIP-123)

從1.11開始,使用者可以在 Table API/SQL 和 SQL Client 中使用 Hive 文法(HiveQL)來編寫 SQL 語句。為了支援這一特性,Flink 引入了一種新的 SQL 方言,使用者可以動态的為每一條語句選擇使用Flink(default)或Hive(hive)方法。對于所支援的 DDL 和 DML 的完整清單,請參考 Hive 方言的文檔。

■ Flink SQL 文法的擴充和優化

  • Flink 1.11 引入了主鍵限制的概念,進而可以在 Flink SQL DDL 的運作時優化中使用(FLIP-87)。
  • 視圖對象已經在 SQL DDL 中完整支援,可以通過 CREATE/ALTER/DROP VIEW 等語句使用(FLIP-71)。
  • 使用者可以在 DQL 和 DML 中使用動态表屬性動态指定或覆寫 Table 的選項(FLIP-113)。
  • 為了簡化 connector 參數的配置,提高異常處理的能力,Table API/SQL 修改了一些配置項的名稱(FLIP-122)。這一改動不會破壞相容性,使用者仍然可以使用老的名稱。

■ 新的 Table Source 和 Sink 接口(FLIP-95)

Flink 1.11 引入了新的 Table Source 和 Sink 接口(即 DynamicTableSource 和 DynamicTableSink),這一接口可以統一批作業和流作業,在使用 Blink Planner 時提供更高效的資料處理并且可以支援修改日志的處理(參考支援修改日志)。新的接口簡化了使用者實作新的自定義的連接配接器和修改現有連接配接器的複雜度。一個基于支援修改日志語義的資料解析格式來實作定制表掃描的Source的案例請參考這一文檔。

注意:盡管這一修改不會破壞相容性,但是我們推薦 Table API/SQL 的使用者盡快将現有的Source和Sink更新到新的接口上。

■ 重構 Table Env 接口(FLIP-84)

1.11之前 TableEnvironment 和 Table 上相似的接口的行為并不完全相同,這導緻了接口的不一緻并使使用者感到困惑。為了解決這一問題并使基于 Table API/SQL 的編寫程式更加流暢,Flink 1.11 引入了新的方法來統一這些不一緻的行為,例如執行觸發的時機(即executeSql()),結果展示(即 print(),collecto())并且為後續版本的重要功能(如多語句執行)打下了基礎。

注意:在 FLIP-84 中被标記為過期的方法不會被立刻删掉,但是我們建議使用者采用新的方法。對于新的方法和過期方法的完整清單,可以檢視 FLIP-84 的總結部分。

■ 新的類型推斷和 Table API UDF(FLIP-65)

在 Flink 1.9 中,社群開始在 Table API 中支援一種新的類型系統來提高與标準 SQL 的一緻性(FLIP-37)。在1.11中這一工作接近完成,通過支援在 Table API UDF 中使用新的類型系統(目前支援 scalar 函數與 table 函數,計劃下一版本也支援 aggregate 函數)。

PyFlink:支援 Pandas UDF

在1.11之前,PyFlink 中的 Python UDF 僅支援标準的 Python 标量類型。這帶來了一些限制:

  1. 在 JVM 和 Python 程序之間傳遞資料會導緻較大序列化、反序列化開銷。
  2. 難以內建常用的高性能 Python 數值計算架構,例如 Pandas 和 NumPy。

為了克服這些限制,社群引入了對基于 Pandas 的(标量)向量 Python UDF 的支援(FLIP-97)。由于可以通過利用 Apache Arrow 來最小化序列化/反序列化的開銷,向量 UDF 的性能一般會非常好;此外,将 pandas.Series 作為輸入輸出的類型可以充分複用 Pandas 和 NumPy 庫。這些特點使 Pandas UDF 特别适合并行機器學習和其它大規模、分布式的資料科學的計算作業(例如特征提取或分布式模式服務)。

@udf(input_types=[DataTypes.BIGINT(),DataTypes.BIGINT()],result_type=DataTypes.BIGINT(),udf_type="pandas")
defadd(i,j):
  returni+j           

為了使 UDF 變為 Pandas UDF,需要在 udf 的裝飾器中添加額外的參數 udf_type=”pandas”,如文檔所示。

PyFlink 的其它優化

■ 支援轉換器 fromPandas/toPandas(FLIP-120)

Arrow 還被用來優化 PyFlink Table 和 pandas.DataFrame 之間的轉換,進而使使用者可以在不同的處理引擎之間無縫切換,而不需要編寫特殊的連接配接器進行中轉。使用 fromPandas()和toPandas() 方法的安例,可以參考相關文檔。

■ 支援使用者自定義的 Table Function(User-defined Table Function,UDTF)(FLINK-14500)

從1.11開始,使用者可以在 PyFlink 定義和注冊自定義的 UDTF。與 Python UDF 類似,UDTF 可以接受0個,一個或多個标量值作為參數,但是可以傳回任意多行資料作為輸出而不是隻能傳回單個值。

■ 基于 Cython 對 UDF 的性能進行優化(FLIP-121)

Cython 是一個 Python 語言預編譯的超集,它經常被用來提高大規模資料計算函數的性能,因為它可以将代碼執行速度優化到機器指令級别,并且可以很好的與常用的基于 C 語言實作的庫配合,例如 NumPy。從 Flink 1.11 開始,使用者可以構造包括 Cython支援的 PyFlink[60]并且可以通過 Cython 來優化 Python UDF。這種優化可以極大的提升代碼的性能(與 1.10 的 Python UDF 相比最高能有 30 倍的提升)。

■ Python UDF 支援使用者自定義的 Metrics(FLIP-112)

為了使使用者可以更容易的監控和調試 Python UDF 的執行,PyFlink 現在支援收集和輸出 Metric 的值到外部系統中,并且支援自定義域和變量。使用者可以在 UDF 的 open 方法中通過調用 function_context.get_metric_group() 來通路一個 Metric 系統,如文檔所示。

其它重要優化

  • [FLINK-17339] 從1.11開始,Blink Planner 将變為 Table API/SQL 的預設 Planner。實際上,在1.10中 SQL Client 的預設 Planner 已經變為 Blink Planner。老的 Planner 仍然将會支援,但是後續不會再有大的變更。
  • [FLINK-5763] Savepoints 将所有的狀态寫入到單個目錄下(包括中繼資料和程式狀态)。這使得使用者可以容易的看出每個 Savepoint 的 State 包含哪些檔案,并且允許使用者直接通過移動目錄來實作 Savepoint 的重定位。
  • [FLINK-16408] 為了減少 JVM 中繼資料空間的壓力,Flink 1.11 中對于單個 TaskExecutor 隻要上面還有某個作業的 Slot,該作業的 ClassLoader 就會被複用。這一改動會改變 Flink 錯誤恢複的行為,因為 static 字段不會被重新初始化。
  • [FLINK-11086] Flink 現在可以支援 Hadoop 3.0.0 以上的版本。注意 Flink 項目并未提供任何更新的“flink-shaded-hadoop-*”的 jar 包,而是需要使用者自己将相應的 Hadoop 依賴加入 HADOOP_CLASSPATH 環境變量(推薦的方式)或者将 Hadoop 依賴加入到 lib/目錄下。
  • [FLINK-16963] 所有 Flink 内置的 Metric Report 現在被修改為 Flink 的插件。如果要使用它們,不應該放置到 lib/目錄下(會導緻類沖突),而是要放置到 plugins/目錄下。
  • [FLINK-12639] 社群正在對 Flink 文檔進行重構,從1.11開始,您可能會注意到文檔的導航和内容組織發生了一些變化。

詳細釋出說明

如果你想要更新到1.11的話,請詳細閱讀詳細釋出說明。與之前所有1.x版本相比,1.11可以保證所有标記為@Public的接口的相容。

點選「

閱讀原文

」即可檢視原版官方部落格~