天天看點

【Kafka】——順序消費、重複消費、消息丢失

前言

Kafka可以說是為分布式而生的一個消息中間件,功能很強大,提到這個,我們可能就會想到消息中間件常提到的幾個問題,消費的順序性、重複消費、消息丢失等問題,接下來我們一一來看。

一、消費的順序性

現實場景

  • 資料庫中的binlog
  • 一些業務需要,比如希望把某個訂單的資料消費是有順序的

問題描述

生産者在寫的時候,其實可以指定一個 key,比如說我們指定了某個訂單 id 作為 key,那麼這個訂單相關的資料,一定會被分發到同一個 partition 中去,寫入同一個partion中的資料是一定有順序的,如果是單線程是沒有問題的,但是吞吐量太低了,但是如果是多線程是話,順序就可能會亂掉。

【Kafka】——順序消費、重複消費、消息丢失

解決辦法

  1. 一個 topic,一個 partition,一個 consumer,内部單線程消費,單線程吞吐量太低,一般不會用這個。
  2. 寫 N 個記憶體 queue,具有相同 key 的資料都到同一個記憶體 queue;然後對于 N 個線程,每個線程分别消費一個記憶體 queue 即可,這樣就能保證順序性。
【Kafka】——順序消費、重複消費、消息丢失

二、重複消費

  1. 消費方幂等操作,重複消費不會産生問題
  2. 對每個partitionID,産生一個uniqueID,.隻有這個partition的資料被完全消費,才算成功,否則失敗復原。下次若重複執行,就skip

三、消息丢失

1.生産者資料不丢失

同步模式:配置=1(隻有Leader收到,-1所有副本成功,0不等待)。leader partition挂了,資料就會丢失。

解決:設定為-1保證produce寫入所有副本算成功

      producer.type=sync

      request.required.acks=-1

異步模式:當緩沖區滿了,如果配置為0(沒有收到确認,一滿就丢棄),資料立刻丢棄

解決:不限制阻塞逾時時間。就是一滿生産者就阻塞

producer.type=async

request.required.acks=1

queue.buffering.max.ms=5000

queue.buffering.max.messages=10000

queue.enqueue.timeout.ms = -1

batch.num.messages=200

2.消費者資料不丢失 :流計算,基本資料源不适用。進階資料源以kafka為例,由2種方式:receiver(開啟WAL,失敗可恢複)和director(checkpoint保證)

3.若是storm在消費,開啟storm的ackfail機制;若不是storm,資料處理完更新offset,低級API手動控制offset

4.Kafka發送資料過快,導緻伺服器網卡流量暴增。或磁盤過忙,出現丢包。

(1)  首先,對kafka進行限速,

(2)  其次啟用重試機制,使重試間隔變長。

(3)  Kafka設定ack=all,即需要處于ISR(副本清單)的分區都确認,才算發送成功。       

                     props.put("compression.type", "gzip");

        props.put("linger.ms", "50");

        props.put("acks", "all")表示至少成功發送一次;

        props.put("retries ", 30);

        props.put("reconnect.backoff.ms ", 20000);

         props.put("retry.backoff.ms", 20000)

5.消費者速度很慢,導緻一個session周期(0.1版本是預設30s)内未完成消費。導緻心跳機制檢測報告出問題。比如消費了的資料未及時送出offset,配置有可能是自動送出

問題場景:1.offset為自動送出,正在消費資料,kill消費者線程,下次重複消費。2.設定自動送出,關閉kafka,close之前,調用consumer.unsubscribed()則由可能部分offset沒有送出。3.消費程式和業務邏輯在一個線程,導緻offset送出逾時。

繼續閱讀