天天看點

一文讀懂kafka消息丢失問題和解決方案

作者:搬山道猿

前言

今天分享一下kafka的消息丢失問題,kafka的消息丢失是一個很值得關注的問題,根據消息的重要性,消息丢失的嚴重性也會進行放大,如何從最大程度上保證消息不丢失,要從生産者,消費者,broker幾個端來說。

消息發送和接收流程

kafka生産者生産好消息後,會将消息發送到broker節點,broker對資料進行存儲,kafka的消息是順序存儲在磁盤上,以主題(topic),分區(partition)的邏輯進行劃分,消息最終存儲在日志檔案中,消費者會循環從broker拉取消息。

一文讀懂kafka消息丢失問題和解決方案

那麼從上圖的圖中可以看出kafka丢消息可能存在的三個地方分别為:

  • 生産者到broker
  • broker到磁盤
  • 消費者

生産者到broker消息丢失

生産者發送消息到broker是會存在消息丢失的,大多可能是由于網絡原因引起的,消息中間件中一般都是通過ack來解決這個問題的,kafka中可以通過設定ack來解決這個問題。

acks有三種類型:

  • 1
  • -1(all)

acks為0

acks設定為0,代表生産者發送消息後就不管不顧了,不用等待broker的任何響應,那麼可能網絡異常或者其他原因導緻broker沒有處理到到這條消息,那麼消息就丢失了。

一文讀懂kafka消息丢失問題和解決方案

acks為1

acks設定為1,代表生産者發送消息到broker後,隻需要broker的leader副本确認收到後就成功響應,不需要follower副本響應,就算follower副本崩潰了,也會成功響應。

一文讀懂kafka消息丢失問題和解決方案

acks為-1(all)

acks設定為-1,或者為all,那麼生産者發送消息需要leader和follower都收到并寫入消息才成功響應生産者,也就是ISR集合要全部寫入,當ISR集合中隻要有一個沒有寫入成功,那麼就收到失敗響應,是以acks=-1能夠在最大程度上保證消息的不丢失,但是也是有條件的,需要ISR集合中有兩個以上副本才能保證,如果隻有一個副本,那麼就是就隻有一個leader,沒有follower,如果leader挂掉,就不能選舉出一個eader,消息自然也就丢失,這和acks=1是一樣的。

一文讀懂kafka消息丢失問題和解決方案

解決消息丢失

從上面三種類型的acks中我們可以看出,acks=-1是保證消息從生産者到broker不丢失的最佳設定方式,不過我們也能想到,它需要ISR每個副本都成功應答,是以它的效率自然沒有前面兩個高,不過此篇我們讨論的是保證消息不丢失問題,是以一切從不丢失層面區說。

如果消息發送失敗,那麼生産者可以重試發送消息,可以手動在代碼中編寫消息重發邏輯,也可以配置重試參數。

  • retries
  • retry.backoff.ms

retries表示重試次數,retry.backoff.ms表示重試時間間隔,比如第一次重試依舊沒成功,那麼隔多久再進行重試,kafka重試的底層邏輯是将沒發送成功的消息重新入隊,因為kafka的生産者生産消息後,消息并非就直接發送到broker,而是儲存在生産者端的收集器(RecordAccumulator),然後由Sender線程去擷取RecordAccumulator中的消息,然後再發送給broker,當消息發送失敗後,會将消息重新放入RecordAccumulator中,具體邏輯可以看kafka的生産者端Sender的源碼。

一文讀懂kafka消息丢失問題和解決方案

消息重發引起的消息順序性問題

要注意,消息發送失敗進行重發不能保證消息發送的順序性,這裡的順序性是單分區順序性,如果服務對于消息的順序性有嚴格的要求,那麼我們可以通過設定屬性max.in.flight.requests.per.connection=1來保證消息的順序性,這個配置對應的是kafka中InFlightRequests,max.in.flight.requests.per.connection代表請求的個數,kafka在建立Sender的時候會判斷,如果maxInflightRequests為1,那麼guaranteeMessageOrder就為true,就能保證消息的順序性。

