天天看點

kafka實作無消息丢失與精确一次語義(exactly once)處理

kafka實作無消息丢失與精确一次語義(exactly once)處理

在很多的流處理架構的介紹中,都會說kafka是一個可靠的資料源,并且推薦使用Kafka當作資料源來進行使用。這是因為與其他消息引擎系統相比,kafka提供了可靠的資料儲存及備份機制。并且通過消費者位移這一概念,可以讓消費者在因某些原因當機而重新開機後,可以輕易得回到當機前的位置。

但其實kafka的可靠性也隻能說是相對的,在整條資料鍊條中,總有可以讓資料出現丢失的情況,今天就來讨論如何避免kafka資料丢失,以及實作精确一緻處理的語義。

kafka無消息丢失處理

在讨論如何實作kafka無消息丢失的時候,首先要先清楚大部分情況下消息丢失是在什麼情況下發生的。為什麼是大部分,因為總有一些非常特殊的情況會被人忽略,而我們隻需要關注普遍的情況就足夠了。接下來我們來讨論如何較為普遍的資料丢失情況。

1.1 生産者丢失

前面介紹Kafka分區和副本的時候,有提到過一個producer用戶端有一個acks的配置,這個配置為0的時候,producer是發送之後不管的,這個時候就很有可能因為網絡等原因造成資料丢失,是以應該盡量避免。但是将ack設定為1就沒問題了嗎,那也不一定,因為有可能在leader副本接收到資料,但還沒同步給其他副本的時候就挂掉了,這時候資料也是丢失了。并且這種時候是用戶端以為消息發送成功,但kafka丢失了資料。

要達到最嚴格的無消息丢失配置,應該是要将acks的參數設定為-1(也就是all),并且将min.insync.replicas配置項調高到大于1,這部分内容在上一篇副本機制有介紹詳細解析kafka之kafka分區和副本。

同時還需要使用帶有回調的producer api,來發送資料。注意這裡讨論的都是異步發送消息,同步發送不在讨論範圍。

public class send{

......
public static void main(){
    ...
    /*
    *  第一個參數是 ProducerRecord 類型的對象,封裝了目标 Topic,消息的 kv
    *  第二個參數是一個 CallBack 對象,當生産者接收到 Kafka 發來的 ACK 确認消息的時候,
    *  會調用此 CallBack 對象的 onCompletion() 方法,實作回調功能
    */
     producer.send(new ProducerRecord<>(topic, messageNo, messageStr),
                    new DemoCallBack(startTime, messageNo, messageStr));
    ...
}
......           

}

class DemoCallBack implements Callback {

/* 開始發送消息的時間戳 */
private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
    this.startTime = startTime;
    this.key = key;
    this.message = message;
}

/**
 * 生産者成功發送消息,收到 Kafka 服務端發來的 ACK 确認消息後,會調用此回調函數
 * @param metadata 生産者發送的消息的中繼資料,如果發送過程中出現異常,此參數為 null
 * @param exception 發送過程中出現的異常,如果發送成功為 null
 */
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    long elapsedTime = System.currentTimeMillis() - startTime;
    if (metadata != null) {
        System.out.printf("message: (%d, %s) send to partition %d, offset: %d, in %d\n",
                key, message, metadata.partition(), metadata.offset(), elapsedTime);
    } else {
        exception.printStackTrace();
    }
}           

更詳細的代碼可以參考這裡:Kafka生産者分析——KafkaProducer。

我們之前提到過,producer發送到kafka broker的時候,是有多種可能會失敗的,而回調函數能準确告訴你是否确認發送成功,當然這依托于acks和min.insync.replicas的配置。而當資料發送丢失的時候,就可以進行手動重發或其他操作,進而確定生産者發送成功。

1.2 kafka内部丢失

有些時候,kafka内部因為一些不大好的配置,可能會出現一些極為隐蔽的資料丢失情況,那麼我們分别讨論下大緻都有哪幾種情況。

首先是replication.factor配置參數,這個配置決定了副本的數量,預設是1。注意這個參數不能超過broker的數量。說這個參數其實是因為如果使用預設的1,或者不在建立topic的時候指定副本數量(也就是副本數為1),那麼當一台機器出現磁盤損壞等情況,那麼資料也就從kafka裡面丢失了。是以replication.factor這個參數最好是配置大于1,比如說3。

接下來要說的還是和副本相關的,也是上一篇副本中提到的unclean.leader.election.enable 參數,這個參數是在主副本挂掉,然後在ISR集合中沒有副本可以成為leader的時候,要不要讓進度比較慢的副本成為leader的。不用多說,讓進度比較慢的副本成為leader,肯定是要丢資料的。雖然可能會提高一些可用性,但如果你的業務場景丢失資料更加不能忍受,那還是将unclean.leader.election.enable設定為false吧。

1.3 消費者丢失

消費者丢失的情況,其實跟消費者位移處理不當有關。消費者位移送出有一個參數,enable.auto.commit,預設是true,決定是否要讓消費者自動送出位移。如果開啟,那麼consumer每次都是先送出位移,再進行消費,比如先跟broker說這5個資料我消費好了,然後才開始慢慢消費這5個資料。

