天天看點

Kafka 如果丢了消息,怎麼處理的?

Kafka 如果丢了消息,怎麼處理的?

Kafka存在丢消息的問題,消息丢失會發生在Broker,Producer和Consumer三種。

Broker

Broker丢失消息是由于Kafka本身的原因造成的,kafka為了得到更高的性能和吞吐量,将資料異步批量的存儲在磁盤中。消息的刷盤過程,為了提高性能,減少刷盤次數,kafka采用了批量刷盤的做法。即,按照一定的消息量,和時間間隔進行刷盤。這種機制也是由于linux作業系統決定的。将資料存儲到linux作業系統種,會先存儲到頁緩存(Page cache)中,按照時間或者其他條件進行刷盤(從page cache到file),或者通過fsync指令強制刷盤。資料在page cache中時,如果系統挂掉,資料會丢失。

Kafka 如果丢了消息,怎麼處理的?

Broker在linux伺服器上高速讀寫以及同步到Replica

上圖簡述了broker寫資料以及同步的一個過程。broker寫資料隻寫到PageCache中,而pageCache位于記憶體。這部分資料在斷電後是會丢失的。pageCache的資料通過linux的flusher程式進行刷盤。刷盤觸發條件有三:

主動調用sync或fsync函數

可用記憶體低于閥值

dirty data時間達到閥值。dirty是pagecache的一個辨別位,當有資料寫入到pageCache時,pagecache被标注為dirty,資料刷盤以後,dirty标志清除。

Broker配置刷盤機制,是通過調用fsync函數接管了刷盤動作。從單個Broker來看,pageCache的資料會丢失。

Kafka沒有提供同步刷盤的方式。同步刷盤在RocketMQ中有實作,實作原理是将異步刷盤的流程進行阻塞,等待響應,類似ajax的callback或者是java的future。下面是一段rocketmq的源碼。

Kafka 如果丢了消息,怎麼處理的?
GroupCommitRequest request = new  GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout()); // 刷盤      

也就是說,理論上,要完全讓kafka保證單個broker不丢失消息是做不到的,隻能通過調整刷盤機制的參數緩解該情況。比如,減少刷盤間隔,減少刷盤資料量大小。時間越短,性能越差,可靠性越好(盡可能可靠)。這是一個選擇題。

為了解決該問題,kafka通過producer和broker協同處理單個broker丢失參數的情況。一旦producer發現broker消息丢失,即可自動進行retry。除非retry次數超過閥值(可配置),消息才會丢失。此時需要生産者用戶端手動處理該情況。那麼producer是如何檢測到資料丢失的呢?是通過ack機制,類似于http的三次握手的方式。

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed: acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won’t generally know of any failures). The offset given back for each record will always be set to -1. acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost. acks=allThis means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting.

http://kafka.apache.org/20/documentation.html

以上的引用是kafka官方對于參數acks的解釋(在老版本中,該參數是request.required.acks)。

acks=0,producer不等待broker的響應,效率最高,但是消息很可能會丢。

acks=1,leader broker收到消息後,不等待其他follower的響應,即傳回ack。也可以了解為ack數為1。此時,如果follower還沒有收到leader同步的消息leader就挂了,那麼消息會丢失。按照上圖中的例子,如果leader收到消息,成功寫入PageCache後,會傳回ack,此時producer認為消息發送成功。但此時,按照上圖,資料還沒有被同步到follower。如果此時leader斷電,資料會丢失。

acks=-1,leader broker收到消息後,挂起,等待所有ISR清單中的follower傳回結果後,再傳回ack。-1等效與all。這種配置下,隻有leader寫入資料到pagecache是不會傳回ack的,還需要所有的ISR傳回“成功”才會觸發ack。如果此時斷電,producer可以知道消息沒有被發送成功,将會重新發送。如果在follower收到資料以後,成功傳回ack,leader斷電,資料将存在于原來的follower中。在重新選舉以後,新的leader會持有該部分資料。資料從leader同步到follower,需要2步:

資料從pageCache被刷盤到disk。因為隻有disk中的資料才能被同步到replica。