一文讀懂kafka消息丢失問題和解決方案

broker到磁盤丢消息

broker收到消息後,需要将消息寫入磁盤的log檔案中,但是并不是馬上寫,因為我們知道,生産者發送消息後,消費者那邊需要馬上擷取,如果broker要寫入磁盤,那麼消費者拉取消息,broker還要從log檔案中擷取消息,這顯然是不合理的,是以kafka引入了(page cache)頁緩存。

page cache是磁盤和broker之間的消息映射關系,它是基于記憶體的,當broker收到消息後,會将消息寫入page cache,然後由作業系統進行刷盤,将page cache中的資料寫入磁盤。

如果broker發生故障,那麼此時page cache的資料就會丢失,broker端可以設定刷盤的參數,比如多久刷盤一次,不過這個參數不建議去修改,最好的方案還是設定多副本,一個分區設定幾個副本,當broker故障的時候,如果還有其他副本,那麼資料就不會丢失。

消費者丢消息

kafka的消費模式是拉模式,需要不斷地向broker拉取消息,拉取的消息消費了以後需要送出offset,也就是送出offset這裡可能會出現丢消息,kafka中提供了和offset相關的幾個配置項。

  • enable.auto.commit
  • auto.commit.interval.ms
  • auto.offset.reset

下面我們先了解一下kafka offset的送出和參數詳解。

enable.auto.commit代表是否自動送出offset,預設為true,auto.commit.interval.ms代表多久送出一次offset,預設為5秒。

如下圖,目前消費者消費到了分區中為3的消息。

一文讀懂kafka消息丢失問題和解決方案

那麼下次當消費者讀取消息的時候是從哪裡讀取呢,當然從4開始讀取,因為是從上次讀取的offset的下一位開始讀取,是以我們就說目前消費組的offset為4,,因為下次是從4開始消費,如果5秒之内又消費了兩條消息然後自動送出了offset,那麼此時的offset如下:

一文讀懂kafka消息丢失問題和解決方案

enable.auto.commit如果為false,就代表不會自動送出offset。

auto.offset.reset=latest代表從分區中最新的offset處開始讀取消息,比如某個消費者組上次送出的偏移量為5,然後後面又生産了2條消息,再次讀取消息時,讀取到的是6,7,8這個三個消息,如果enable.auto.commit設定為false,那麼不管往分區中寫入多少消息,都是從6開始讀取消息。

一文讀懂kafka消息丢失問題和解決方案

此時如果一個新的的消費組訂閱了這個分區,因為這個消費者組沒有在這個分區送出過offset,是以它擷取消息并不是從6開始擷取,而是從1開始擷取。

一文讀懂kafka消息丢失問題和解決方案
是以可知每個消費者組在分區中的offset是獨立的。

auto.offset.reset還可以設定為earliest和none,使用earliest,如果此消費組從來沒有送出過offset,那麼就從頭開始消費,如果送出過offset,那麼就從最新的offset處消費,就和latest一樣了,使用none,如果消費組沒有送出過offset,在分區中找不到任何offset,那麼就會抛出異常。

org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [stock1-0]
複制代碼           

上面我們初步了解了offset的一些知識,對offset的送出和和讀取有一些了解,因為上面我們隻提及offset的自動送出,而自動送出的主動權在kafka,而不在我們,是以可能因為一些原因而導緻消息丢失。

消息處理異常

當我們收到消息後對消息進行處理,如果在處理的過程中發生異常,而又設定為自動送出offset,那麼消息沒有處理成功,offset已經送出了,當下次擷取消息的時候,由于已經送出過ofset,是以之前的消息就擷取不到了,是以應該改為手動送出offset,當消息處理成功後,再進行手動送出offset。

總結

關于kafka的消息丢失問題和解決方案就說到這裡,我們分别從生産者到broker,broker到磁盤以及消費者端進行說明,也引申出一些知識點,可能平時沒有遇到消息丢失的情況,那是因為網絡比較可靠,資料量可能不大,但是如果要真的實作高可用,高可靠,那麼就需要對其進行設計。

繼續閱讀