天天看點

官宣|Apache Flink 1.13.0 正式釋出,流處理應用更加簡單高效!

​翻譯 | 高赟

Review | 朱翥、馬國維

GitHub 位址

https://github.com/apache/flink

歡迎大家給 Flink 點贊送 star~

Flink 1.13 釋出了!Flink 1.13 包括了超過 200 名貢獻者所送出的 1000 多項修複和優化。

這一版本中,Flink 的一個主要目标取得了重要進展,即讓流處理應用的使用和普通應用一樣簡單和自然。Flink 1.13 新引入的被動擴縮容使得流作業的擴縮容和其它應用一樣簡單,使用者僅需要修改并發度即可。

這個版本還包括一系列重要改動使使用者可以更好的了解流作業的性能。當流作業的性能不及預期的時候,這些改動可以使使用者可以更好的分析原因。這些改動包括用于識别瓶頸節點的負載和反壓可視化、分析算子熱點代碼的 CPU 火焰圖和分析 State Backend 狀态的 State 通路性能名額。

除了這些特性外,Flink 社群還添加了大量的其它優化,我們會在本文後續讨論其中的一些。我們希望使用者可以享受新的版本和特性帶來的便利,在本文最後,我們還會介紹更新Flink版本需要注意的一些變化。

我們鼓勵使用者

下載下傳試用新版 Flink 并且通過 郵件清單 JIRA 來回報遇到的問題。

重要特性

被動擴縮容

Flink 項目的一個初始目标,就是希望流處理應用可以像普通應用一樣簡單和自然,被動擴縮容是 Flink 針對這一目标上的最新進展。

當考慮資源管理和部分的時候,Flink 有兩種可能的模式。使用者可以将 Flink 應用部署到 k8s、yarn 等資源管理系統之上,并且由 Flink 主動的來管理資源并按需配置設定和釋放資源。這一模式對于經常改變資源需求的作業和應用非常有用,比如批作業和實時 SQL 查詢。在這種模式下,Flink 所啟動的 Worker 數量是由應用設定的并發度決定的。在 Flink 中我們将這一模式叫做主動擴縮容。

對于長時間運作的流處理應用,一種更适合的模型是使用者隻需要将作業像其它的長期運作的服務一樣啟動起來,而不需要考慮是部署在 k8s、yarn 還是其它的資源管理平台上,并且不需要考慮需要申請的資源的數量。相反,它的規模是由所配置設定的 worker 數量來決定的。當 worker 數量發生變化時,Flink 自動的改動應用的并發度。在 Flink 中我們将這一模式叫做被動擴縮容。

Flink 的

Application 部署模式

開啟了使 Flink 作業更接近普通應用(即啟動 Flink 作業不需要執行兩個獨立的步驟來啟動叢集和送出應用)的努力,而被動擴縮容完成了這一目标:使用者不再需要使用額外的工具(如腳本、K8s 算子)來讓 worker 的數量與應用并發度設定保持一緻。

使用者現在可以将自動擴縮容的工具應用到 Flink 應用之上,就像普通的應用程式一樣,隻要使用者了解擴縮容的代價:有狀态的流應用在擴縮容的時候需要将狀态重新分發。

如果想要嘗試被動擴縮容,使用者可以增加 scheduler-mode: reactive 這一配置項,然後啟動一個應用叢集(

Standalone

或者

K8s

)。更多細節見

被動擴縮容的文檔

分析應用的性能

對所有應用程式來說,能夠簡單的分析和了解應用的性能是非常關鍵的功能。這一功能對 Flink 更加重要,因為 Flink 應用一般是資料密集的(即需要處理大量的資料)并且需要在(近)實時的延遲内給出結果。

當 Flink 應用處理的速度跟不上資料輸入的速度時,或者當一個應用占用的資源超過預期,下文介紹的這些工具可以幫你分析原因。

瓶頸檢測與反壓監控

