天天看點

Apache kafka是如何實作消息的精确一次(Exactly-once-semantics)語義的?

       原文連結:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/

       精确一次消息語義(Exactly-once semantics)是可以實作的:讓我們看看Kafka是怎麼實作的。

       我很興奮,我們到達了Kafka社群一直以來期待的令人激動的裡程碑:我們在Apache Kafka 0.11 release版本和Confluent Platform 3.3中引入了精确一次消息語義。在這篇文章中,我會告訴你Apache Kafka中的精确一次語義是什麼意思,為什麼這是一個難以實作的問題,還有Kafka中的幂等(idempotence)和事務(transactions)新特性是如何保證使用Kafka Stream API來正确地進行精确一次流處理的(exactly-once stream processing)。

精确一次确實很難實作(Exactly-once is a really hard problem)

       我知道你們中的一些人在想什麼。你們可能會認為精确一次投遞(exactly-once delivery)是不可能的, 它的代價太高而無法在實際中使用,或者認為我完全錯了! 不僅僅隻有你這麼想。 我的一些業内同僚承認,精确一次性傳遞是分布式系統中最難解決的問題之一。

       Mathias Verraes說,分布式系統中最難解決的兩個問題是:1.消息順序保證(Guaranteed order of messages)。2.消息的精确一次投遞(Exactly-once delivery)。

       還有一些人直接坦白地說,精确一次投遞根本不可能實作。

       我并不否認引入一次性傳遞語義,并且隻支援一次流處理,是一個真正難以解決的問題。 但我也見證了Confluent公司的機智的分布式系統工程師在開源社群努力工作了一年多,以便在Apache Kafka中解決這個問題。 是以,讓我們直奔主題,先來了解消息傳遞語義的概述。

消息系統語義概述(Overview of messaging system semantics)

       在一個分布式釋出訂閱消息系統中,組成系統的計算機總會由于各自的故障而不能工作。在Kafka中,一個單獨的broker,可能會在生産者發送消息到一個topic的時候當機,或者出現網絡故障,進而導緻生産者發送消息失敗。根據生産者如何處理這樣的失敗,産生了不同的語義:

  • 至少一次語義(At least once semantics):如果生産者收到了Kafka broker的确認(acknowledgement,ack),并且生産者的acks配置項設定為all(或-1),這就意味着消息已經被精确一次寫入Kafka topic了。然而,如果生産者接收ack逾時或者收到了錯誤,它就會認為消息沒有寫入Kafka topic而嘗試重新發送消息。如果broker恰好在消息已經成功寫入Kafka topic後,發送ack前,出了故障,生産者的重試機制就會導緻這條消息被寫入Kafka兩次,進而導緻同樣的消息會被消費者消費不止一次。每個人都喜歡一個興高采烈的給予者,但是這種方式會導緻重複的工作和錯誤的結果。
  • 至多一次語義(At most once semantics):如果生産者在ack逾時或者傳回錯誤的時候不重試發送消息,那麼消息有可能最終并沒有寫入Kafka topic中,是以也就不會被消費者消費到。但是為了避免重複處理的可能性,我們接受有些消息可能被遺漏處理。
  • 精确一次語義(Exactly once semantics):即使生産者重試發送消息,也隻會讓消息被發送給消費者一次。精确一次語義是最令人滿意的保證,但也是最難了解的。因為它需要消息系統本身和生産消息的應用程式還有消費消息的應用程式一起合作。比如,在成功消費一條消息後,你又把消費的offset重置到之前的某個offset位置,那麼你将收到從那個offset到最新的offset之間的所有消息。這解釋了為什麼消息系統和用戶端程式必須合作來保證精确一次語義。

