天天看點

kafka系列之幂等生産者(11)

幂等”這個詞原是數學領域中的概念,指的是某些操作或函數能夠被執行多次,但每次得到的結果都是不變的

在指令式程式設計語言(比如 C)中,若一個子程式是幂等的,那它必然不能修改系統狀态。這樣不管運作這個子程式多少次,與該子程式關聯的那部分系統狀态保持不變。

在函數式程式設計語言(比如 Scala 或 Haskell)中,很多純函數(pure function)天然就是幂等的,它們不執行任何的 side effect。

幂等性有很多好處,其最大的優勢在于我們可以安全地重試任何幂等性操作,反正它們也不會破壞我們的系統狀态

在很多系統中消息重複是不被允許的,例如一些業務結算平台(如物流平台、銀行結算平台等)

為了解決重試導緻的消息重複、亂序問題,kafka引入了幂等消息。幂等消息保證producer在一次會話内寫入同一個partition内的消息具有幂等性,也就是說消息不會重複。

Kafka的幂等性其實就是将原來需要在下遊系統中進行的去重操作放在了資料上遊kafka 中。

所謂的消息傳遞可靠性保障,是指 Kafka 對 Producer 和 Consumer 要處理的消息提供什麼樣的承諾。常見的承諾有以下三種:

最多一次(at most once):消息可能會丢失,但絕不會被重複發送。

至少一次(at least once):消息不會丢失,但有可能被重複發送。

精确一次(exactly once):消息不會丢失,也不會被重複發送。

Kafka 預設提供的傳遞可靠性保障是即至少一次,因為kafka 的producer 在消息發送失敗(沒有接收到kafka broker 的ACK資訊)的時候則會進行重試,這就是kafka 為什麼預設提供的是至少一次的傳遞語義,但是這樣可能導緻消息重複

Kafka 也可以提供最多一次傳遞保障,隻需要讓 Producer 禁止重試即可。這樣一來,消息要麼寫入成功,要麼寫入失敗,但絕不會重複發送。我們通常不會希望出現消息丢失的情況,但一些場景裡偶發的消息丢失其實是被允許的,相反,消息重複是絕對要避免的。此時,使用最多一次傳遞保障就是最恰當的。

無論是至少一次還是最多一次,都不如精确一次來得有吸引力。大部分使用者還是希望消息隻會被傳遞一次,這樣的話,消息既不會丢失,也不會被重複處理。或者說,即使 Producer 端重複發送了相同的消息,Broker 端也能做到自動去重。在下遊 Consumer 看來,消息依然隻有一條,而這就是我們今天要介紹的幂等性。

producer 預設不是幂等性的,但我們可以建立幂等性 Producer。它其實是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分區發送資料時,可能會出現同一條消息被發送了多次,導緻消息重複的情況。

在 0.11 之後,指定 Producer 幂等性的方法很簡單,僅需要設定一個參數即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

enable.idempotence 被設定成 true 後,Producer 自動更新成幂等性 Producer,其他所有的代碼邏輯都不需要改變

Prodcuer 幂等性對外保留的接口非常簡單,其底層的實作對上層應用做了很好的封裝,應用層并不需要去關心具體的實作細節,對使用者非常友好。

Kafka 自動幫你做消息的重複去重,底層具體的原理很簡單,就是經典的用空間去換時間的優化思路,即在 Broker 端多儲存一些字段。

當 Producer 發送了具有相同字段值的消息後,Broker 能夠自動知曉這些消息已經重複了,于是可以在背景默默地把它們“丢棄”掉。

producer每次啟動後,首先向broker申請一個全局唯一的pid,用來辨別本次會話,這個ProducerID對用戶端使用者是不可見的

重新開機之後辨別producer的PID就變化了,broker就不認識,是以幂等性是隻能在單次會話内的

對于每個PID,該Producer發送資料的每個<Topic,Partition>都對應一個從0開始單調遞增的Sequence Number,Broker端在緩存中儲存了這seq number

對于接收的每條消息,如果其序号比Broker緩存中序号大于1則接受它,否則将其丢棄,這樣就可以實作了消息重複送出了

但是隻能保證單個Producer對于同一個<Topic,Partition>的Exactly Once語義

producer在收到明确的的消息丢失ack,或者逾時後未收到ack,要進行重試。