資料同步到replica,并且replica成功将資料寫入PageCache。在producer得到ack後,哪怕是所有機器都停電,資料也至少會存在于leader的磁盤内。

上面第三點提到了ISR的清單的follower,需要配合另一個參數才能更好的保證ack的有效性。ISR是Broker維護的一個“可靠的follower清單”,in-sync Replica清單,broker的配置包含一個參數:min.insync.replicas。該參數表示ISR中最少的副本數。如果不設定該值,ISR中的follower清單可能為空。此時相當于acks=1。

Kafka 如果丢了消息,怎麼處理的?

如上圖中:

acks=0,總耗時f(t) = f(1)。

acks=1,總耗時f(t) = f(1) + f(2)。

acks=-1,總耗時f(t) = f(1) + max( f(A) , f(B) ) + f(2)。

性能依次遞減,可靠性依次升高。

Producer

Producer丢失消息,發生在生産者用戶端。

為了提升效率,減少IO,producer在發送資料時可以将多個請求進行合并後發送。被合并的請求咋發送一線緩存在本地buffer中。緩存的方式和前文提到的刷盤類似,producer可以将請求打包成“塊”或者按照時間間隔,将buffer中的資料發出。通過buffer我們可以将生産者改造為異步的方式,而這可以提升我們的發送效率。

但是,buffer中的資料就是危險的。在正常情況下,用戶端的異步調用可以通過callback來處理消息發送失敗或者逾時的情況,但是,一旦producer被非法的停止了,那麼buffer中的資料将丢失,broker将無法收到該部分資料。又或者,當Producer用戶端記憶體不夠時,如果采取的政策是丢棄消息(另一種政策是block阻塞),消息也會被丢失。抑或,消息産生(異步産生)過快,導緻挂起線程過多,記憶體不足,導緻程式崩潰,消息丢失。

Kafka 如果丢了消息,怎麼處理的?

根據上圖,可以想到幾個解決的思路:

異步發送消息改為同步發送消。或者service産生消息時,使用阻塞的線程池,并且線程數有一定上限。整體思路是控制消息産生速度。

擴大Buffer的容量配置。這種方式可以緩解該情況的出現,但不能杜絕。

service不直接将消息發送到buffer(記憶體),而是将消息寫到本地的磁盤中(資料庫或者檔案),由另一個(或少量)生産線程進行消息發送。相當于是在buffer和service之間又加了一層空間更加富裕的緩沖層。

Consumer

Consumer消費消息有下面幾個步驟:

接收消息

處理消息

回報“處理完畢”(commited)

Consumer的消費方式主要分為兩種:

自動送出offset,Automatic Offset Committing

手動送出offset,Manual Offset Control

Consumer自動送出的機制是根據一定的時間間隔,将收到的消息進行commit。commit過程和消費消息的過程是異步的。也就是說,可能存在消費過程未成功(比如抛出異常),commit消息已經送出了。此時消息就丢失了。

Kafka 如果丢了消息,怎麼處理的?
Kafka 如果丢了消息,怎麼處理的?

将送出類型改為手動以後,可以保證消息“至少被消費一次”(at least once)。但此時可能出現重複消費的情況,重複消費不屬于本篇讨論範圍。

上面兩個例子,是直接使用Consumer的High level API,用戶端對于offset等控制是透明的。也可以采用Low level API的方式,手動控制offset,也可以保證消息不丢,不過會更加複雜。

Kafka 如果丢了消息,怎麼處理的?

來源:

https://blog.dogchao.cn/?p=305

近期熱文推薦:

1.Java 15 正式釋出, 14 個新特性,重新整理你的認知!!

2.終于靠開源項目弄到 IntelliJ IDEA 激活碼了,真香!

3.我用 Java 8 寫了一段邏輯,同僚直呼看不懂,你試試看。。

4.吊打 Tomcat ,Undertow 性能很炸!!

5.《Java開發手冊(嵩山版)》最新釋出,速速下載下傳!

覺得不錯,别忘了随手點贊+轉發哦!