這樣處理的話,好處是簡單,壞處就是漏消費資料,比如你說要消費5個資料,消費了2個自己就挂了。那下次該consumer重新開機後,在broker的記錄中這個consumer是已經消費了5個的。

是以最好的做法就是将enable.auto.commit設定為false,改為手動送出位移,在每次消費完之後再手動送出位移資訊。當然這樣又有可能會重複消費資料,畢竟exactly once處理一直是一個問題呀(/攤手)。遺憾的是kafka目前沒有保證consumer幂等消費的措施,如果确實需要保證consumer的幂等,可以對每條消息維持一個全局的id,每次消費進行去重,當然耗費這麼多的資源來實作exactly once的消費到底值不值,那就得看具體業務了。

1.4 無消息丢失小結

那麼到這裡先來總結下無消息丢失的主要配置吧:

producer的acks設定位-1,同時min.insync.replicas設定大于1。并且使用帶有回調的producer api發生消息。

預設副本數replication.factor設定為大于1,或者建立topic的時候指定大于1的副本數。

unclean.leader.election.enable 設定為false,防止定期副本leader重選舉

消費者端,自動送出位移enable.auto.commit設定為false。在消費完後手動送出位移。

那麼接下來就來說說kafka實作精确一次(exactly once)處理的方法吧。

實作精确一次(exactly once)處理

在分布式環境下,要實作消息一緻與精确一次(exactly once)語義處理是很難的。精确一次處理意味着一個消息隻處理一次,造成一次的效果,不能多也不能少。

那麼kafka如何能夠實作這樣的效果呢?在介紹之前,我們先來介紹其他兩個語義,至多一次(at most once)和至少一次(at least once)。

最多一次和至少一次

最多一次就是保證一條消息隻發送一次,這個其實最簡單,異步發送一次然後不管就可以,缺點是容易丢資料,是以一般不采用。

至少一次語義是kafka預設提供的語義,它保證每條消息都能至少接收并處理一次,缺點是可能有重複資料。

前面有介紹過acks機制,當設定producer用戶端的acks是1的時候,broker接收到消息就會跟producer确認。但producer發送一條消息後,可能因為網絡原因消息逾時未達,這時候producer用戶端會選擇重發,broker回應接收到消息,但很可能最開始發送的消息延遲到達,就會造成消息重複接收。

那麼針對這些情況,要如何實作精确一次處理的語義呢?

幂等的producer

要介紹幂等的producer之前,得先了解一下幂等這個詞是什麼意思。幂等這個詞最早起源于函數式程式設計,意思是一個函數無論執行多少次都會傳回一樣的結果。比如說讓一個數加1就不是幂等的,而讓一個數取整就是幂等的。因為這個特性是以幂等的函數适用于并發的場景下。

但幂等在分布式系統中含義又做了進一步的延申,比如在kafka中,幂等性意味着一個消息無論重複多少次,都會被當作一個消息來持久化處理。

kafka的producer預設是支援最少一次語義,也就是說不是幂等的,這樣在一些比如支付等要求精确資料的場景會出現問題,在0.11.0後,kafka提供了讓producer支援幂等的配置操作。即:

props.put("enable.idempotence", ture)

在建立producer用戶端的時候,添加這一行配置,producer就變成幂等的了。注意開啟幂等性的時候,acks就自動是“all”了,如果這時候手動将ackss設定為0,那麼會報錯。

而底層實作其實也很簡單,就是對每條消息生成一個id值,broker會根據這個id值進行去重,進而實作幂等,這樣一來就能夠實作精确一次的語義了。

但是!幂等的producery也并非萬能。有兩個主要是缺陷:

幂等性的producer僅做到單分區上的幂等性,即單分區消息不重複,多分區無法保證幂等性。

隻能保持單會話的幂等性,無法實作跨會話的幂等性,也就是說如果producer挂掉再重新開機,無法保證兩個會話間的幂等(新會話可能會重發)。因為broker端無法擷取之前的狀态資訊,是以無法實作跨會話的幂等。

事務的producer

當遇到上述幂等性的缺陷無法解決的時候,可以考慮使用事務了。事務可以支援多分區的資料完整性,原子性。并且支援跨會話的exactly once處理語義,也就是說如果producer當機重新開機,依舊能保證資料隻處理一次。

開啟事務也很簡單,首先需要開啟幂等性,即設定enable.idempotence為true。然後對producer發送代碼做一些小小的修改。

//初始化事務

producer.initTransactions();

try {

//開啟一個事務
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
//送出
producer.commitTransaction();           

} catch (KafkaException e) {

//出現異常的時候,終止事務
producer.abortTransaction();           

但無論開啟幂等還是事務的特性,都會對性能有一定影響,這是必然的。是以kafka預設也并沒有開啟這兩個特性,而是交由開發者根據自身業務特點進行處理。

以上~

原文位址

https://www.cnblogs.com/listenfwind/p/12207693.html

繼續閱讀