重試的時候sequence number不變,因為sequence number在第一次發送的時候已經确定了,重試隻是重新發送

生産者重新開機時PID 就會發生變化,同時不同的 分區(Partition)也具有不同的編号,是以生産者幂等性無法保證跨分區和跨會話的 Exactly Once。

首先,它隻能保證單分區上的幂等性,即一個幂等性 Producer 能夠保證某個主題的一個分區上不出現重複消息,它無法實作多個分區的幂等性。

一個幂等性的producer,隻保證單分區的幂等性,而producer的消息會發給一個主題的多個分區,這就是為什麼不能保證整個Topic的幂等了。

其實這裡有一個問題值得思考,那就是為什麼幂等性是針對單分區的

它隻能實作單會話上的幂等性,不能實作跨會話的幂等性。這裡的會話,你可以了解為 Producer 程序的一次運作。當你重新開機了 Producer 程序之後,這種幂等性保證就喪失了

重新開機之後辨別producer的PID就變化了,broker就不認識了——這個是幂等性的另一個限制條件,無法實作誇會話的幂等性。

如果想實作多分區以及多會話上的消息無重複,就是事務(transaction)或者依賴事務型 Producer。

這也是幂等性 Producer 和事務型 Producer 的最大差別!

其實前面我們也說過一次,可以看看kafka的client 的源碼,看client 的源碼其實可以讓你快速了解kafka 都有哪些東西可以用,server 端的源碼可以讓你了解原理,其實很多時候知道有什麼樣的工具,就能幫你解決很多問題了。

kafka系列之幂等生産者(11)

其實prodcuer 端的很多東西我們都介紹過了,這裡面的幾個類我們都學過了

名稱

作用

Producer

這是一個接口,我們使用的KafkaProducer就是繼承自這個接口

KafkaProducer

我們發送資料的時候使用的對象

Partitioner

分區器的接口,我們也看過它的幾個預設實作(DefaultPartitioner,RoundRobinPartitioner,UniformStickyPartitioner),你也可以自己繼承這個接口寫一個

ProducerConfig

用戶端配置類,我們在建立Properties的時候都是直接寫的字元串,也可以使用這個配置類提供的常量,ProducerConfig.BOOTSTRAP_SERVERS_CONFIG和“bootstrap.servers”等價的

ProducerRecord

我們發送的消息對象

RecordMetadata

發送消息之後傳回的對象,記錄了消息發送的元資訊

Callback

回調接口,主要用在異步發送,這個接口kafka 也提供了一個預設實作ErrorLoggingCallback

ProducerInterceptor

生産者攔截器

接下來我們看一下今天的幾個主角,也就是internals 包下的幾個類

kafka系列之幂等生産者(11)

首先說一下為什麼要說這個類呢,大家看一下下面的截圖就知道了

kafka系列之幂等生産者(11)

這是KafkaProducer裡面doSend方法的段,也就是我們用戶端調用的send方法,這個說明了一個問題,什麼問題呢,那就是KafkaProducer的send 方法并不是直接将消息發送出去的,而是将消息追加到緩存區。

如果目前緩存區已寫滿或建立了一個新的緩存區,則喚醒 Sender(消息發送線程),将緩存區中的消息發送到 broker 伺服器,最終傳回 future。從這裡也能得知,doSend 方法執行完成後,此時消息還不一定成功發送到 broker,因為還沒有發送呢。

這裡當緩存區滿了之後則将sender 喚醒,進行消息發送,其實到這裡我們應該能猜到sender 是個什麼了,是個線程<code>public class Sender implements Runnable</code>

前面我們說PID是實作幂等的關鍵元素,我們下面看一下PID是怎麼獲得的,就是在Sender 的run 方法裡面

run 方法裡面有一個<code>maybeWaitForProducerId</code>方法就是用來擷取ProducerId的,這個名稱真的是見名知意,我們也簡單看一下這個方法

當transactionManager 沒有ProducerId()的時候才執行的

而且我們看到這InitProducerIdResponse使用response上獲得的,是以我們認為這個ProducerId其實不是用戶端生成的,而是服務端生成的。接下來我們看一下這個請求的方法<code>sendAndAwaitInitProducerIdRequest</code>

