主要講述 Kafka 事務性的實作,這部分的實作要比幂等性的實作複雜一些,幂等性實作是事務性實作的基礎,幂等性提供了單會話單 Partition Exactly-Once 語義的實作,正是因為 Idempotent Producer 不提供跨多個 Partition 和跨會話場景下的保證,是以,我們是需要一種更強的事務保證,能夠原子處理多個 Partition 的寫入操作,資料要麼全部寫入成功,要麼全部失敗,不期望出現中間狀态。這就是 Kafka Transactions 希望解決的問題,簡單來說就是能夠實作 <code>atomic writes across partitions</code>,本文以 Apache Kafka 2.0.0 代碼實作為例,深入分析一下 Kafka 是如何實作這一機制的。
Apache Kafka 在 Exactly-Once Semantics(EOS)上三種粒度的保證如下(來自 Exactly-once Semantics in Apache Kafka):
Idempotent Producer:Exactly-once,in-order,delivery per partition;
Transactions:Atomic writes across partitions;
Exactly-Once stream processing across read-process-write tasks;
第二種情況就是本文講述的主要内容,在講述整個事務處理流程時,也順便分析第三種情況。
Kafka 事務性最開始的出發點是為了在 Kafka Streams 中實作 Exactly-Once 語義的資料處理,這個問題提出之後,在真正的方案讨論階段,社群又挖掘了更多的應用場景,也為了盡可能覆寫更多的應用場景,在真正的實作中,在很多地方做了相應的 tradeoffs,後面會寫篇文章對比一下 RocketMQ 事務性的實作,就能明白 Kafka 事務性實作及應用場景的複雜性了。
Kafka 的事務處理,主要是允許應用可以把消費和生産的 batch 處理(涉及多個 Partition)在一個原子單元内完成,操作要麼全部完成、要麼全部失敗。為了實作這種機制,我們需要應用能提供一個唯一 id,即使故障恢複後也不會改變,這個 id 就是 TransactionnalId(也叫 txn.id,後面會詳細講述),txn.id 可以跟内部的 PID 1:1 配置設定,它們不同的是 txn.id 是使用者提供的,而 PID 是 Producer 内部自動生成的(并且故障恢複後這個 PID 會變化),有了 txn.id 這個機制,就可以實作多 partition、跨會話的 EOS 語義。
當使用者使用 Kafka 的事務性時,Kafka 可以做到的保證:
跨會話的幂等性寫入:即使中間故障,恢複後依然可以保持幂等性;
跨會話的事務恢複:如果一個應用執行個體挂了,啟動的下一個執行個體依然可以保證上一個事務完成(commit 或者 abort);
跨多個 Topic-Partition 的幂等性寫入,Kafka 可以保證跨多個 Topic-Partition 的資料要麼全部寫入成功,要麼全部失敗,不會出現中間狀态。
上面是從 Producer 的角度來看,那麼如果從 Consumer 角度呢?Consumer 端很難保證一個已經 commit 的事務的所有 msg 都會被消費,有以下幾個原因:
對于 compacted topic,在一個事務中寫入的資料可能會被新的值覆寫;
一個事務内的資料,可能會跨多個 log segment,如果舊的 segmeng 資料由于過期而被清除,那麼這個事務的一部分資料就無法被消費到了;
Consumer 在消費時可以通過 seek 機制,随機從一個位置開始消費,這也會導緻一個事務内的部分資料無法消費;
Consumer 可能沒有訂閱這個事務涉及的全部 Partition。
簡單總結一下,關于 Kafka 事務性語義提供的保證主要以下三個:
Atomic writes across multiple partitions.
All messages in a transaction are made visible together, or none are.
Consumers must be configured to skip uncommitted messages.
Kafka 事務性的使用方法也非常簡單,使用者隻需要在 Producer 的配置中配置 <code>transactional.id</code>,通過 <code>initTransactions()</code> 初始化事務狀态資訊,再通過 <code>beginTransaction()</code> 辨別一個事務的開始,然後通過 <code>commitTransaction()</code> 或 <code>abortTransaction()</code> 對事務進行 commit 或 abort,示例如下所示:
事務性的 API 也同樣保持了 Kafka 一直以來的簡潔性,使用起來是非常友善的。
對于 Kafka 的事務性實作,最關鍵的就是其事務操作原子性的實作。對于一個事務操作而言,其會涉及到多個 Topic-Partition 資料的寫入,如果是一個 long transaction 操作,可能會涉及到非常多的資料,如何才能保證這個事務操作的原子性(要麼全部完成,要麼全部失敗)呢?
關于這點,最容易想到的應該是引用 2PC 協定(它主要是解決分布式系統資料一緻性的問題)中協調者的角色,它的作用是統計所有參與者的投票結果,如果大家一緻認為可以 commit,那麼就執行 commit,否則執行 abort:
我們來想一下,Kafka 是不是也可以引入一個類似的角色來管理事務的狀态,隻有當 Producer 真正 commit 時,事務才會送出,否則事務會還在進行中(實際的實作中還需要考慮 timeout 的情況),不會處于完成狀态;
Producer 在開始一個事務時,告訴【協調者】事務開始,然後開始向多個 Topic-Partition 寫資料,隻有這批資料全部寫完(中間沒有出現異常),Producer 會調用 commit 接口進行 commit,然後事務真正送出,否則如果中間出現異常,那麼事務将會被 abort(Producer 通過 abort 接口告訴【協調者】執行 abort 操作);
這裡的協調者與 2PC 中的協調者略有不同,主要為了管理事務相關的狀态資訊,這就是 Kafka Server 端的 TransactionCoordinator 角色;
有了上面的機制,是不是就可以了?很容易想到的問題就是 TransactionCoordinator 挂的話怎麼辦?TransactionCoordinator 如何實作高可用?
TransactionCoordinator 需要管理事務的狀态資訊,如果一個事務的 TransactionCoordinator 挂的話,需要轉移到其他的機器上,這裡關鍵是在 事務狀态資訊如何恢複? 也就是事務的狀态資訊需要很強的容錯性、一緻性;
關于資料的強容錯性、一緻性,存儲的容錯性方案基本就是多副本機制,而對于一緻性,就有很多的機制實作,其實這個在 Kafka 内部已經實作(不考慮資料重複問題),那就是 <code>min.isr + ack</code> 機制;
分析到這裡,對于 Kafka 熟悉的同學應該就知道,這個是不是跟 <code>__consumer_offset</code> 這個内部的 topic 很像,TransactionCoordinator 也跟 GroupCoordinator 類似,而對應事務資料(transaction log)就是 <code>__transaction_state</code> 這個内部 topic,所有事務狀态資訊都會持久化到這個 topic,TransactionCoordinator 在做故障恢複也是從這個 topic 中恢複資料;
有了上面的機制,就夠了麼?我們再來考慮一種情況,我們期望一個 Producer 在 Fail 恢複後能主動 abort 上次未完成的事務(接上之前未完成的事務),然後重新開始一個事務,這種情況應該怎麼辦?之前幂等性引入的 PID 是無法解決這個問題的,因為每次 Producer 在重新開機時,PID 都會更新為一個新值:
Kafka 在 Producer 端引入了一個 TransactionalId 來解決這個問題,這個 txn.id 是由應用來配置的;
TransactionalId 的引入還有一個好處,就是跟 consumer group 類似,它可以用來辨別一個事務操作,便于這個事務的所有操作都能在一個地方(同一個 TransactionCoordinator)進行處理;
再來考慮一個問題,在具體的實作時,我們應該如何辨別一個事務操作的開始、進行、完成的狀态?正常來說,一個事務操作是由很多操作組成的一個操作單元,對于 TransactionCoordinator 而言,是需要準确知道目前的事務操作處于哪個階段,這樣在容錯恢複時,新選舉的 TransactionCoordinator 才能恢複之前的狀态:
這個就是事務狀态轉移,一個事務從開始,都會有一個相應的狀态辨別,直到事務完成,有了事務的狀态轉移關系之後,TransactionCoordinator 對于事務的管理就會簡單很多,TransactionCoordinator 會将目前事務的狀态資訊都會緩存起來,每當事務需要進行轉移,就更新緩存中事務的狀态(前提是這個狀态轉移是有效的)。
如果多個 Producer 共用一個 txn.id,那麼最後啟動的 Producer 會成功運作,會它之前啟動的 Producer 都 Fencing 掉(至于為什麼會 Fencing 下一小節會做分析)。
有了前面的分析,這個問題就很好回答了,順序性還是嚴格按照 offset 的,隻不過遇到 abort trsansaction 的資料時就丢棄掉,其他的與普通 Consumer 并沒有差別。
Producer 在開始一個事務操作時,可以設定其事務逾時時間(參數是 <code>transaction.timeout.ms</code>,預設60s),而且 Server 端還有一個最大可允許的事務操作逾時時間(參數是 <code>transaction.timeout.ms</code>,預設是15min),Producer 設定逾時時間不能超過 Server,否則的話會抛出異常。
上面是關于事務操作的逾時設定,而對于 txn.id,我們知道 TransactionCoordinator 會緩存 txn.id 的相關資訊,如果沒有逾時機制,這個 meta 大小是無法預估的,Server 端提供了一個 <code>transaction.id.expiration.ms</code> 參數來配置這個逾時時間(預設是7天),如果超過這個時間沒有任何事務相關的請求發送過來,那麼 TransactionCoordinator 将會使這個 txn.id 過期。
對于每個 Topic-Partition,Broker 都會在記憶體中維護其 PID 與 sequence number(最後成功寫入的 msg 的 sequence number)的對應關系(這個在上面幂等性文章應講述過,主要是為了不丢補充的實作)。
Broker 重新開機時,如果想恢複上面的狀态資訊,那麼它讀取所有的 log 檔案。相比于之下,定期對這個 state 資訊做 checkpoint(Snapshot),明顯收益是非常大的,此時如果 Broker 重新開機,隻需要讀取最近一個 Snapshot 檔案,之後的資料再從 log 檔案中恢複即可。
對于上面所講述的一個事務操作流程,實際生産環境中,任何一個地方都有可能出現的失敗:
Producer 在發送 <code>beginTransaction()</code> 時,如果出現 timeout 或者錯誤:Producer 隻需要重試即可;
Producer 在發送資料時出現錯誤:Producer 應該 abort 這個事務,如果 Produce 沒有 abort(比如設定了重試無限次,并且 batch 逾時設定得非常大),TransactionCoordinator 将會在這個事務逾時之後 abort 這個事務操作;
Producer 發送 <code>commitTransaction()</code> 時出現 timeout 或者錯誤:Producer 應該重試這個請求;
Coordinator Failure:如果 Transaction Coordinator 發生切換(事務 topic leader 切換),Coordinator 可以從日志中恢複。如果發送事務有處于 PREPARE_COMMIT 或 PREPARE_ABORT 狀态,那麼直接執行 commit 或者 abort 操作,如果是一個正在進行的事務,Coordinator 的失敗并不需要 abort 事務,producer 隻需要向新的 Coordinator 發送請求即可。
1 生産者幂等性
幂等性引入目的:
生産者重複生産消息。生産者進行retry會産生重試時,會重複産生消息。有了幂等性之後,在進行retry重試時,隻會生成一個消息。
為了實作Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。
PID。每個新的Producer在初始化的時候會被配置設定一個唯一的PID,這個PID對使用者是不可見的。
Sequence Numbler。(對于每個PID,該Producer發送資料的每個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Number。
Broker端在緩存中儲存了這seq number,對于接收的每條消息,如果其序号比Broker緩存中序号大于1則接受它,否則将其丢棄。這樣就可以實作了消息重複送出了。但是,隻能保證單個Producer對于同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion幂等。
1、配置屬性
需要設定:
enable.idempotence,需要設定為ture,此時就會預設把acks設定為all,是以不需要再設定acks屬性了。