必須被處理的故障(Failures that must be handled)

       為了描述為了支援精确一次消息投遞語義而引入的挑戰,讓我們從一個簡單的例子開始。

       假設有一個單程序生産者程式,發送了消息“Hello Kafka“給一個叫做“EoS“的單分區Kafka topic。然後有一個單執行個體的消費者程式在另一端從topic中拉取消息,然後列印。在沒有故障的理想情況下,這能很好的工作,“Hello Kafka“隻被寫入到EoS topic一次。消費者拉取消息,處理消息,送出偏移量來說明它完成了處理。然後,即使消費者程式出故障重新開機也不會再收到“Hello Kafka“這條消息了。

       然而,我們知道,我們不能總認為一切都是順利的。在上規模的叢集中,即使最不可能發生的故障場景都可能最終發生。比如:

  1. broker可能故障:Kafka是一個高可用、持久化的系統,每一條寫入一個分區的消息都會被持久化并且多副本備份(假設有n個副本)。是以,Kafka可以容忍n-1個broker故障,意味着一個分區隻要至少有一個broker可用,分區就可用。Kafka的副本協定保證了隻要消息被成功寫入了主副本,它就會被複制到其他所有的可用副本(ISR)。
  2. producer到broker的RPC調用可能失敗:Kafka的持久性依賴于生産者接收broker的ack。沒有接收成功ack不代表生産請求本身失敗了。broker可能在寫入消息後,發送ack給生産者的時候挂了。甚至broker也可能在寫入消息前就挂了。由于生産者沒有辦法知道錯誤是什麼造成的,是以它就隻能認為消息沒寫入成功,并且會重試發送。在一些情況下,這會造成同樣的消息在Kafka分區日志中重複,進而造成消費端多次收到這條消息。
  3. 用戶端可能會故障:精确一次傳遞也必須考慮用戶端故障。但是我們如何知道一個用戶端已經故障而不是暫時和brokers斷開,或者經曆一個程式短暫的暫停?區分永久性故障和臨時故障是很重要的,為了正确性,broker應該丢棄僵住的生産者發送來的消息,同樣,也應該不向已經僵住的消費者發送消息。一旦一個新的用戶端執行個體啟動,它應該能夠從失敗的執行個體留下的任何狀态中恢複,從一個安全點開始處理。這意味着,消費的偏移量必須始終與生産的輸出保持同步。

Apache Kafka中的精确一次語義(Exactly-once semantics in Apache Kafka, explained)

       在0.11版本之前,Apache Kafka支援最少一次傳遞語義,和分區内有序傳遞。從上面的例子可以知道,生産者重試可能會造成重複消息。在新的精确一次語義特性中,我們以三個不同且互相關聯的方式加強了Kafka軟體的處理語義。

幂等性:每個分區中精确一次且有序(Idempotence: Exactly-once in order semantics per partition)

       一個幂等性的操作就是一種被執行多次造成的影響和隻執行一次造成的影響一樣的操作。現在生産者發送的操作是幂等的了。如果出現導緻生産者重試的錯誤,同樣的消息,仍由同樣的生産者發送多次,将隻被寫到kafka broker的日志中一次。對于單個分區,幂等生産者不會因為生産者或broker故障而發送多條重複消息。想要開啟這個特性,獲得每個分區内的精确一次語義,也就是說沒有重複,沒有丢失,并且有序的語義,隻需要設定producer配置中的"enable.idempotence=true"。

       這個特性是怎麼實作的呢?在底層,它和TCP的工作原理有點像,每一批發送到Kafka的消息都将包含一個序列号,broker将使用這個序列号來删除重複的發送。和隻能在瞬态記憶體中的連接配接中保證不重複的TCP不同,這個序列号被持久化到副本日志,是以,即使分區的leader挂了,其他的broker接管了leader,新leader仍可以判斷重新發送的是否重複了。這種機制的開銷非常低:每批消息隻有幾個額外的字段。你将在這篇文章的後面看到,這種特性比非幂等的生産者隻增加了可忽略的性能開銷。

事務:跨分區原子寫入(Transactions: Atomic writes across multiple partitions)

       Kafka現在通過新的事務API支援跨分區原子寫入。這将允許一個生産者發送一批到不同分區的消息,這些消息要麼全部對任何一個消費者可見,要麼對任何一個消費者都不可見。這個特性也允許你在一個事務中處理消費資料和送出消費偏移量,進而實作端到端的精确一次語義。下面是的代碼片段示範了事務API的使用:

producer.initTransactions();
try {
  producer.beginTransaction();
  producer.send(record1);
  producer.send(record2);
  producer.commitTransaction();
} catch(ProducerFencedException e) {
  producer.close();
} catch(KafkaException e) {
  producer.abortTransaction();
}
           

       上面的代碼片段示範了你可以如何使用新生産者API來原子性地發送消息到topic的多個partition。值得注意的是,一個Kafka topic的分區中的消息,可以有些是在事務中,有些不在事務中。

       是以在消費者方面,你有兩種選擇來讀取事務性消息,通過隔離等級“isolation.level”消費者配置表示:

  1. read_commited

    :除了讀取不屬于事務的消息之外,還可以讀取事務送出後的消息。
  2. read_uncommited

    :按照偏移位置讀取所有消息,而不用等事務送出。這個選項類似Kafka消費者的目前語義。

       為了使用事務,需要配置消費者使用正确的隔離等級,使用新版生産者,并且将生産者的“transactional.id”配置項設定為某個唯一ID。 需要此唯一ID來提供跨越應用程式重新啟動的事務狀态的連續性。

