天天看點

Kafka筆記—可靠性、幂等性和事務

點選上方藍色字型,選擇“設為星标”

回複”資源“擷取更多資源

Kafka筆記—可靠性、幂等性和事務
Kafka筆記—可靠性、幂等性和事務

大資料技術與架構

點選右側關注,大資料開發領域最強公衆号!

Kafka筆記—可靠性、幂等性和事務
Kafka筆記—可靠性、幂等性和事務

暴走大資料

點選右側關注,暴走大資料!

Kafka筆記—可靠性、幂等性和事務

Kafka隻對“已送出”的消息(committed message)做有限度的持久化保證。

已送出的消息

當Kafka的若幹個Broker成功地接收到一條消息并寫入到日志檔案後,它們會告訴生産者程式這條消息已成功送出。

有限度的持久化保證

假如一條消息儲存在N個Kafka Broker上,那麼至少這N個Broker至少有一個存活,才能保證消息不丢失。

由于Kafka Producer是異步發送的,調用完producer.send(msg)并不能認為消息已經發送成功。

是以,在Producer永遠要使用帶有回調通知的發送API,使用producer.send(msg,callback)。一旦出現消息送出失敗的情況,可以由針對性地進行處理。

消費者是先更新offset,再消費消息。如果這個時候消費者突然當機了,那麼這條消息就會丢失。

是以我們要先消費消息,再更新offset位置。但是這樣會導緻消息重複消費。

還有一種情況就是consumer擷取到消息後開啟了多個線程異步處理消息,而consumer自動地向前更新offset。假如其中某個線程運作失敗了,那麼消息就丢失了。

遇到這樣的情況,consumer不要開啟自動送出位移,而是要應用程式手動送出位移。

使用producer.send(msg,callback)。

設定acks = all。acks是Producer的參數,代表了所有副本Broker都要接收到消息,該消息才算是“已送出”。

設定retries為一個較大的值。是Producer的參數,對應Producer自動重試。如果出現網絡抖動,那麼可以自動重試消息發送,避免消息丢失。

unclean.leader.election.enable = false。控制有哪些Broker有資格競選分區的Leader。表示不允許落後太多的Broker競選Leader。

設定replication.factor>=3。Broker參數,備援Broker。

設定min.insync.replicas>1。Broker參數。控制消息至少要被寫入到多少個副本才算是“已送出”。

確定replication.factor>min.insync.replicas。如果兩個相等,那麼隻要有一個副本挂機,整個分區就無法正常工作了。推薦設定成replication.factor=min.insync.replicas+1.

確定消息消費完成在送出。Consumer端參數enbale.auto.commit,設定成false,手動送出位移。

解釋第二條和第六條:

如果ISR中隻有1個副本了,acks=all也就相當于acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能隻滿足于ISR全部寫入,還要保證ISR中的寫入個數不少于min.insync.replicas。

在0.11.0.0版本引入了建立幂等性Producer的功能。僅需要設定props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。

enable.idempotence設定成true後,Producer自動更新成幂等性Producer。Kafka會自動去重。Broker會多儲存一些字段。當Producer發送了相同字段值的消息後,Broker能夠自動知曉這些消息已經重複了。

作用範圍:

隻能保證單分區上的幂等性,即一個幂等性Producer能夠保證某個主題的一個分區上不出現重複消息。

隻能實作單回話上的幂等性,這裡的會話指的是Producer程序的一次運作。當重新開機了Producer程序之後,幂等性不保證。

Kafka在0.11版本開始提供對事務的支援,提供是read committed隔離級别的事務。保證多條消息原子性地寫入到目标分區,同時也能保證Consumer隻能看到事務成功送出的消息。

保證多條消息原子性地寫入到多個分區中。這批消息要麼全部成功,要不全部失敗。事務性Producer也不懼程序重新開機。

Producer端的設定:

開啟<code>enable.idempotence = true</code>

設定Producer端參數 <code>transactional.id</code>

除此之外,還要加上調用事務API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分别應對事務的初始化、事務開始、事務送出以及事務終止。

如下:

這段代碼能保證record1和record2被當做一個事務同一送出到Kafka,要麼全部成功,要麼全部寫入失敗。

Consumer端的設定:

設定isolation.level參數,目前有兩個取值:

read_uncommitted:預設值表明Consumer端無論事務型Producer送出事務還是終止事務,其寫入的消息都可以讀取。

read_committed:表明Consumer隻會讀取事務型Producer成功送出事務寫入的消息。注意,非事務型Producer寫入的所有消息都能看到。

歡迎點贊+收藏+轉發朋友圈素質三連

Kafka筆記—可靠性、幂等性和事務

文章不錯?點個【在看】吧! ????

繼續閱讀