天天看點

Flink1.14 前言預覽

作者:宋辛童(花名:五藏) Apache Flink PMC Member & Committer

整理:陳政羽(Apache Flink China 社群志願者)本文由社群志願者陳政羽整理,内容源自阿裡巴巴技術專家宋辛童 (五藏) 在 8 月 7 日線上 Flink Meetup 分享的《Flink 1.14 新特性預覽》。主要内容為:

  1. 簡介
  2. 流批一體
  3. Checkpoint 機制
  4. 性能與效率
  5. Table / SQL / Python API
  6. 總結

GitHub 位址 

https://github.com/apache/flink

社群文章倉庫整理位址

https://github.com/czy006/FlinkClub

歡迎大家給 Flink 點贊送 star~

此文章為 8 月 7 日的分享整理,1.14 版本最新進展采用注釋的方式在文末進行說明。

一、簡介

1.14 新版本原本規劃有 35 個比較重要的新特性以及優化工作,目前已經有 26 個工作完成;5 個任務不确定是否能準時完成;另外 4 個特性由于時間或者本身設計上的原因,會放到後續版本完成。[1]

Flink1.14 前言預覽

1.14 相對于曆屆版本來說,囊括的優化和新增功能點其實并不算多。其實通過觀察發版的節奏可以發現,通常在 1-2 個大版本後都會釋出一個變化稍微少一點的版本,主要目的是把一些特性穩定下來。

1.14 版本就是這樣一個定位,我們稱之為品質改進和維護的版本。這個版本預計 8 月 16 日停止新特性開發,可能在 9 月份能夠和大家正式見面,有興趣可以關注以下連結去跟蹤功能釋出進度。

二、流批一體

流批一體其實從 Flink 1.9 版本開始就受到持續的關注,它作為社群 RoadMap 的重要組成部分,是大資料實時化必然的趨勢。但是另一方面,傳統離線的計算需求其實并不會被實時任務完全取代,而是會長期存在。

在實時和離線的需求同時存在的狀态下,以往的流批獨立技術方案存在着一些痛點,比如:

  • 需要維護兩套系統,相應的就需要兩組開發人員,人力的投入成本很高;
  • 另外,兩套資料鍊路處理相似内容帶來維護的風險性和備援;
  • 最重要的一點是,如果流批使用的不是同一套資料處理系統,引擎本身差異可能會存在資料口徑不一緻的問題,進而導緻業務資料存在一定的誤差。這種誤差對于大資料分析會有比較大的影響。

在這樣的背景下,Flink 社群認定了實時離線一體化的技術路線是比較重要的技術趨勢和方向。

Flink 在過去的幾個版本中,在流批一體方面做了很多的工作。可以認為 Flink 在引擎層面,API 層面和算子的執行層面上做到了真正的流與批用同一套機制運作。但是在任務具體的執行模式上會有 2 種不同的模式:

  • 對于無限的資料流,統一采用了流的執行模式。流的執行模式指的是所有計算節點是通過 Pipeline 模式去連接配接的,Pipeline 是指上遊和下遊計算任務是同時運作的,随着上遊不斷産出資料,下遊同時在不斷消費資料。這種全 Pipeline 的執行方式可以:
    • 通過 eventTime 表示資料是什麼時候産生的;
    • 通過 watermark 得知在哪個時間點,資料已經到達了;
    • 通過 state 來維護計算中間狀态;
    • 通過 Checkpoint 做容錯的處理。
  • 下圖是不同的執行模式:
Flink1.14 前言預覽
  • 對于有限的資料集有 2 種執行模式,我們可以把它看成一個有限的資料流去做處理,也可以把它看成批的執行模式。批的執行模式雖然也有 eventTime,但是對于 watermark 來說隻支援正無窮。對資料和 state 排序後,它在任務的排程和 shuffle 上會有更多的選擇。流批的執行模式是有差別的,最主要的就是批的執行模式會有落盤的中間過程,隻有目前面任務執行完成,下遊的任務才會觸發,這個容錯機制是通過 shuffle 進行容錯的。這 2 者也各有各的執行優勢:
    • 對于流的執行模式來說,它沒有落盤的壓力,同時容錯是基于資料的分段,通過不斷對資料進行打點 Checkpoint 去保證斷點恢複;
    • 然而在批處理上,因為要經過 shuffle 落盤,是以對磁盤會有壓力。但是因為資料是經過排序的,是以對批來說,後續的計算效率可能會有一定的提升。同時,在執行時候是經過分段去執行任務的,無需同時執行。在容錯計算方面是根據 stage 進行容錯。
  • 這兩種各有優劣,可以根據作業的具體場景來進行選擇。

