前言
Kafka可以說是為分布式而生的一個消息中間件,功能很強大,提到這個,我們可能就會想到消息中間件常提到的幾個問題,消費的順序性、重複消費、消息丢失等問題,接下來我們一一來看。
一、消費的順序性
現實場景
- 資料庫中的binlog
- 一些業務需要,比如希望把某個訂單的資料消費是有順序的
問題描述
生産者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那麼這個訂單相關的資料,一定會被分發到同一個 partition 中去,寫入同一個partion中的資料是一定有順序的,如果是單線程是沒有問題的,但是吞吐量太低了,但是如果是多線程是話,順序就可能會亂掉。
解決辦法
- 一個 topic,一個 partition,一個 consumer,内部單線程消費,單線程吞吐量太低,一般不會用這個。
- 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然後對于 N 個線程,每個線程分别消費一個記憶體 queue 即可,這樣就能保證順序性。
二、重複消費
- 消費方幂等操作,重複消費不會産生問題
- 對每個partitionID,産生一個uniqueID,.隻有這個partition的資料被完全消費,才算成功,否則失敗復原。下次若重複執行,就skip
三、消息丢失
1.生産者資料不丢失
同步模式:配置=1(隻有Leader收到,-1所有副本成功,0不等待)。leader partition挂了,資料就會丢失。
解決:設定為-1保證produce寫入所有副本算成功
producer.type=sync
request.required.acks=-1
異步模式:當緩沖區滿了,如果配置為0(沒有收到确認,一滿就丢棄),資料立刻丢棄
解決:不限制阻塞逾時時間。就是一滿生産者就阻塞
producer.type=async
request.required.acks=1
queue.buffering.max.ms=5000
queue.buffering.max.messages=10000
queue.enqueue.timeout.ms = -1
batch.num.messages=200
2.消費者資料不丢失 :流計算,基本資料源不适用。進階資料源以kafka為例,由2種方式:receiver(開啟WAL,失敗可恢複)和director(checkpoint保證)
3.若是storm在消費,開啟storm的ackfail機制;若不是storm,資料處理完更新offset,低級API手動控制offset
4.Kafka發送資料過快,導緻伺服器網卡流量暴增。或磁盤過忙,出現丢包。
(1) 首先,對kafka進行限速,
(2) 其次啟用重試機制,使重試間隔變長。
(3) Kafka設定ack=all,即需要處于ISR(副本清單)的分區都确認,才算發送成功。
props.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all")表示至少成功發送一次;
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000)
5.消費者速度很慢,導緻一個session周期(0.1版本是預設30s)内未完成消費。導緻心跳機制檢測報告出問題。比如消費了的資料未及時送出offset,配置有可能是自動送出
問題場景:1.offset為自動送出,正在消費資料,kill消費者線程,下次重複消費。2.設定自動送出,關閉kafka,close之前,調用consumer.unsubscribed()則由可能部分offset沒有送出。3.消費程式和業務邏輯在一個線程,導緻offset送出逾時。