天天看點

萬字長文幹貨 | Kafka 事務性之幂等性實作

作者 | 柳年思水

整理 | 王知無

Apache Kafka 從 0.11.0 開始,支援了一個非常大的 feature,就是對事務性的支援,在 Kafka 中關于事務性,是有三種層面上的含義:一是幂等性的支援;二是事務性的支援;三是 Kafka Streams 的 exactly once 的實作,關于 Kafka 事務性系列的文章我們隻重點關注前兩種層面上的事務性,與 Kafka Streams 相關的内容暫時不做讨論。社群從開始讨論事務性,前後持續近半年時間,相關的設計文檔有六十幾頁(參考 Exactly Once Delivery and Transactional Messaging in Kafka)。事務性這部分的實作也是非常複雜的,之前 Producer 端的代碼實作其實是非常簡單的,增加事務性的邏輯之後,這部分代碼複雜度提高了很多,本篇及後面幾篇關于事務性的文章會以 2.0.0 版的代碼實作為例,對這部分做了一下分析:

本篇主要講述幂等性實作的整體流程,幂等性的實作相對于事務性的實作簡單很多,也是事務性實作的基礎。

Producer 的幂等性指的是當發送同一條消息時,資料在 Server 端隻會被持久化一次,資料不丟不重,但是這裡的幂等性是有條件的:

隻能保證 Producer 在單個會話内不丟不重,如果 Producer 出現意外挂掉再重新開機是無法保證的(幂等性情況下,是無法擷取之前的狀态資訊,是以是無法做到跨會話級别的不丢不重);

幂等性不能跨多個 Topic-Partition,隻能保證單個 partition 内的幂等性,當涉及多個 Topic-Partition 時,這中間的狀态并沒有同步。

如果需要跨會話、跨多個 topic-partition 的情況,需要使用 Kafka 的事務性來實作。

Producer 使用幂等性的示例非常簡單,與正常情況下 Producer 使用相比變化不大,隻需要把 Producer 的配置 enable.idempotence 設定為 true 即可,如下所示:

Prodcuer 幂等性對外保留的接口非常簡單,其底層的實作對上層應用做了很好的封裝,應用層并不需要去關心具體的實作細節,對使用者非常友好。

在看 Producer 是如何實作幂等性之前,首先先考慮一個問題:幂等性是來解決什麼問題的? 在 0.11.0 之前,Kafka 通過 Producer 端和 Server 端的相關配置可以做到資料不丢,也就是 at least once,但是在一些情況下,可能會導緻資料重複,比如:網絡請求延遲等導緻的重試操作,在發送請求重試時 Server 端并不知道這條請求是否已經處理(沒有記錄之前的狀态資訊),是以就會有可能導緻資料請求的重複發送,這是 Kafka 自身的機制(異常時請求重試機制)導緻的資料重複。

對于大多數應用而言,資料保證不丢是可以滿足其需求的,但是對于一些其他的應用場景(比如支付資料等),它們是要求精确計數的,這時候如果上遊資料有重複,下遊應用隻能在消費資料時進行相應的去重操作,應用在去重時,最常用的手段就是根據唯一 id 鍵做 check 去重。

在這種場景下,因為上遊生産導緻的資料重複問題,會導緻所有有精确計數需求的下遊應用都需要做這種複雜的、重複的去重處理。試想一下:如果在發送時,系統就能保證 exactly once,這對下遊将是多麼大的解脫。這就是幂等性要解決的問題,主要是解決資料重複的問題,正如前面所述,資料重複問題,通用的解決方案就是加唯一 id,然後根據 id 判斷資料是否重複,Producer 的幂等性也是這樣實作的,這一小節就讓我們看下 Kafka 的 Producer 如何保證資料的 exactly once 的。

在講述幂等性處理流程之前,先看下 Producer 是如何來保證幂等性的,正如前面所述,幂等性要解決的問題是:Producer 設定 at least once 時,由于異常觸發重試機制導緻資料重複,幂等性的目的就是為了解決這個資料重複的問題,簡單來說就是:

at least once + 幂等 = exactly once