真實案例:Apache Kafka中的精确一次流處理(The real deal: Exactly-once stream processing in Apache Kafka)

       建構于幂等性和原子性之上,精确一次流處理現在可以通過Apache Kafka的流處理API實作了。使Streams應用程式使用精确一次語義所需要的就是設定配置“processing.guarantee = exact_once”。 這可以保證所有處理恰好發生一次; 包括處理和由寫回Kafka的處理作業建立的所有具體狀态的精确一次。

       這就是為什麼Kafka的Streams API提供的精确一次性保證是迄今為止任何流處理系統提供的最強保證。 它為流處理應用程式提供端到端的一次性保證,從Kafka讀取的資料,Streams應用程式物化到Kafka的任何狀态,到寫回Kafka的最終輸出。 僅依靠外部資料系統來實作狀态支援的流處理系統對于精确一次的流處理提供了較少的保證。 即使他們使用Kafka作為流處理的源并需要從失敗中恢複,他們也隻能倒回他們的Kafka偏移量來重建和重新處理消息,但是不能復原外部系統中的關聯狀态,導緻狀态不正确,更新不是幂等的。

       讓我再詳細解釋一下。 流處理系統的關鍵問題是“我的流處理應用程式是否得到了正确的答案,即使其中一個執行個體在處理過程中崩潰了?”。在恢複失敗的執行個體時,恢複到崩潰前相同的狀态進行處理是很關鍵的。

       現在,流處理隻不過是對Kafka topic的讀取-處理-寫入操作; 消費者從Kafka topic讀取消息,一些處理邏輯轉換這些消息或修改由處理程式維護的狀态,然後生産者将結果消息寫入另一個Kafka topic。精确一次的流處理隻是一種保證僅執行一次讀-處理-寫操作的能力。在這種情況下,“獲得正确答案”意味着不會丢失任何輸入消息或産生任何重複輸出。 這是使用者期望從精确一次性流處理器中獲得的行為。

除了我們到目前為止讨論的簡單場景之外,還有許多其他失敗場景需要考慮:

  • 流處理器可能從多個源topic擷取輸入,并且跨多個源topic的消息順序在多次運作中是不确定的。 是以,如果重新運作從多個源topic擷取輸入的流處理器,可能會産生不同的結果。
  • 同樣,流處理器可以生成到多個目标topic的輸出。 如果生産者無法跨多個topic進行原子寫入,那麼如果對某些(但不是所有)分區的寫入失敗,則生産者輸出可能不正确。
  • 流處理器可以使用Streams API提供的狀态管理工具在多個輸入之間聚合或連接配接資料。 如果流處理器的其中一個執行個體失敗,那麼你需要能夠復原由流處理器的該執行個體儲存的狀态。 在重新啟動執行個體時,你還需要能夠恢複處理并重新建立其狀态。
  • 流處理器可能會查找外部資料庫中濃縮的資訊,或者通過調用外面的服務來查找資訊。 依賴外部服務使流處理器從根本上不确定; 如果外部服務在兩次運作流處理器之間更改其内部狀态,則會導緻下遊的結果不正确。 但是,如果處理得當,這不應導緻完全錯誤的結果。 它應該隻導緻流處理器輸出屬于一組合法輸出。 稍後将在部落格中詳細介紹。

       應用失敗和重新啟動,尤其是與非确定性操作相結合時,并且應用程式計算的持久狀态的更改時,可能不僅會導緻重複,還可能會導緻錯誤的結果。 例如,如果一個處理階段正在計算所看到的事件數量,那麼上遊處理階段中的重複可能導緻下遊的錯誤計數。 是以,我們必須限定短語“精确一次流處理。”它指的是從topic消費,生成中間狀态到Kafka topic中,并把結果寫到另一個Kafka topic中,而不是使用Streams API對消息進行的所有可能的計算。某些計算(例如,取決于外部服務或從多個源topic消費)從根本上是不确定的。

繼續閱讀