天天看點

頭疼的 Kafka 消息重複問題,立馬解決!

作者:NEDHOME
一、前言

資料重複這個問題其實也是挺正常,全鍊路都有可能會導緻資料重複。

頭疼的 Kafka 消息重複問題,立馬解決!

通常,消息消費時候都會設定一定重試次數來避免網絡波動造成的影響,同時帶來副作用是可能出現消息重複。

整理下消息重複的幾個場景:

  1. 生産端: 遇到異常,基本解決措施都是 重試。
  • 場景一:leader分區不可用了,抛 LeaderNotAvailableException 異常,等待選出新 leader 分區。
  • 場景二:Controller 所在 Broker 挂了,抛 NotControllerException 異常,等待 Controller 重新選舉。
  • 場景三:網絡異常、斷網、網絡分區、丢包等,抛 NetworkException 異常,等待網絡恢複。

消費端: poll 一批資料,處理完畢還沒送出 offset ,機子當機重新開機了,又會 poll 上批資料,再度消費就造成了消息重複。

怎麼解決?

先來了解下消息的三種投遞語義:

  • 最多一次(at most once): 消息隻發一次,消息可能會丢失,但絕不會被重複發送。例如:mqtt 中 QoS = 0。
  • 至少一次(at least once): 消息至少發一次,消息不會丢失,但有可能被重複發送。例如:mqtt 中 QoS = 1
  • 精确一次(exactly once): 消息精确發一次,消息不會丢失,也不會被重複發送。例如:mqtt 中 QoS = 2。

了解了這三種語義,再來看如何解決消息重複,即如何實作精準一次,可分為三種方法:

  1. Kafka 幂等性 Producer: 保證生産端發送消息幂等。局限性,是隻能保證單分區且單會話(重新開機後就算新會話)
  2. Kafka 事務: 保證生産端發送消息幂等。解決幂等 Producer 的局限性。
  3. 消費端幂等: 保證消費端接收消息幂等。蔸底方案。

1)Kafka幂等性Producer

幂等性指:無論執行多少次同樣的運算,結果都是相同的。即一條指令,任意多次執行所産生的影響均與一次執行的影響相同。

幂等性使用示例:在生産端添加對應配置即可

Properties props = new Properties();props.put("enable.idempotence", ture); // 1. 設定幂等props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這裡預設為 allprops.put("max.in.flight.requests.per.connection", 5); // 3. 注意           
  1. 設定幂等,啟動幂等。
  2. 配置 acks,注意:一定要設定 acks=all,否則會抛異常。
  3. 配置 max.in.flight.requests.per.connection 需要 <= 5,否則會抛異常 OutOfOrderSequenceException。
  • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
  • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

為了更好了解,需要了解下Kafka 幂等機制:

頭疼的 Kafka 消息重複問題,立馬解決!
  1. Producer 每次啟動後,會向 Broker 申請一個全局唯一的 pid。(重新開機後 pid 會變化,這也是弊端之一)
  2. Sequence Numbe:針對每個 <Topic, Partition> 都對應一個從0開始單調遞增的 Sequence,同時 Broker端會緩存這個 seq num
  3. 判斷是否重複: 拿 <pid, seq num> 去 Broker 裡對應的隊列 ProducerStateEntry.Queue(預設隊列長度為 5)查詢是否存在
  • 如果 nextSeq == lastSeq + 1,即 服務端seq + 1 == 生産傳入seq,則接收。
  • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
  • 反之,要麼重複,要麼丢消息,均拒絕。
頭疼的 Kafka 消息重複問題,立馬解決!

這種設計針對解決了兩個問題:

  1. 消息重複: 場景 Broker 儲存消息後還沒發送 ack 就當機了,這時候 Producer 就會重試,這就造成消息重複。
  2. 消息亂序: 避免場景,前一條消息發送失敗而其後一條發送成功,前一條消息重試後成功,造成的消息亂序。

那什麼時候該使用幂等:

  1. 如果已經使用 acks=all,使用幂等也可以。
  2. 如果已經使用 acks=0 或者 acks=1,說明你的系統追求高性能,對資料一緻性要求不高。不要使用幂等。

2)Kafka事務

使用 Kafka 事務解決幂等的弊端:單會話且單分區幂等。

Tips: 這塊篇幅較長,這先稍微提及下使用,之後另起一篇。

事務使用示例:分為生産端 和 消費端