通過在 al least once 的基礎上加上 幂等性來坐到 exactly once,當然這個層面的 exactly once 是有限制的,比如它會要求單會話内有效或者跨會話使用事務性有效等。這裡我們先分析最簡單的情況,那就是在單會話内如何做到幂等性,進而保證 exactly once。

要做到幂等性,要解決下面的問題:

系統需要有能力鑒别一條資料到底是不是重複的資料?常用的手段是通過 唯一鍵/唯一 id 來判斷,這時候系統一般是需要緩存已經處理的唯一鍵記錄,這樣才能更有效率地判斷一條資料是不是重複;

唯一鍵應該選擇什麼粒度?對于分布式存儲系統來說,肯定不能用全局唯一鍵(全局是針對叢集級别),核心的解決思路依然是 分而治之,資料密集型系統為了實作分布式都是有分區概念的,而分區之間是有相應的隔離,對于 Kafka 而言,這裡的解決方案就是在分區的次元上去做,重複資料的判斷讓 partition 的 leader 去判斷處理,前提是 Produce 請求需要把唯一鍵值告訴 leader;

分區粒度實作唯一鍵會不會有其他問題?這裡需要考慮的問題是當一個 Partition 有來自多個 client 寫入的情況,這些 client 之間是很難做到使用同一個唯一鍵(一個是它們之間很難做到唯一鍵的實時感覺,另一個是這樣實作是否有必要)。而如果系統在實作時做到了  client + partition 粒度,這樣實作的好處是每個 client 都是完全獨立的(它們之間不需要有任何的聯系,這是非常大的優點),隻是在 Server 端對不同的 client 做好相應的區分即可,當然同一個 client 在處理多個 Topic-Partition 時是完全可以使用同一個 PID 的。

有了上面的分析(都是個人見解,如果有誤,歡迎指教),就不難了解 Producer 幂等性的實作原理,Kafka Producer 在實作時有以下兩個重要機制:

PID(Producer ID),用來辨別每個 producer client;

sequence numbers,client 發送的每條消息都會帶相應的 sequence number,Server 端就是根據這個值來判斷資料是否重複。

下面詳細講述這兩個實作機制。

每個 Producer 在初始化時都會被配置設定一個唯一的 PID,這個 PID 對應用是透明的,完全沒有暴露給使用者。對于一個給定的 PID,sequence number 将會從0開始自增,每個 Topic-Partition 都會有一個獨立的 sequence number。Producer 在發送資料時,将會給每條 msg 辨別一個 sequence number,Server 也就是通過這個來驗證資料是否重複。這裡的 PID 是全局唯一的,Producer 故障後重新啟動後會被配置設定一個新的 PID,這也是幂等性無法做到跨會話的一個原因。

這裡看下 PID 在 Server 端是如何配置設定的?Client 通過向 Server 發送一個 InitProducerIdRequest 請求擷取 PID(幂等性時,是選擇一台連接配接數最少的 Broker 發送這個請求),這裡看下 Server 端是如何處理這個請求的?KafkaApis 中 <code>handleInitProducerIdRequest()</code> 方法的實作如下:

這裡實際上是調用了 TransactionCoordinator (Broker 在啟動 server 服務時都會初始化這個執行個體)的 <code>handleInitProducerId()</code> 方法做了相應的處理,其實作如下(這裡隻關注幂等性的處理):

Server 在給一個 client 初始化 PID 時,實際上是通過 ProducerIdManager 的 <code>generateProducerId()</code> 方法産生一個 PID。

如前面所述,在幂等性的情況下,直接通過 ProducerIdManager 的 <code>generateProducerId()</code> 方法産生一個 PID,其中 ProducerIdManager 是在 TransactionCoordinator 對象初始化時初始化的,這個對象主要是用來管理 PID 資訊:

在本地的 PID 端用完了或者處于建立狀态時,申請 PID 段(預設情況下,每次申請 1000 個 PID);

TransactionCoordinator 對象通過 <code>generateProducerId()</code> 方法擷取下一個可以使用的 PID;