Flink 1.14 的優化點主要是針對在流的執行模式下,如何去處理有限資料集。之前處理無限資料集,和現在處理有限資料集最大的差別在于引入了 "任務可能會結束" 的概念。在這種情況下帶來一些新的問題,如下圖:

Flink1.14 前言預覽
  • 在流的執行模式下的 Checkpoint 機制
    • 對于無限流,它的 Checkpoint 是由所有的 source 節點進行觸發的,由 source 節點發送 Checkpoint Barrier ,當 Checkpoint Barrier 流過整個作業時候,同時會存儲目前作業所有的 state 狀态。
    • 而在有限流的 Checkpoint 機制中,Task 是有可能提早結束的。上遊的 Task 有可能先處理完任務提早退出了,但下遊的 Task 卻還在執行中。在同一個 stage 不同并發下,有可能因為資料量不一緻導緻部分任務提早完成了。這種情況下,在後續的執行作業中,如何進行 Checkpoint?在 1.14 中,JobManager 動态根據目前任務的執行情況,去明确 Checkpoint Barrier 是從哪裡開始觸發。同時在部分任務結束後,後續的 Checkpoint 隻會儲存仍在運作 Task 所對應的 stage,通過這種方式能夠讓任務執行完成後,還可以繼續做 Checkpoint ,在有限流執行中提供更好的容錯保障。
Flink1.14 前言預覽
  • Task 結束後的兩階段送出

我們在部分 Sink 使用上,例如下圖的 Kafka Sink 上,涉及到 Task 需要依靠 Checkpoint 機制,進行二階段送出,進而保證資料的 Exactly-once 一緻性。

Flink1.14 前言預覽

具體可以這樣說:在 Checkpoint 過程中,每個算子隻會進行準備送出的操作。比如資料會送出到外部的臨時存儲目錄下,所有任務都完成這次 Checkpoint 後會收到一個信号,之後才會執行正式的 commit,把所有分布式的臨時檔案一次性以事務的方式送出到外部系統。

這種算法在目前有限流的情況下,作業結束後并不能保證有 Checkpoint,那麼最後一部分資料如何送出?

在 1.14 中,這個問題得到了解決。Task 處理完所有資料之後,必須等待 Checkpoint 完成後才可以正式的退出,這是流批一體方面針對有限流任務結束的一些改進。

三、Checkpoint 機制

1. 現有 Checkpoint 機制痛點

目前 Flink 觸發 Checkpoint 是依靠 barrier 在算子間進行流通,barrier 随着算子一直往下遊進行發送,當算子下遊遇到 barrier 的時候就會進行快照操作,然後再把 barrier 往下遊繼續發送。對于多路的情況我們會把 barrier 進行對齊,把先到 barrier 的這一路資料暫時性的 block,等到兩路 barrier 都到了之後再做快照,最後才會去繼續往下發送 barrier。

Flink1.14 前言預覽

現有的 Checkpoint 機制存在以下問題:

  • 反壓時無法做出 Checkpoint :在反壓時候 barrier 無法随着資料往下遊流動,造成反壓的時候無法做出 Checkpoint。但是其實在發生反壓情況的時候,我們更加需要去做出對資料的 Checkpoint,因為這個時候性能遇到了瓶頸,是更加容易出問題的階段;
  • Barrier 對齊阻塞資料處理 :阻塞對齊對于性能上存在一定的影響;
  • 恢複性能受限于 Checkpoint 間隔 :在做恢複的時候,延遲受到多大的影響很多時候是取決于 Checkpoint 的間隔,間隔越大,需要 replay 的資料就會越多,進而造成中斷的影響也就會越大。但是目前 Checkpoint 間隔受制于持久化操作的時間,是以沒辦法做的很快。

2. Unaligned Checkpoint

針對這些痛點,Flink 在最近幾個版本一直在持續的優化,Unaligned Checkpoint 就是其中一個機制。barrier 算子在到達 input buffer 最前面的時候,就會開始觸發 Checkpoint 操作。它會立刻把 barrier 傳到算子的 OutPut Buffer 的最前面,相當于它會立刻被下遊的算子所讀取到。通過這種方式可以使得 barrier 不受到資料阻塞,解決反壓時候無法進行 Checkpoint 的問題。

