天天看點

Apache Flink結合Kafka建構端到端的Exactly-Once處理

本文翻譯自:https://data-artisans.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka

Apache Flink自2017年12月釋出的1.4.0版本開始,為流計算引入了一個重要的裡程碑特性:TwoPhaseCommitSinkFunction(相關的Jira)。它提取了兩階段送出協定的通用邏輯,使得通過Flink來建構端到端的Exactly-Once程式成為可能。同時支援一些資料源(source)和輸出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一個抽象層,使用者隻需要實作少數方法就能實作端到端的Exactly-Once語義。

有關TwoPhaseCommitSinkFunction的使用詳見文檔: TwoPhaseCommitSinkFunction。或者可以直接閱讀Kafka 0.11 sink的文檔: kafka。

接下來會詳細分析這個新功能以及Flink的實作邏輯,分為如下幾點。

  • 描述Flink checkpoint機制是如何保證Flink程式結果的Exactly-Once的
  • 顯示Flink如何通過兩階段送出協定與資料源和資料輸出端互動,以提供端到端的Exactly-Once保證
  • 通過一個簡單的示例,了解如何使用TwoPhaseCommitSinkFunction實作Exactly-Once的檔案輸出

Apache Flink應用程式中的Exactly-Once語義

當我們說『Exactly-Once』時,指的是每個輸入的事件隻影響最終結果一次。即使機器或軟體出現故障,既沒有重複資料,也不會丢資料。

Flink很久之前就提供了Exactly-Once語義。在過去幾年中,我們對Flink的checkpoint機制有過深入的描述,這是Flink有能力提供Exactly-Once語義的核心。Flink文檔還提供了該功能的全面概述。

在繼續之前,先看下對checkpoint機制的簡要介紹,這對了解後面的主題至關重要。

一次checkpoint是以下内容的一緻性快照:

  • 應用程式的目前狀态
  • 輸入流的位置

Flink可以配置一個固定的時間點,定期産生checkpoint,将checkpoint的資料寫入持久存儲系統,例如S3或HDFS。将checkpoint資料寫入持久存儲是異步發生的,這意味着Flink應用程式在checkpoint過程中可以繼續處理資料。

如果發生機器或軟體故障,重新啟動後,Flink應用程式将從最新的checkpoint點恢複處理; Flink會恢複應用程式狀态,将輸入流復原到上次checkpoint儲存的位置,然後重新開始運作。這意味着Flink可以像從未發生過故障一樣計算結果。

在Flink 1.4.0之前,Exactly-Once語義僅限于Flink應用程式内部,并沒有擴充到Flink資料處理完後發送的大多數外部系統。Flink應用程式與各種資料輸出端進行互動,開發人員需要有能力自己維護元件的上下文來保證Exactly-Once語義。

為了提供端到端的Exactly-Once語義 - 也就是說,除了Flink應用程式内部,Flink寫入的外部系統也需要能滿足Exactly-Once語義 - 這些外部系統必須提供送出或復原的方法,然後通過Flink的checkpoint機制來協調。

分布式系統中,協調送出和復原的常用方法是兩階段送出協定。在下一節中,我們将讨論Flink的TwoPhaseCommitSinkFunction是如何利用兩階段送出協定來提供端到端的Exactly-Once語義。

Flink應用程式端到端的Exactly-Once語義

我們将介紹兩階段送出協定,以及它如何在一個讀寫Kafka的Flink程式中實作端到端的Exactly-Once語義。Kafka是一個流行的消息中間件,經常與Flink一起使用。Kafka在最近的0.11版本中添加了對事務的支援。這意味着現在通過Flink讀寫Kafaka,并提供端到端的Exactly-Once語義有了必要的支援。

Flink對端到端的Exactly-Once語義的支援不僅局限于Kafka,您可以将它與任何一個提供了必要的協調機制的源/輸出端一起使用。例如Pravega,來自DELL/EMC的開源流媒體存儲系統,通過Flink的TwoPhaseCommitSinkFunction也能支援端到端的Exactly-Once語義。

Apache Flink結合Kafka建構端到端的Exactly-Once處理

在今天讨論的這個示例程式中,我們有:

  • 從Kafka讀取的資料源(Flink内置的KafkaConsumer)
  • 視窗聚合
  • 将資料寫回Kafka的資料輸出端(Flink内置的KafkaProducer)

要使資料輸出端提供Exactly-Once保證,它必須将所有資料通過一個事務送出給Kafka。送出捆綁了兩個checkpoint之間的所有要寫入的資料。這可確定在發生故障時能復原寫入的資料。但是在分布式系統中,通常會有多個并發運作的寫入任務的,簡單的送出或復原是不夠的,因為所有元件必須在送出或復原時“一緻”才能確定一緻的結果。Flink使用兩階段送出協定及預送出階段來解決這個問題。

在checkpoint開始的時候,即兩階段送出協定的“預送出”階段。當checkpoint開始時,Flink的JobManager會将checkpoint barrier(将資料流中的記錄分為進入目前checkpoint與進入下一個checkpoint)注入資料流。

brarrier在operator之間傳遞。對于每一個operator,它觸發operator的狀态快照寫入到state backend。

Apache Flink結合Kafka建構端到端的Exactly-Once處理

資料源儲存了消費Kafka的偏移量(offset),之後将checkpoint barrier傳遞給下一個operator。