可以看出它是通過node的資訊,建立了一個<code>ClientRequest</code> 發送出去,更準确的是<code>InitProducerIdRequest</code> 然後擷取傳回後的response 中的ProducerId,如果你感興趣的話,也可以看一下服務端是怎麼處理這個請求的,下面是服務端的代碼,scala 寫的

這裡我們看到最終是通過<code>Coordinator</code> 生成的

可以看到Server 在給一個 client 初始化 PID 時,實際上是通過 ProducerIdManager 的 <code>generateProducerId()</code> 方法産生一個 PID。

接下來其實就到了ZK 了,你要是感興趣可以接着往下走走

為什麼突然又冒出來這麼一個類呢,前面我們提到KafkaProducer的send 方法其實隻是将消息添加到了緩存之中,并沒有真正的發送,我們知道發送是在Sender 的run 方法裡面完成的,我們的Producer Id也是在run 方法的裡面擷取的

kafka系列之幂等生産者(11)

我們這裡将Sender 的run 方法的内容分為了兩塊,第一塊執行一些初始化的操作,第二部分發送資料,我們接下來看一下這個方法的實作,做了一定的删減

accumulator 就是我們緩存消息的地方,這裡我們看到,資料最終是封裝到ProducerBatch裡面進去發送的,接下來到了我們的主角了<code>sequence numbe</code>,我們知道<code>sequence numbe</code>應該是在ProducerBatch裡面的,那應該就是在accumulator 的<code>drain</code> 方法裡面實作的了,那我們就看一下這個代碼的實作,代碼有點長,我們隻選取部分

這個代碼首先擷取了<code>ProducerIdAndEpoch</code> 這個類的對象,這個對象就封裝了ProducerId,然後我們就關注一下sequence number,我們看到這裡有一個判斷那就是<code>!batch.hasSequence()</code>,下面也寫了一段注釋,就是Sequence一旦生成不可改變。

接下來到看了關鍵之處了<code>batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);</code>我們知道<code>batch</code>對象應該有sequence number這樣的一個屬性,是以我們猜測這個屬性就是在<code>batch.setProducerState</code> 方法中完成指派的。

注意一下第二個參數<code>transactionManager.sequenceNumber(batch.topicPartition)</code>,根據Partition 擷取到了sequenceNumber,我們也可以看一下這個代碼

這裡我們就完成了我們的擷取,你看一下下面截圖的地方,擷取完成之後會調用自增的操作,來維持SequenceNumber的自增特性

kafka系列之幂等生産者(11)

最後我們再看一下setProducerState的内容,我們看到<code>setProducerState</code> 方法之後,我們的批量資料就有了一個屬性baseSequence,可以用于伺服器端進行判斷。

有了 PID 之後,在 PID + Topic-Partition 級别上添加一個 sequence numbers 資訊,就可以實作 Producer 的幂等性了。

ProducerBatch 也提供了一個 <code>setProducerState()</code> 方法,它可以給一個 batch 添加一些 meta 資訊(pid、baseSequence、isTransactional),這些資訊是會伴随着 ProduceRequest 發到 Server 端,Server 端也正是通過這些 meta 來做相應的判斷,接下來我們看一下伺服器端的處理,代碼入口在<code>KafkaApis</code> 在這個類中(scala),我們找到對應處理請求的方法<code>handleProduceRequest</code>,然後我們看一下這個代碼的實作。我們這裡隻截取了部分

kafka系列之幂等生産者(11)

可以看到後面前面主要還是權限校驗,後面則開始周遊處理資料

跨會話

不支援跨會話的原因是重新開機之後辨別producer的PID就變化了,這就導緻broker無法根據這個&lt;PID,TP,SEQNUM&gt;條件去去判斷是否重複。

跨分區

我們知道在某一個partition 上判斷是否重複是通過一個遞增的sequence number,也就是說這個遞增是針對目前特定分區的,如果你要是發送到其他分區上去了,那麼遞增關系就不存在了。

思考

retry會保證發送到同一個分區嗎?

什麼情況下單分區的幂等能保證全局的幂等

主要介紹了什麼是幂等以及它的實作原理

從源碼層面上分析了Producer Id 和 Sequence Number 的擷取和它們的工作原理

Sequence Number它不是和某一條消息進行綁定的,而是和一批消息進行綁定的。

幂等的不足之處不支援跨回話和跨分區,優點的話也很明顯,使用簡單

繼續閱讀