PID 端申請是向 ZooKeeper 申請,zk 中有一個 <code>/latest_producer_id_block</code> 節點,每個 Broker 向 zk 申請一個 PID 段後,都會把自己申請的 PID 段資訊寫入到這個節點,這樣當其他 Broker 再申請 PID 段時,會首先讀寫這個節點的資訊,然後根據 block_end 選擇一個 PID 段,最後再把資訊寫會到 zk 的這個節點,這個節點資訊格式如下所示:

ProducerIdManager 向 zk 申請 PID 段的方法如下:

ProducerIdManager 申請 PID 段的流程如下:

先從 zk 的 <code>/latest_producer_id_block</code> 節點讀取最新已經配置設定的 PID 段資訊;

如果該節點不存在,直接從 0 開始配置設定,選擇 0~1000 的 PID 段(ProducerIdManager 的 PidBlockSize 預設為 1000,即是每次申請的 PID 段大小);

如果該節點存在,讀取其中資料,根據 block_end 選擇

這個 PID 段(如果 PID 段超過 Long 類型的最大值,這裡會直接傳回一個異常);

在選擇了相應的 PID 段後,将這個 PID 段資訊寫回到 zk 的這個節點中,如果寫入成功,那麼 PID 段就證明申請成功,如果寫入失敗(寫入時會判斷目前節點的 zkVersion 是否與步驟1擷取的 zkVersion 相同,如果相同,那麼可以成功寫入,否則寫入就會失敗,證明這個節點被修改過),證明此時可能其他的 Broker 已經更新了這個節點(目前的 PID 段可能已經被其他 Broker 申請),那麼從步驟 1 重新開始,直到寫入成功。

明白了 ProducerIdManager 如何申請 PID 段之後,再看 <code>generateProducerId()</code> 這個方法就簡單很多了,這個方法在每次調用時,都會更新 nextProducerId 值(下一次可以使用 PID 值),如下所示:

這裡就是 Producer PID 如何申請(事務性情況下 PID 的申請會複雜一些,下篇文章再講述)以及 Server 端如何管理 PID 的。

再有了 PID 之後,在 PID + Topic-Partition 級别上添加一個 sequence numbers 資訊,就可以實作 Producer 的幂等性了。ProducerBatch 也提供了一個 <code>setProducerState()</code> 方法,它可以給一個 batch 添加一些 meta 資訊(pid、baseSequence、isTransactional),這些資訊是會伴随着 ProduceRequest 發到 Server 端,Server 端也正是通過這些 meta 來做相應的判斷,如下所示:

在前面講述完 Kafka 幂等性的兩個實作機制(PID+sequence numbers)之後,這裡詳細講述一下,幂等性時其整體的處理流程,主要講述幂等性相關的内容,其他的部分會簡單介紹(可以參考前面【Kafka 源碼分析系列文章】了解 Producer 端處理流程以及 Server 端關于 ProduceRequest 請求的處理流程),其流程如下圖所示:

Producer 幂等性時處理流程

這個圖隻展示了幂等性情況下,Producer 的大概流程,很多部分在前面的文章中做過分析,本文不再講述,這裡重點關注與幂等性相關的内容(事務性實作更加複雜,後面的文章再講述),首先 KafkaProducer 在初始化時會初始化一個 TransactionManager 執行個體,它的作用有以下幾個部分:

記錄本地的事務狀态(事務性時必須);

記錄一些狀态資訊以保證幂等性,比如:每個 topic-partition 對應的下一個 sequence numbers 和 last acked batch(最近一個已經确認的 batch)的最大的 sequence number 等;

記錄 ProducerIdAndEpoch 資訊(PID 資訊)。

如前面圖中所示,幂等性時,Producer 的發送流程如下:

應用通過 KafkaProducer 的 <code>send()</code> 方法将資料添加到 RecordAccumulator 中,添加時會判斷是否需要建立一個 ProducerBatch,這時這個 ProducerBatch 還是沒有 PID 和 sequence number 資訊的;