當我們把 barrier 發下去後,需要做一個短暫的暫停,暫停的時候會把算子的 State 和 input output buffer 中的資料進行一個标記,以友善後續随時準備上傳。對于多路情況會一直等到另外一路 barrier 到達之前資料,全部進行标注。

通過這種方式整個在做 Checkpoint 的時候,也不需要對 barrier 進行對齊,唯一需要做的停頓就是在整個過程中對所有 buffer 和 state 标注。這種方式可以很好的解決反壓時無法做出 Checkpoint ,和 Barrier 對齊阻塞資料影響性能處理的問題。

Flink1.14 前言預覽

3. Generalized Incremental Checkpoint [2]

Generalized Incremental Checkpoint 主要是用于減少 Checkpoint 間隔,如左圖 1 所示,在 Incremental Checkpoint 當中,先讓算子寫入 state 的 changelog。寫完後才把變化真正的資料寫入到 StateTable 上。state 的 changelog 不斷向外部進行持久的存儲化。在這個過程中我們其實無需等待整個 StateTable 去做一個持久化操作,我們隻需要保證對應的 Checkpoint 這一部分的 changelog 能夠持久化完成,就可以開始做下一次 Checkpoint。StateTable 是以一個周期性的方式,獨立的去對外做持續化的一個過程。

Flink1.14 前言預覽

這兩個過程進行拆分後,就有了從之前的需要做全量持久化 (Per Checkpoint) 變成 增量持久化 (Per Checkpoint) + 背景周期性全量持久化,進而達到同樣容錯的效果。在這個過程中,每一次 Checkpoint 需要做持久化的資料量減少了,進而使得做 Checkpoint 的間隔能夠大幅度減少。

其實在 RocksDB 也是能支援 Incremental Checkpoint 。但是有兩個問題:

  • 第一個問題是 RocksDB 的 Incremental Checkpoint 是依賴它自己本身的一些實作,當中會存在一些資料壓縮,壓縮所消耗的時間以及壓縮效果具有不确定性,這個是和資料是相關的;
  • 第二個問題是隻能針對特定的 StateBackend 來使用,目前在做的 Generalized Incremental Checkpoint 實際上能夠保證的是,它與 StateBackend 是無關的,從運作時的機制來保證了一個比較穩定、更小的 Checkpoint 間隔。

目前 Unaligned Checkpoint 是在 Flink 1.13 就已經釋出了,在 1.14 版本主要是針對 bug 的修複和補充,針對 Generalized Incremental Checkpoint,目前社群還在做最後的沖刺,比較有希望在 1.14 中和大家見面。[2]

四、性能與效率

1. 大規模作業排程的優化

  • 建構 Pipeline Region 的性能提升:所有由 pipline 邊所連接配接構成的子圖 。在 Flink 任務排程中需要通過識别 Pipeline Region 來保證由同一個 Pipline 邊所連接配接的任務能夠同時進行排程。否則有可能上遊的任務開始排程,但是下遊的任務并沒有運作。進而導緻上遊運作完的資料無法給下遊的節點進行消費,可能會造成死鎖的情況
  • 任務部署階段:每個任務都要從哪些上遊讀取資料,這些資訊會生成 Result Partition Deployment Descriptor。

這兩個建構過程在之前的版本都有 O (n^2) 的時間複雜度,主要問題需要對于每個下遊節點去周遊每一個上遊節點的情況。例如去周遊每一個上遊是不是一個 Pipeline 邊連接配接的關系,或者去周遊它的每一個上遊生成對應的 Result Partition 資訊。

目前通過引入 group 概念,假設已知上下遊 2 個任務的連接配接方式是 all-to-all,那相當于把所有 Pipeline Region 資訊或者 Result Partition 資訊以 Group 的形式進行組合,這樣隻需知道下遊對應的是上遊的哪一個 group,就可以把一個 O (n^2) 的複雜度優化到了 O (n)。我們用 wordcount 任務做了一下測試,對比優化前後的性能。

Flink1.14 前言預覽