Flink 性能分析首先要解決的問題經常是:哪個算子是瓶頸?

為了回答這一問題,Flink 引入了描述作業繁忙(即在處理資料)與反壓(由于下遊算子不能及時處理結果而無法繼續輸出)程度的名額。應用中可能的瓶頸是那些繁忙并且上遊被反壓的算子。

Flink 1.13 優化了反壓檢測的邏輯(使用基于任務 Mailbox 計時,而不在再于堆棧采樣),并且重新實作了作業圖的 UI 展示:Flink 現在在 UI 上通過顔色和數值來展示繁忙和反壓的程度。

官宣|Apache Flink 1.13.0 正式釋出,流處理應用更加簡單高效!

Web UI 中的 CPU 火焰圖

Flink 關于性能另一個經常需要回答的問題:瓶頸算子中的哪部分計算邏輯消耗巨大?

針對這一問題,一個有效的可視化工具是火焰圖。它可以幫助回答以下問題:

  • 哪個方法調現在在占用 CPU?
  • 不同方法占用 CPU 的比例如何?
  • 一個方法被調用的棧是什麼樣子的?

火焰圖是通過重複采樣線程的堆棧來建構的。在火焰圖中,每個方法調用被表示為一個矩形,矩形的長度與這個方法出現在采樣中的次數成正比。火焰圖在 UI 上的一個例子如下圖所示。

官宣|Apache Flink 1.13.0 正式釋出,流處理應用更加簡單高效!
火焰圖的文檔

包括啟用這一功能的更多細節和指令。

State 通路延遲名額

另一個可能的性能瓶頸是 state backend,尤其是當作業的 state 超過記憶體容量而必須使用

RocksDB state backend

時。

這裡并不是想說 RocksDB 性能不夠好(我們非常喜歡 RocksDB!),但是它需要滿足一些條件才能達到最好的性能。例如,使用者可能很容易遇到非故意的

在雲上由于使用了錯誤的磁盤資源類型而不能滿足 RockDB 的 IO 性能需求

的問題。

基于 CPU 火焰圖,新的 State Backend 的延遲名額可以幫助使用者更好的判斷性能不符合預期是否是由 State Backend 導緻的。例如,如果使用者發現 RocksDB 的單次通路需要幾毫秒的時間,那麼就需要檢視記憶體和 I/O 的配置。這些名額可以通過設定 state.backend.rocksdb.latency-track-enabled 這一選項來啟用。這些名額是通過采樣的方式來監控性能的,是以它們對 RocksDB State Backend 的性能影響是微不足道的。

通過 Savepoint 來切換 State Backend

使用者現在可以在從一個 Savepoint 重新開機時切換一個 Flink 應用的 State Backend。這使得 Flink 應用不再被限制隻能使用應用首次運作時選擇的 State Backend。

基于這一功能,使用者現在可以首先使用一個 HashMap State Backend(純記憶體的 State Backend),如果後續狀态變得過大的話,就切換到 RocksDB State Backend 中。

在實作層,Flink 現在統一了所有 State Backend 的 Savepoint 格式來實作這一功能。

K8s 部署時使用使用者指定的 Pod 模式

原生 kubernetes 部署

(Flink 主動要求 K8s 來啟動 Pod)中,現在可以使用自定義的 Pod 模闆。

使用這些模闆,使用者可以使用一種更符合 K8s 的方式來設定 JM 和 TM 的 Pod,這種方式比 Flink K8s 內建内置的配置項更加靈活。

生産可用的 Unaligned Checkpoint

Unaligned Checkpoint 目前已達到了生産可用的狀态,我們鼓勵使用者在存在反壓的情況下試用這一功能。