這種方式僅适用于operator具有『内部』狀态。所謂内部狀态,是指Flink state backend儲存和管理的 -例如,第二個operator中window聚合算出來的sum值。當一個程序有它的内部狀态的時候,除了在checkpoint之前需要将資料變更寫入到state backend,不需要在預送出階段執行任何其他操作。Flink負責在checkpoint成功的情況下正确送出這些寫入,或者在出現故障時中止這些寫入。

Apache Flink結合Kafka建構端到端的Exactly-Once處理

示例Flink應用程式啟動預送出階段

但是,當程序具有『外部』狀态時,需要作些額外的處理。外部狀态通常以寫入外部系統(如Kafka)的形式出現。在這種情況下,為了提供Exactly-Once保證,外部系統必須支援事務,這樣才能和兩階段送出協定內建。

在本文示例中的資料需要寫入Kafka,是以資料輸出端(Data Sink)有外部狀态。在這種情況下,在預送出階段,除了将其狀态寫入state backend之外,資料輸出端還必須預先送出其外部事務。

Apache Flink結合Kafka建構端到端的Exactly-Once處理

當checkpoint barrier在所有operator都傳遞了一遍,并且觸發的checkpoint回調成功完成時,預送出階段就結束了。所有觸發的狀态快照都被視為該checkpoint的一部分。checkpoint是整個應用程式狀态的快照,包括預先送出的外部狀态。如果發生故障,我們可以復原到上次成功完成快照的時間點。

下一步是通知所有operator,checkpoint已經成功了。這是兩階段送出協定的送出階段,JobManager為應用程式中的每個operator發出checkpoint已完成的回調。

資料源和widnow operator沒有外部狀态,是以在送出階段,這些operator不必執行任何操作。但是,資料輸出端(Data Sink)擁有外部狀态,此時應該送出外部事務。

Apache Flink結合Kafka建構端到端的Exactly-Once處理

我們對上述知識點總結下:

  • 一旦所有operator完成預送出,就送出一個commit。
  • 如果至少有一個預送出失敗,則所有其他送出都将中止,我們将復原到上一個成功完成的checkpoint。
  • 在預送出成功之後,送出的commit需要保證最終成功 - operator和外部系統都需要保障這點。如果commit失敗(例如,由于間歇性網絡問題),整個Flink應用程式将失敗,應用程式将根據使用者的重新開機政策重新啟動,還會嘗試再送出。這個過程至關重要,因為如果commit最終沒有成功,将會導緻資料丢失。

是以,我們可以确定所有operator都同意checkpoint的最終結果:所有operator都同意資料已送出,或送出被中止并復原。

在Flink中實作兩階段送出Operator

完整的實作兩階段送出協定可能有點複雜,這就是為什麼Flink将它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的原因。

接下來基于輸出到檔案的簡單示例,說明如何使用TwoPhaseCommitSinkFunction。使用者隻需要實作四個函數,就能為資料輸出端實作Exactly-Once語義:

  • beginTransaction - 在事務開始前,我們在目标檔案系統的臨時目錄中建立一個臨時檔案。随後,我們可以在處理資料時将資料寫入此檔案。
  • preCommit - 在預送出階段,我們重新整理檔案到存儲,關閉檔案,不再重新寫入。我們還将為屬于下一個checkpoint的任何後續檔案寫入啟動一個新的事務。
  • commit - 在送出階段,我們将預送出階段的檔案原子地移動到真正的目标目錄。需要注意的是,這會增加輸出資料可見性的延遲。
  • abort - 在中止階段,我們删除臨時檔案。

我們知道,如果發生任何故障,Flink會将應用程式的狀态恢複到最新的一次checkpoint點。一種極端的情況是,預送出成功了,但在這次commit的通知到達operator之前發生了故障。在這種情況下,Flink會将operator的狀态恢複到已經預送出,但尚未真正送出的狀态。

我們需要在預送出階段儲存足夠多的資訊到checkpoint狀态中,以便在重新開機後能正确的中止或送出事務。在這個例子中,這些資訊是臨時檔案和目标目錄的路徑。

TwoPhaseCommitSinkFunction已經把這種情況考慮在内了,并且在從checkpoint點恢複狀态時,會優先發出一個commit。我們需要以幂等方式實作送出,一般來說,這并不難。在這個示例中,我們可以識别出這樣的情況:臨時檔案不在臨時目錄中,但已經移動到目标目錄了。

在TwoPhaseCommitSinkFunction中,還有一些其他邊界情況也會考慮在内,請參考Flink文檔了解更多資訊。

總結

總結下本文涉及的一些要點:

  • Flink的checkpoint機制是支援兩階段送出協定并提供端到端的Exactly-Once語義的基礎。
  • 這個方案的優點是: Flink不像其他一些系統那樣,通過網絡傳輸存儲資料 - 不需要像大多數批處理程式那樣将計算的每個階段寫入磁盤。
  • Flink的TwoPhaseCommitSinkFunction提取了兩階段送出協定的通用邏輯,基于此将Flink和支援事務的外部系統結合,建構端到端的Exactly-Once成為可能。
  • 從Flink 1.4.0開始,Pravega和Kafka 0.11 producer都提供了Exactly-Once語義;Kafka在0.11版本首次引入了事務,為在Flink程式中使用Kafka producer提供Exactly-Once語義提供了可能性。
  • Kafaka 0.11 producer的事務是在TwoPhaseCommitSinkFunction基礎上實作的,和at-least-once producer相比隻增加了非常低的開銷。

這是個令人興奮的功能,期待Flink TwoPhaseCommitSinkFunction在未來支援更多的資料接收端。