天天看點

kafka筆記之如何保證資料不丢失

kafka筆記之如何保證資料不丢失

Kafka是一種高吞吐量的分布式釋出訂閱消息系統。在使用過程中如果使用不當,經常會出現消息丢失的情況,這是業務系統不能容忍的,消息系統最重要的是保證資料不丢失。本文主要記錄kafka是如何保證資料不丢失的,主要從三方面來介紹,消息發送端保證資料不丢失,kafka服務保證消息不丢失,消費者保證消息不丢失。

基礎知識

kafka 可以保證分區消息的順序,同一個分區,先發送到kafka分區的消息,會被先消費掉。 kafka 是通過一個叢集對外提供服務,隻要是叢集中多個副本中有一個副本是活躍的,那麼收到的消息就不會丢失。

kafka叢集保證資料不丢失

先思考一個問題: kafka叢集什麼時候會丢失消息?

這就要從kafka的複制機制開始講了。

kafka每個topic有多個分區,分區存儲在磁盤上,kafka可以保證分區的資料是有序的,每個分區可以有多個副本。

副本按照是否是首領,可以分為首領副本和跟随者副本(這裡對應的就是kafka叢集中的leader和follower)。

所有的消息都是發送給leader的,消息消費也是從leader擷取的。首領副本第一時間收到消息,或者消費消息,他一定是同步副本。

其他follower都是和leader保持通信,同步leader的消息。當leader不可用時,會選舉一個follower會變成leader。

對于一個主一個從的兩個kafka,做的叢集來說。

(此時的kafka複制系數是2. 對應的配置參數是replication,factor) 一個是leader副本,一個是follower副本。當follower副本一直能與leader副本保持同步的時候 follower副本是 同步副本, 當follower與leader無法保持同步的時候 follower副本則變成非同步副本。

如果leader當機,這時候系統需要選舉一個follower來作為首領,kafka優先選擇同步副本作為首領,當系統沒有同步副本的時候。 kafka如果選擇非同步副本作為首領,則會丢失一部分資料,(這一部分資料就是非同步副本無法及時從首領副本更新的消息)。 kafka如果不選擇非同步副本作為首領,則此時kafka叢集不可用。

kafka 選擇非同步副本作為首領副本的行為叫做,不完全首領選舉。如何控制kafka在leader當機時,同步副本不可用時,是否選擇非同步 作為首領?通過kafka的另外一個參數來控制的 : unclean.leader.election. 如果是true 則會發生不完全首領選舉。

副本數建議3個就可以,多的話需要更多的磁盤,unclean.leader.election 建議false.

對于兩個kafka做的叢集來說,肯定是不安全的。那麼三個節點的kafka安全嗎?

答案是 也不一定安全 因為即使三個副本,也有可能是兩個從都是非同步副本,此時主當機,從要麼不可用(影響高可用),要麼成為主(資料丢失)。 這裡就需要保證kafka系統中至少有兩個同步副本。一個肯定是首領副本,另外一個是從的副本。 此時需要kafka的另外一個參數 最小同步副本數 min,insync.replicas 隻有保證kafka收到生産者的消息之後,至少有 “最小同步副本數“ 的副本收到消息,才能保證在主當機時消息不丢失。 這個參數的意思是 kafka收到生産者消息之後,至少幾個同步副本,同步之後,才給用戶端消息确認。 數量多能保證高可用,但是犧牲效率。

kafka 如何判斷一個follower副本是不是同步副本? 滿足兩個個條件 1.在過去10秒内從首領擷取過消息,并且是最新消息。 2.過去6秒内 和 zk直接發送過心跳。

疑問:如果kafka 長時間未收到消息,第一條如何滿足?

消息發送者正确的發送姿勢

消息怎麼才算是發生成功?

消息的生産者向kafka叢集發送消息,需要等待kafka叢集确認,這裡涉及到一個參數 acks 他的值有三個 0, 1, all 如果是0 ,那麼代表發送過去,不等待kafka消息确認,認為成功 一定會丢失消息,可能kafka叢集正在選舉,此時就無法收到任何異常。 如果是1,那麼代表發送過去,等待首領副本确認消息,認為成功 首領肯定收到了消息,寫入了分區檔案(不一定落盤)。 如果是all, 那麼代表發送過去之後,消息被寫入所有同步副本之後 ,認為成功。 注意這裡是 所有同步副本,不是所有副本。 具體是多少同步副本,還要取決于kafka叢集設定的最小同步副本數,和叢集目前的同步副本數。 選擇這種配置,會可靠,但是犧牲效率,可以通過,增大批和使用異步模式,提高效率。

如果發生消息發生異常怎麼辦?重試嗎?

哪些異常需要重試? 網絡異常和叢集無主,或者正在選舉的異常是可以重試的。 哪些不需要重試? 配置異常。 其他異常怎麼辦? 序列化異常,記憶體溢出,棧溢出等。

重要的配置參數

如果網絡異常收不到響應,則等待,這裡有個配置等待時間 request,timeout.ms 發送消息等待時間。 metadata.fetch.time.out 從kafka 擷取中繼資料的等待時間。 max.block.ms : 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多長時間。 由于緩沖區已滿或中繼資料不可用,這些方法可能會被阻塞止。使用者提供的序列化程式或分區程式中的阻塞将不計入此逾時 重試次數 retries 重試直接的等待時間, 預設是100 ms ,可以通過 retry.backoff.ms 配置 多個消息發送給同一個分區的時候,生産者會把消息打成一個批,批大小設定 batch.size 過大占記憶體,過小發送頻繁,并且生産者不是 必須滿批發送,有個等待時間,linger.ms設定 等待多久批不滿則發送。

消息消費者正确的消費姿勢

消費者需要向kafka叢集送出 已經消費的消息的offset來确定消息消費到了那裡。 消息隊列的消費方式有兩種,一種是釋出訂閱模式,一種是隊列模式。 釋出訂閱模式 一個消息可以被多個消費者消費。隊列模式多個消費者隻能消費到一部分消息。 kafka是通過group-id來區分消費組的。 一個topic被 同一個消費組的不同消費者消費 ,相當于是隊列模式。被不同消費組消費相當于是 訂閱模式。 一個partition在同一個時刻隻有一個consumer instance在消費。 對于正确的模式,我們需要配置正确的group-id

auto.offset.reset 沒有偏移量可以送出的時候,系統從哪裡開始消費。 有兩種設定 :earliest 和latest 。

enable.auto.commit

自動送出 ,如果開啟了自動送出,那麼系統會自動進行送出offset。可能會引起,并未消費掉,就送出了offset.引起資料的丢失。

與自動送出相關的是自動送出的間隔時間 auto.commit.interval.ms 預設是5秒鐘送出一次,可以通過檢視 kafka config目錄下的

配置檔案,查詢配置的預設值。

自動送出 還可能引起消息的重複消費,特别是 多個用戶端直接出現重平衡時。