具體來說,Flink 1.13 中引入的這些功能使 Unaligned Checkpoint 更容易使用:

  • 使用者現在使用 Unaligned Checkpoint 時也可以擴縮容應用。如果使用者需要因為性能原因不能使用 Savepoint而必須使用 Retained checkpoint 時,這一功能會非常友善。
  • 對于沒有反壓的應用,啟用 Unaligned Checkpoint 現在代價更小。Unaligned Checkpoint 現在可以通過逾時來自動觸發,即一個應用預設會使用 Aligned Checkpoint(不存儲傳輸中的資料),而隻在對齊超過一定時間範圍時自動切換到 Unaligned Checkpoint(存儲傳輸中的資料)。

關于如何啟用 Unaligned Checkpoint 可以參考

相關文檔

機器學習遷移到單獨的倉庫

為了加速 Flink 機器學習的進展(流批統一的機器學習),現在 Flink 機器學習開啟了新的

flink-ml

倉庫。我們采用類似于 Stateful Function 項目的管理方式,通過使用一個單獨的倉庫進而簡化代碼合并的流程并且可以進行單獨的版本釋出,進而提高開發的效率。

使用者可以關注 Flink 在機器學習方面的進展,比如與

Alink

(Flink 常用機器學習算法套件)的互操作以及

Flink 與 Tensorflow 的內建

SQL / Table API 進展

與之前的版本類似,SQL 和 Table API 仍然在所有開發中占用很大的比例。

通過 Table-valued 函數來定義時間視窗

在流式 SQL 查詢中,一個最經常使用的是定義時間視窗。Flink 1.13 中引入了一種新的定義視窗的方式:通過 Table-valued 函數。這一方式不僅有更強的表達能力(允許使用者定義新的視窗類型),并且與 SQL 标準更加一緻。

Flink 1.13 在新的文法中支援 TUMBLE 和 HOP 視窗,在後續版本中也會支援 SESSION 視窗。我們通過以下兩個例子來展示這一方法的表達能力:

  • 例 1:一個新引入的 CUMULATE 視窗函數,它可以支援按特定步長擴充的視窗,直到達到最大視窗大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price 
  FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;           
  • 例 2:使用者在 table-valued 視窗函數中可以通路視窗的起始和終止時間,進而使使用者可以實作新的功能。例如,除了正常的基于視窗的聚合和 Join 之外,使用者現在也可以實作基于視窗的 Top-K 聚合:
SELECT window_time, ...
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC) 
      as rank 
    FROM t
  ) WHERE rank <= 100;            

提高 DataStream API 與 Table API / SQL 的互操作能力

這一版本極大的簡化了 DataStream API 與 Table API 混合的程式。

Table API 是一種非常友善的應用開發接口,因為這經支援表達式的程式編寫并提供了大量的内置函數。但是有時候使用者也需要切換回 DataStream,例如當使用者存在表達能力、靈活性或者 State 通路的需求時。

Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以将一個 DataStream API 聲明的 Source 或者 Sink 當作 Table 的 Source 或者 Sink 來使用。主要的優化包括:

  • DataStream 與 Table API 類型系統的自動轉換。
  • Event Time 配置的無縫內建,Watermark 行為的高度一緻性。
  • Row 類型(即 Table API 中資料的表示)有了極大的增強,包括 toString() / hashCode() 和 equals() 方法的優化,按名稱通路字段值的支援與稀疏表示的支援。
Table table = tableEnv.fromDataStream(
  dataStream,
  Schema.newBuilder()
    .columnByMetadata("rowtime", "TIMESTAMP(3)")
    .watermark("rowtime", "SOURCE_WATERMARK()")
    .build());

DataStream<Row> dataStream = tableEnv.toDataStream(table)
  .keyBy(r -> r.getField("user"))
  .window(...);           

SQL Client: 初始化腳本和語句集合 (Statement Sets)

SQL Client 是一種直接運作和部署 SQL 流或批作業的簡便方式,使用者不需要編寫代碼就可以從指令行調用 SQL,或者作為 CI / CD 流程的一部分。