Properties props = new Properties();props.put("enable.idempotence", ture); // 1. 設定幂等props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這裡預設為 allprops.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數props.put("transactional.id", "my-transactional-id"); // 4. 設定事務 id Producer<String, String> producer = new KafkaProducer<String, String>(props); // 初始化事務producer.initTransactions(); try{    // 開始事務    producer.beginTransaction();     // 發送資料    producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));     // 資料發送及 Offset 發送均成功的情況下,送出事務    producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {    // 資料發送或者 Offset 發送出現異常時,終止事務    producer.abortTransaction();} finally {    // 關閉 Producer 和 Consumer    producer.close();    consumer.close();}           

這裡消費端 Consumer 需要設定下配置:isolation.level 參數

  • read_uncommitted: 這是預設值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 送出事務還是終止事務,其寫入的消息都可以讀取。如果你用了事務型 Producer,那麼對應的 Consumer 就不要使用這個值。
  • read_committed: 表明 Consumer 隻會讀取事務型 Producer 成功送出事務寫入的消息。當然了,它也能看到非事務型 Producer 寫入的所有消息。

3)消費端幂等

“如何解決消息重複?” 這個問題,其實換一種說法:就是如何解決消費端幂等性問題。

隻要消費端具備了幂等性,那麼重複消費消息的問題也就解決了。

典型的方案是使用:消息表,來去重:

頭疼的 Kafka 消息重複問題,立馬解決!
  • 上述例子中,消費端拉取到一條消息後,開啟事務,将消息Id 新增到本地消息表中,同時更新訂單資訊。
  • 如果消息重複,則新增操作 insert 會異常,同時觸發事務復原。

二、案例:

Kafka 幂等性 Producer 使用

環境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic

https://blog.csdn.net/g6u8w7p06dco99fq3/article/details/128751111

準備工作如下:

1、Zookeeper:本地使用 Docker 啟動

$ docker run -d --name zookeeper -p 2181:2181 zookeepera86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4           

2、Kafka:版本 2.7.1,源碼編譯啟動(看上文源碼搭建啟動)

3、啟動生産者:Kafka 源碼中 exmaple 中

4、啟動消息者:可以用 Kafka 提供的腳本

# 舉個栗子:topic 需要自己去修改$ cd ./kafka-2.7.1-src/bin$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic           

建立 topic : 1副本,2 分區

$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2 # 檢視$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe           

生産者代碼:

頭疼的 Kafka 消息重複問題,立馬解決!
public class KafkaProducerApplication {     private final Producer<String, String> producer;    final String outTopic;     public KafkaProducerApplication(final Producer<String, String> producer,                                    final String topic) {        this.producer = producer;        outTopic = topic;    }     public void produce(final String message) {        final String[] parts = message.split("-");        final String key, value;        if (parts.length > 1) {            key = parts[0];            value = parts[1];        } else {            key = null;            value = parts[0];        }        final ProducerRecord<String, String> producerRecord             = new ProducerRecord<>(outTopic, key, value);        producer.send(producerRecord,                (recordMetadata, e) -> {                    if(e != null) {                        e.printStackTrace();                    } else {                        System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());                    }                }        );    }     public void shutdown() {        producer.close();    }     public static void main(String[] args) {         final Properties props = new Properties();         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");        props.put(ProducerConfig.ACKS_CONFIG, "all");         props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);         final String topic = "myTopic";        final Producer<String, String> producer = new KafkaProducer<>(props);        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);         String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";        try {            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));            linesToProduce.stream().filter(l -> !l.trim().isEmpty())                    .forEach(producerApp::produce);            System.out.println("Offsets and timestamps committed in batch from " + filePath);        } catch (IOException e) {            System.err.printf("Error reading file %s due to %s %n", filePath, e);        } finally {            producerApp.shutdown();        }    }}           

啟動生産者後,控制台輸出如下:

頭疼的 Kafka 消息重複問題,立馬解決!

啟動消費者:

$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic           
頭疼的 Kafka 消息重複問題,立馬解決!

修改配置 acks

啟用幂等的情況下,調整 acks 配置,生産者啟動後結果是怎樣的:

  • 修改配置 acks = 1
  • 修改配置 acks = 0

會直接報錯:

Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.           
頭疼的 Kafka 消息重複問題,立馬解決!

修改配置 max.in.flight.requests.per.connection

啟用幂等的情況下,調整此配置,結果是怎樣的:

将 max.in.flight.requests.per.connection > 5 會怎樣?

當然會報錯:

Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.           
頭疼的 Kafka 消息重複問題,立馬解決!