Producer 背景發送線程 Sender,在 <code>run()</code> 方法中,會先根據 TransactionManager 的 <code>shouldResetProducerStateAfterResolvingSequences()</code> 方法判斷目前的 PID 是否需要重置,重置的原因是因為:如果有 topic-partition 的 batch 重試多次失敗最後因為逾時而被移除,這時 sequence number 将無法做到連續,因為 sequence number 有部分已經配置設定出去,這時系統依賴自身的機制無法繼續進行下去(因為幂等性是要保證不丢不重的),相當于程式遇到了一個 fatal 異常,PID 會進行重置,TransactionManager 相關的緩存資訊被清空(Producer 不會重新開機),隻是儲存狀态資訊的 TransactionManager 做了 <code>clear+new</code> 操作,遇到這個問題時是無法保證 exactly once 的(有資料已經發送失敗了,并且超過了重試次數);

Sender 線程通過 <code>maybeWaitForProducerId()</code> 方法判斷是否需要申請 PID,如果需要的話,這裡會阻塞直到擷取到相應的 PID 資訊;

Sender 線程通過 <code>sendProducerData()</code> 方法發送資料,整體流程與之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 <code>drain()</code> 方法中,在加了幂等性之後,<code>drain()</code> 方法多了如下幾步判斷:

正常的判斷:判斷這個 topic-partition 是否可以繼續發送(如果出現前面2中的情況是不允許發送的)、判斷 PID 是否有效、如果這個 batch 是重試的 batch,那麼需要判斷這個 batch 之前是否還有 batch 沒有發送完成,如果有,這裡會先跳過這個 Topic-Partition 的發送,直到前面的 batch 發送完成,最壞情況下,這個 Topic-Partition 的 in-flight request 将會減少到1(這個涉及也是考慮到 server 端的一個設定,文章下面會詳細分析);

如果這個 ProducerBatch 還沒有這個相應的 PID 和 sequence number 資訊,會在這裡進行相應的設定;

最後 Sender 線程再調用 <code>sendProduceRequests()</code> 方法發送 ProduceRequest 請求,後面的就跟之前正常的流程保持一緻了。

這裡看下幾個關鍵方法的實作,首先是 Sender 線程擷取 PID 資訊的方法  <code>maybeWaitForProducerId()</code> ,其實作如下:

再看下 RecordAccumulator 的 <code>drain()</code> 方法,重點需要關注的是關于幂等性和事務性相關的處理,具體如下所示,這裡面關于事務性相關的判斷在上面的流程中已經講述。

如前面途中所示,當 Broker 收到 ProduceRequest 請求之後,會通過 <code>handleProduceRequest()</code> 做相應的處理,其處理流程如下(這裡隻講述關于幂等性相關的内容):

如果請求是事務請求,檢查是否對 TXN.id 有 Write 權限,沒有的話傳回 TRANSACTIONAL_ID_AUTHORIZATION_FAILED;

如果請求設定了幂等性,檢查是否對 ClusterResource 有 IdempotentWrite 權限,沒有的話傳回 CLUSTER_AUTHORIZATION_FAILED;

驗證對 topic 是否有 Write 權限以及 Topic 是否存在,否則傳回 TOPIC_AUTHORIZATION_FAILED 或 UNKNOWN_TOPIC_OR_PARTITION 異常;

檢查是否有 PID 資訊,沒有的話走正常的寫入流程;

LOG 對象會在 <code>analyzeAndValidateProducerState()</code> 方法先根據 batch 的 sequence number 資訊檢查這個 batch 是否重複(server 端會緩存 PID 對應這個 Topic-Partition 的最近5個 batch 資訊),如果有重複,這裡當做寫入成功傳回(不更新 LOG 對象中相應的狀态資訊,比如這個 replica 的 the end offset 等);

有了 PID 資訊,并且不是重複 batch 時,在更新 producer 資訊時,會做以下校驗:

檢查該 PID 是否已經緩存中存在(主要是在 ProducerStateManager 對象中檢查);

如果不存在,那麼判斷 sequence number 是否 從0 開始,是的話,在緩存中記錄 PID 的 meta(PID,epoch, sequence number),并執行寫入操作,否則傳回 UnknownProducerIdException(PID 在 server 端已經過期或者這個 PID 寫的資料都已經過期了,但是 Client 還在接着上次的 sequence number 發送資料);

如果該 PID 存在,先檢查 PID epoch 與 server 端記錄的是否相同;