從表格中可以看到建構速度具有大幅度提升,建構 Pipeline Region 的性能從秒級提升至毫秒級别。任務部署我們是從第一個任務開始部署到所有任務開始運作的狀态,這邊隻統計了流,因為批需要上遊結束後才能結束排程。從整體時間來看,整個任務初始化,排程以及部署的階段,大概能夠減少分鐘級的時間消耗。

2. 細粒度資源管理

細粒度資源管理在過去很多的版本都一直在做,在 Flink1.14 終于可以把這一部分 API 開放出來在 DataSteam 提供給使用者使用了。使用者可以在 DataStream 中自定義 SlotSharingGroup 的劃分情況,如下圖所示的方式去定義 Slot 的資源劃分,實作了支援 DataStream API,自定義 SSG 劃分方式以及資源配置 TaskManager 動态資源扣減。

Flink1.14 前言預覽

對于每一個 Slot 可以通過比較細粒度的配置,我們在 Runtime 上會自動根據使用者資源配置進行動态的資源切割。

這樣做的好處是不會像之前那樣有固定資源的 Slot,而是做資源的動态扣減,通過這樣的方式希望能夠達到更加精細的資源管理和資源的使用率。

五、Table / SQL / Python API

1. Table API / SQL

Window Table-Valued Function 支援更多算子與視窗類型 ,可以看如下表格的對比:

Flink1.14 前言預覽

從表格中可以看出對于原有的三個視窗類型進行加強,同時新增 Session 視窗類型,目前支援 Aggregate 的操作。

1.1 支援聲明式注冊 Source/Sink

  • Table API 支援使用聲明式的方式注冊 Source / Sink 功能對齊 SQL DDL;
  • 同時支援 FLIP-27 新的 Source 接口;
  • new Source 替代舊的 connect() 接口。
Flink1.14 前言預覽

1.2 全新代碼生成器

解決了大家在生成代碼超過 Java 最長代碼限制,新的代碼生成器會對代碼進行拆解,徹底解決代碼超長的問題。

1.3 移除 Flink Planner

新版本中,Blink Planner 将成為 Flink Planner 的唯一實作。

2. Python API

在之前的版本中,如果有先後執行的兩個 UDF,它的執行過程如下圖左方。在 JVM 上面有 Java 的 Operator,先把資料發給 Python 下面的 UDF 去執行,執行後又發回給 Java,然後傳送給下遊的 Operator,最後再進行一次 Python 的這種跨程序的傳輸去處理,會導緻存在很多次備援的資料傳輸。

Flink1.14 前言預覽

在 1.14 版本中,改進如右圖,可以把它們連接配接在一起,隻需要一個來回的 Java 和 Python 進行資料通信,通過減少傳輸資料次數就能夠達到比較好的性能上的提升。

3. 支援 LoopBack 模式

在以往本地執行實際是在 Python 的程序中去運作用戶端程式,送出 Java 程序啟動一個迷你叢集去執行 Java 部分代碼。Java 部分代碼也會和生産環境部分的一樣,去啟動一個新的 Python 程序去執行對應的 Python UDF,從圖下可以看出新的程序其實在本地調試中是沒有必要存在的。

Flink1.14 前言預覽

是以支援 lookback 模式後可以讓 Java 的 opt 直接把 UDF 運作在之前 Python client 所運作的相同的程序内,通過這種方式:

  • 首先是避免了啟動額外程序所帶來的開銷;
  • 最重要的是在本地調試中,我們可以在同一個程序内能夠更好利用一些工具進行 debug,這個是對開發者體驗上的一個提升。

六、總結

本文主要講解了 Flink1.14 的主要新特性介紹。

  • 首先介紹了目前社群在批流一體上的工作,通過介紹批流不同的執行模式和 JM 節點任務觸發的優化改進更好的去相容批作業;
  • 然後通過分析現有的 Checkpoint 機制痛點,在新版本中如何改進,以及在大規模作業排程優化和細粒度的資源管理上面如何做到對性能優化;
  • 最後介紹了 TableSQL API 和 Pyhton上相關的性能優化。

歡迎繼續關注發版的一些最新動态以及我們在後續的 Release 過程中的一些其他技術分享和專題。

同時歡迎各位關注Flink中文社群和Java大咖說(java_talkfun),為你提供更多的Flink資訊

Flink1.14 前言預覽

注釋

[1] 截至到 8 月 31 日,确定進入新版本的是 33 個,已全部完成。

[2] Generalized Incremental Checkpoint 最終在 1.14 中沒有完成。