這個版本極大的提高了 SQL Client 的功能。現在基于所有通過 Java 程式設計(即通過程式設計的方式調用 TableEnvironment 來發起查詢)可以支援的文法,現在 SQL Client 和 SQL 腳本都可以支援。這意味着 SQL 使用者不再需要添加膠水代碼來部署他們的SQL作業。

配置簡化和代碼共享

Flink 後續将不再支援通過 Yaml 的方式來配置 SQL Client(注:目前還在支援,但是已經被标記為廢棄)。作為替代,SQL Client 現在支援使用一個初始化腳本在主 SQL 腳本執行前來配置環境。

這些初始化腳本通常可以在不同團隊/部署之間共享。它可以用來加載常用的 catalog,應用通用的配置或者定義标準的視圖。

./sql-client.sh -i init1.sql init2.sql -f sqljob.sql           

更多的配置項

通過增加配置項,優化 SET / RESET 指令,使用者可以更友善的在 SQL Client 和 SQL 腳本内部來控制執行的流程。

通過語句集合來支援多查詢

多查詢允許使用者在一個 Flink 作業中執行多個 SQL 查詢(或者語句)。這對于長期運作的流式 SQL 查詢非常有用。

語句集可以用來将一組查詢合并為一組同時執行。

以下是一個可以通過 SQL Client 來執行的 SQL 腳本的例子。它初始化和配置了執行多查詢的環境。這一腳本包括了所有的查詢和所有的環境初始化和配置的工作,進而使它可以作為一個自包含的部署元件。

-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;

-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

-- set the execution mode for jobs
SET execution.runtime-mode=streaming;

-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;

-- set the job's parallelism
SET parallism.default=10;

-- set the job name
SET pipeline.name = my_flink_job;

-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;

BEGIN STATEMENT SET;

INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;

INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;

END;           

Hive 查詢文法相容性

使用者現在在 Flink 上也可以使用 Hive SQL 文法。除了 Hive DDL 方言之外,Flink現在也支援常用的 Hive DML 和 DQL 方言。

為了使用 Hive SQL 方言,需要設定 table.sql-dialect 為 hive 并且加載 HiveModule。後者非常重要,因為必須要加載 Hive 的内置函數後才能正确實作對 Hive 文法和語義的相容性。例子如下:

CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries            

需要注意的是, Hive 方言中不再支援 Flink 文法的 DML 和 DQL 語句。如果要使用 Flink 文法,需要切換回 default 的方言配置。

優化的 SQL 時間函數

在資料進行中時間處理是一個重要的任務。但是與此同時,處理不同的時區、日期和時間是一個

日益複雜

的任務。

在 Flink 1.13 中,我們投入了大量的精力來簡化時間函數的使用。我們調整了時間相關函數的傳回類型使其更加精确,例如 PROCTIME(),CURRENT_TIMESTAMP() 和 NOW()。

其次,使用者現在還可以基于一個 TIMESTAMP_LTZ 類型的列來定義 Event Time 屬性,進而可以優雅的在視窗進行中支援夏令時。

使用者可以參考 Release Note 來檢視該部分的完整變更。

PyFlink 核心優化

這個版本對 PyFlink 的改進主要是使基于 Python 的 DataStream API 與 Table API 與 Java/scala 版本的對應功能更加一緻。

Python DataStream API 中的有狀态算子

在 Flink 1.13 中,Python 程式員可以享受到 Flink 狀态處理 API 的所有能力。在 Flink 1.12 版本重構過的 Python DataStream API 現在已經擁有完整的狀态通路能力,進而使使用者可以将資料的資訊記錄到 state 中并且在後續通路。

帶狀态的處理能力是許多依賴跨記錄狀态共享(例如 Window Operator)的複雜資料處理場景的基礎。

以下例子展示了一個自定義的計算視窗的實作:

class CountWindowAverage(FlatMapFunction):
    def __init__(self, window_size):
        self.window_size = window_size

    def open(self, runtime_context: RuntimeContext):
        descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
        self.sum = runtime_context.get_state(descriptor)

    def flat_map(self, value):
        current_sum = self.sum.value()
        if current_sum is None:
            current_sum = (0, 0)
        # update the count
        current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
        # if the count reaches window_size, emit the average and clear the state
        if current_sum[0] >= self.window_size:
            self.sum.clear()
            yield value[0], current_sum[1] // current_sum[0]
        else:
            self.sum.update(current_sum)

ds = ...  # type: DataStream
ds.key_by(lambda row: row[0]) \
  .flat_map(CountWindowAverage(5))           

PyFlink DataStream API 中的使用者自定義視窗

Flink 1.13 中 PyFlink DataStream 接口增加了對使用者自定義視窗的支援,現在使用者可以使用标準視窗之外的視窗定義。

由于視窗是處理無限資料流的核心機制 (通過将流切分為多個有限的『桶』),這一功能極大的提高的 API 的表達能力。

PyFlink Table API 中基于行的操作

Python Table API 現在支援基于行的操作,例如使用者對行資料的自定義函數。這一功能使得使用者可以使用非内置的資料處理函數。

一個使用 map() 操作的 Python Table API 示例如下:

@udf(result_type=DataTypes.ROW(
  [DataTypes.FIELD("c1", DataTypes.BIGINT()),
   DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
  return Row(r[0] + 1, r[1])

table = ...  # type: Table
mapped_result = table.map(increment_column)           

除了 map(),這一 API 還支援 flat_map(),aggregate(),flat_aggregate() 和其它基于行的操作。這使 Python Table API 的功能與 Java Table API 的功能更加接近。

PyFlink DataStream API 支援 Batch 執行模式

對于有限流,PyFlink DataStream API 現在已經支援 Flink 1.12 DataStream API 中引入的 Batch 執行模式。

通過複用資料有限性來跳過 State backend 和 Checkpoint 的處理,Batch 執行模式可以簡化運維,并且提高有限流處理的性能。

其它優化

基于 Hugo 的 Flink 文檔

Flink 文檔從 JekyII 遷移到了 Hugo。如果您發現有問題,請務必通知我們,我們非常期待使用者對新的界面的感受。

Web UI 支援曆史異常

Flink Web UI 現在可以展示導緻作業失敗的 n 次曆史異常,進而提升在一個異常導緻多個後續異常的場景下的調試體驗。使用者可以在異常曆史中找到根異常。

優化失敗 Checkpoint 的異常和失敗原因的彙報

Flink 現在提供了失敗或被取消的 Checkpoint 的統計,進而使使用者可以更簡單的判斷 Checkpoint 失敗的原因,而不需要去檢視日志。

Flink 之前的版本隻有在 Checkpoint 成功的時候才會彙報名額(例如持久化資料的大小、觸發時間等)。

提供『恰好一次』一緻性的 JDBC Sink

從 1.13 開始,通過使用事務送出資料,JDBC Sink 可以對支援 XA 事務的資料庫提供『恰好一次』的一緻性支援。這一特性要求目标資料庫必須有(或連結到)一個 XA 事務處理器。

這一 Sink 現在隻能在 DataStream API 中使用。使用者可以通過 JdbcSink.exactlyOnceSink(…) 來建立這一 Sink(或者通過顯式初始化一個 JdbcXaSinkFunction)。

PyFlink Table API 在 Group 視窗上支援使用者自定義的聚合函數

PyFlink Table API 現在對 Group 視窗同時支援基于 Python 的使用者自定義聚合函數(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。這些函數對許多資料分析或機器學習訓練的程式非常重要。

在 Flink 1.13 之前,這些函數僅能在無限的 Group-by 聚合場景下使用。Flink 1.13 優化了這一限制。

Batch 執行模式下 Sort-merge Shuffle 優化

Flink 1.13 優化了針對批處理程式的 Sort-merge Blocking Shuffle 的性能和記憶體占用情況。這一 Shuffle 模式是在Flink 1.12 的

FLIP-148

中引入的。

這一優化避免了大規模作業下不斷出現 OutOfMemoryError: Direct Memory 的問題,并且通過 I/O 排程和 broadcast 優化提高了性能(尤其是在機械硬碟上)。

HBase 連接配接器支援異步維表查詢和查詢緩存

HBase Lookup Table Source 現在可以支援異步查詢模式和查詢緩存。這極大的提高了使用這一 Source 的 Table / SQL 維表 Join 的性能,并且在一些典型情況下可以減少對 HBase 的 I/O 請求數量。

在之前的版本中,HBase Lookup Source 僅支援同步通信,進而導緻作業吞吐以及資源使用率降低。

更新 Flink 1.13 需要注意的改動

  • FLINK-21709 – 老的 Table & SQL API 計劃器已經被标記為廢棄,并且将在 Flink 1.14 中被删除。Blink 計劃器在若幹版本之前已經被設定為預設計劃器,并且将成為未來版本中的唯一計劃器。這意味着 BatchTableEnvironment 和 DataSet API 互操作後續也将不再支援。使用者需要切換到統一的 TableEnvironment 來編寫流或者批的作業。
  • FLINK-22352 – Flink 社群決定廢棄對 Apache mesos 的支援,未來有可能會進一步删除這部分功能。使用者最好能夠切換到其它的資源管理系統上。
  • FLINK-21935 – state.backend.async 這一配置已經被禁用了,因為現在 Flink 總是會異步的來儲存快照(即之前的配置預設值),并且現在沒有實作可以支援同步的快照儲存操作。
  • FLINK-17012 – Task 的 RUNNING 狀态被細分為兩步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 階段包括加載 state 和在啟用 unaligned checkpoint 時恢複 In-flight 資料的過程。通過顯式區分這兩種狀态,監控系統可以更好的區分任務是否已經在實際工作。
  • FLINK-21698 – NUMERIC 和 TIMESTAMP 類型之間的直接轉換存在問題,現在已經被禁用,例如 CAST(numeric AS TIMESTAMP(3))。使用者應該使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 來代替。
  • FLINK-22133 – 新的 Source 接口有一個小的不相容的修改,即 SplitEnumerator.snapshotState() 方法現在多接受一個 checkpoint id 參數來表示正在進行的 snapshot 操作所屬的 checkpoint 的 id。
  • FLINK-19463 – 由于老的 Statebackend 接口承載了過多的語義并且容易引起困惑,這一接口被标記為廢棄。這是一個純 API 層的改動,而并不會影響應用運作時。對于如何更新現有作業,請參考 作業遷移指引

其它資源

二進制和代碼可以從 Flink 官網的

下載下傳頁面

獲得,最新的 PyFlink 釋出可以從

PyPI

獲得。

如果想要更新到 Flink 1.13,請參考

釋出說明

。這一版本與之前 1.x 的版本在标記為@Public 的接口上是相容的。

使用者也可以檢視

新版本修改清單

更新後的文檔

來獲得修改和新功能的詳細清單。

原文連結: https://flink.apache.org/news/2021/05/03/release-1.13.0.html

更多 Flink 相關技術問題,可掃碼加入社群釘釘交流群~

官宣|Apache Flink 1.13.0 正式釋出,流處理應用更加簡單高效!

活動推薦

阿裡雲基于 Apache Flink 建構的企業級産品-實時計算Flink版現開啟活動:

99元試用

實時計算Flink版

(包年包月、10CU)即有機會獲得 Flink 獨家定制T恤;另包3個月及以上還有85折優惠!

了解活動詳情:

https://www.aliyun.com/product/bigdata/sc
官宣|Apache Flink 1.13.0 正式釋出,流處理應用更加簡單高效!