如果不同并且 sequence number 不從 0 開始,那麼傳回 OutOfOrderSequenceException 異常;

如果不同并且 sequence number 從 0 開始,那麼正常寫入;

如果相同,那麼根據緩存中記錄的最近一次 sequence number(currentLastSeq)檢查是否為連續(會區分為 0、Int.MaxValue 等情況),不連續的情況下傳回 OutOfOrderSequenceException 異常。

下面與正常寫入相同。

幂等性時,Broker 在處理 ProduceRequest 請求時,多了一些校驗操作,這裡重點看一下其中一些重要實作,先看下 <code>analyzeAndValidateProducerState()</code> 方法的實作,如下所示:

如果這個 batch 有 PID 資訊,會首先檢查這個 batch 是否為重複的 batch 資料,其實作如下,batchMetadata 會緩存最新 5個 batch 的資料(如果超過5個,添加時會進行删除,這個也是幂等性要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5 的原因,與這個值的設定有關),根據 batchMetadata 緩存的 batch 資料來判斷這個 batch 是否為重複的資料。

如果 batch 不是重複的資料,<code>analyzeAndValidateProducerState()</code> 會通過 <code>updateProducers()</code> 更新 producer 的相應記錄,在更新的過程中,會做一步校驗,校驗方法如下所示:

其校驗邏輯如前面流程中所述。

這裡主要思考兩個問題:

Producer 在設定幂等性時,為什麼要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5,如果設定大于 5(不考慮 Producer 端參數校驗的報錯),會帶來什麼後果?

Producer 在設定幂等性時,如果我們設定 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 大于 1,那麼是否可以保證有序,如果可以,是怎麼做到的?

先說一下結論,問題 1 的這個設定要求其實上面分析的時候已經講述過了,主要跟 server 端隻會緩存最近 5 個 batch 的機制有關;問題 2,即使 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 大于 1,幂等性時依然可以做到有序,下面來詳細分析一下這兩個問題。

其實這裡,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 執行個體會緩存每個 PID 在每個 Topic-Partition 上發送的最近 5 個batch 資料(這個 5 是寫死的,至于為什麼是 5,可能跟經驗有關,當不設定幂等性時,當這個設定為 5 時,性能相對來說較高,社群是有一個相關測試文檔,忘記在哪了),如果超過 5,ProducerStateManager 就會将最舊的 batch 資料清除。

假設應用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設定為 6,假設發送的請求順序是 1、2、3、4、5、6,這時候 server 端隻能緩存 2、3、4、5、6 請求對應的 batch 資料,這時候假設請求 1 發送失敗,需要重試,當重試的請求發送過來後,首先先檢查是否為重複的 batch,這時候檢查的結果是否,之後會開始 check 其 sequence number 值,這時候隻會傳回一個 OutOfOrderSequenceException 異常,client 在收到這個異常後,會再次進行重試,直到超過最大重試次數或者逾時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當于client 狂發錯誤請求)。

那有沒有更好的方案呢?我認為是有的,那就是對于 OutOfOrderSequenceException 異常,再進行細分,區分這個 sequence number 是大于 nextSeq (期望的下次 sequence number  值)還是小于 nextSeq,如果是小于,那麼肯定是重複的資料。

先來分析一下,在什麼情況下 Producer 會出現亂序的問題?沒有幂等性時,亂序的問題是在重試時出現的,舉個例子:client 依然發送了 6 個請求 1、2、3、4、5、6(它們分别對應了一個 batch),這 6 個請求隻有 2-6 成功 ack 了,1 失敗了,這時候需要重試,重試時就會把 batch 1 的資料添加到待發送的資料列隊中),那麼下次再發送時,batch 1 的資料将會被發送,這時候資料就已經出現了亂序,因為 batch 1 的資料已經晚于了 batch 2-6。

當 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設定為 1 時,是可以解決這個為題,因為同時隻允許一個請求正在發送,隻有目前的請求發送完成(成功 ack 後),才能繼續下一條請求的發送,類似單線程處理這種模式,每次請求發送時都會等待上次的完成,效率非常差,但是可以解決亂序的問題(當然這裡有序隻是針對單 client 情況,多 client 并發寫是無法做到的)。

系統能提供的方案,基本上就是有序性與性能之間二選一,無法做到相容,實際上系統出現請求重試的幾率是很小的(一般都是網絡問題觸發的),可能連 0.1% 的時間都不到,但是就是為了這 0.1% 時間都不到的情況,應用需要犧牲性能問題來解決,在大資料場景下,我們是希望有更友好的方式來解決這個問題。簡單來說,就是當出現重試時,max-in-flight-request 可以動态減少到 1,在正常情況下還是按 5 (5是舉例說明)來處理,這有點類似于分布式系統 CAP 理論中關于 P 的考慮,當出現問題時,可以容忍性能變差,但是其他的情況下,我們希望的是能擁有原來的性能,而不是一刀切。令人高興的,在 Kafka 2.0.0 版本中,如果 Producer 開始了幂等性,Kafka 是可以做到這一點的,如果不開啟幂等性,是無法做到的,因為它的實作是依賴了 sequence number。

當請求出現重試時,batch 會重新添加到隊列中,這時候是根據 sequence number 添加到隊列的合适位置(有些 batch 如果還沒有 sequence number,那麼就保持其相對位置不變),也就是隊列中排在這個 batch 前面的 batch,其 sequence number 都比這個 batch 的 sequence number 小,其實作如下,這個方法保證了在重試時,其 batch 會被放到合适的位置:

另外 Sender 在發送請求時,會首先通過 RecordAccumulator 的 <code>drain()</code> 方法擷取其發送的資料,在周遊 Topic-Partition 對應的 queue 中的 batch 時,如果發現 batch 已經有了 sequence number 的話,則證明這個 batch 是重試的 batch,因為沒有重試的 batch 其 sequence number 還沒有設定,這時候會做一個判斷,會等待其 in-flight-requests 中請求發送完成,才允許再次發送這個 Topic-Partition 的資料,其判斷實作如下:

僅有 client 端這兩個機制還不夠,Server 端在處理 ProduceRequest 請求時,還會檢查 batch 的 sequence number 值,它會要求這個值必須是連續的,如果不連續都會傳回異常,Client 會進行相應的重試,舉個栗子:假設 Client 發送的請求順序是 1、2、3、4、5(分别對應了一個 batch),如果中間的請求 2 出現了異常,那麼會導緻 3、4、5 都傳回異常進行重試(因為 sequence number 不連續),也就是說此時 2、3、4、5 都會進行重試操作添加到對應的 queue 中。

Producer 的 TransactionManager 執行個體的 inflightBatchesBySequence 成員變量會維護這個 Topic-Partition 與目前正在發送的 batch 的對應關系(通過 <code>addInFlightBatch()</code> 方法添加 batch 記錄),隻有這個 batch 成功 ack 後,才會通過 <code>removeInFlightBatch()</code> 方法将這個 batch 從 inflightBatchesBySequence 中移除。接着前面的例子,此時 inflightBatchesBySequence 中還有 2、3、4、5 這幾個 batch(有順序的,2 在前面),根據前面的 RecordAccumulator 的 <code>drain()</code> 方法可以知道隻有這個 Topic-Partition 下次要發送的 batch 是 batch 2(跟 transactionManager 的這個 <code>firstInFlightSequence()</code> 方法擷取 inFlightBatches 中第一個 batch 的 baseSequence 來判斷) 時,才可以發送,否則會直接 break,跳過這個 Topic-Partition 的資料發送。這裡相當于有一個等待,等待 batch 2 重新加入到 queue 中,才可以發送,不能跳過 batch 2,直接重試 batch 3、4、5,這是不允許的。

簡單來說,其實作機制概括為:

Server 端驗證 batch 的 sequence number 值,不連續時,直接傳回異常;

Client 端請求重試時,batch 在 reenqueue 時會根據 sequence number 值放到合适的位置(有序保證之一);

Sender 線程發送時,在周遊 queue 中的 batch 時,會檢查這個 batch 是否是重試的 batch,如果是的話,隻有這個 batch 是最舊的那個需要重試的 batch,才允許發送,否則本次發送跳過這個 Topic-Partition 資料的發送等待下次發送。

繼續閱讀