一、前言
資料重複這個問題其實也是挺正常,全鍊路都有可能會導緻資料重複。
通常,消息消費時候都會設定一定重試次數來避免網絡波動造成的影響,同時帶來副作用是可能出現消息重複。
整理下消息重複的幾個場景:
- 生産端: 遇到異常,基本解決措施都是 重試。
- 場景一:leader分區不可用了,抛 LeaderNotAvailableException 異常,等待選出新 leader 分區。
- 場景二:Controller 所在 Broker 挂了,抛 NotControllerException 異常,等待 Controller 重新選舉。
- 場景三:網絡異常、斷網、網絡分區、丢包等,抛 NetworkException 異常,等待網絡恢複。
消費端: poll 一批資料,處理完畢還沒送出 offset ,機子當機重新開機了,又會 poll 上批資料,再度消費就造成了消息重複。
怎麼解決?
先來了解下消息的三種投遞語義:
- 最多一次(at most once): 消息隻發一次,消息可能會丢失,但絕不會被重複發送。例如:mqtt 中 QoS = 0。
- 至少一次(at least once): 消息至少發一次,消息不會丢失,但有可能被重複發送。例如:mqtt 中 QoS = 1
- 精确一次(exactly once): 消息精确發一次,消息不會丢失,也不會被重複發送。例如:mqtt 中 QoS = 2。
了解了這三種語義,再來看如何解決消息重複,即如何實作精準一次,可分為三種方法:
- Kafka 幂等性 Producer: 保證生産端發送消息幂等。局限性,是隻能保證單分區且單會話(重新開機後就算新會話)
- Kafka 事務: 保證生産端發送消息幂等。解決幂等 Producer 的局限性。
- 消費端幂等: 保證消費端接收消息幂等。蔸底方案。
1)Kafka幂等性Producer
幂等性指:無論執行多少次同樣的運算,結果都是相同的。即一條指令,任意多次執行所産生的影響均與一次執行的影響相同。
幂等性使用示例:在生産端添加對應配置即可
Properties props = new Properties();props.put("enable.idempotence", ture); // 1. 設定幂等props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這裡預設為 allprops.put("max.in.flight.requests.per.connection", 5); // 3. 注意
- 設定幂等,啟動幂等。
- 配置 acks,注意:一定要設定 acks=all,否則會抛異常。
- 配置 max.in.flight.requests.per.connection 需要 <= 5,否則會抛異常 OutOfOrderSequenceException。
- 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
- Kafka >= 1.1, max.in.flight.request.per.connection <= 5
為了更好了解,需要了解下Kafka 幂等機制:
- Producer 每次啟動後,會向 Broker 申請一個全局唯一的 pid。(重新開機後 pid 會變化,這也是弊端之一)
- Sequence Numbe:針對每個 <Topic, Partition> 都對應一個從0開始單調遞增的 Sequence,同時 Broker端會緩存這個 seq num
- 判斷是否重複: 拿 <pid, seq num> 去 Broker 裡對應的隊列 ProducerStateEntry.Queue(預設隊列長度為 5)查詢是否存在
- 如果 nextSeq == lastSeq + 1,即 服務端seq + 1 == 生産傳入seq,則接收。
- 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即剛初始化,也接收。
- 反之,要麼重複,要麼丢消息,均拒絕。
這種設計針對解決了兩個問題:
- 消息重複: 場景 Broker 儲存消息後還沒發送 ack 就當機了,這時候 Producer 就會重試,這就造成消息重複。
- 消息亂序: 避免場景,前一條消息發送失敗而其後一條發送成功,前一條消息重試後成功,造成的消息亂序。
那什麼時候該使用幂等:
- 如果已經使用 acks=all,使用幂等也可以。
- 如果已經使用 acks=0 或者 acks=1,說明你的系統追求高性能,對資料一緻性要求不高。不要使用幂等。
2)Kafka事務
使用 Kafka 事務解決幂等的弊端:單會話且單分區幂等。
Tips: 這塊篇幅較長,這先稍微提及下使用,之後另起一篇。
事務使用示例:分為生産端 和 消費端
Properties props = new Properties();props.put("enable.idempotence", ture); // 1. 設定幂等props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這裡預設為 allprops.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數props.put("transactional.id", "my-transactional-id"); // 4. 設定事務 id Producer<String, String> producer = new KafkaProducer<String, String>(props); // 初始化事務producer.initTransactions(); try{ // 開始事務 producer.beginTransaction(); // 發送資料 producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value")); // 資料發送及 Offset 發送均成功的情況下,送出事務 producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 資料發送或者 Offset 發送出現異常時,終止事務 producer.abortTransaction();} finally { // 關閉 Producer 和 Consumer producer.close(); consumer.close();}
這裡消費端 Consumer 需要設定下配置:isolation.level 參數
- read_uncommitted: 這是預設值,表明 Consumer 能夠讀取到 Kafka 寫入的任何消息,不論事務型 Producer 送出事務還是終止事務,其寫入的消息都可以讀取。如果你用了事務型 Producer,那麼對應的 Consumer 就不要使用這個值。
- read_committed: 表明 Consumer 隻會讀取事務型 Producer 成功送出事務寫入的消息。當然了,它也能看到非事務型 Producer 寫入的所有消息。
3)消費端幂等
“如何解決消息重複?” 這個問題,其實換一種說法:就是如何解決消費端幂等性問題。
隻要消費端具備了幂等性,那麼重複消費消息的問題也就解決了。
典型的方案是使用:消息表,來去重:
- 上述例子中,消費端拉取到一條消息後,開啟事務,将消息Id 新增到本地消息表中,同時更新訂單資訊。
- 如果消息重複,則新增操作 insert 會異常,同時觸發事務復原。
二、案例:
Kafka 幂等性 Producer 使用
環境搭建可參考:https://developer.confluent.io/tutorials/message-ordering/kafka.html#view-all-records-in-the-topic
https://blog.csdn.net/g6u8w7p06dco99fq3/article/details/128751111
準備工作如下:
1、Zookeeper:本地使用 Docker 啟動
$ docker run -d --name zookeeper -p 2181:2181 zookeepera86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
2、Kafka:版本 2.7.1,源碼編譯啟動(看上文源碼搭建啟動)
3、啟動生産者:Kafka 源碼中 exmaple 中
4、啟動消息者:可以用 Kafka 提供的腳本
# 舉個栗子:topic 需要自己去修改$ cd ./kafka-2.7.1-src/bin$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
建立 topic : 1副本,2 分區
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2 # 檢視$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
生産者代碼:
public class KafkaProducerApplication { private final Producer<String, String> producer; final String outTopic; public KafkaProducerApplication(final Producer<String, String> producer, final String topic) { this.producer = producer; outTopic = topic; } public void produce(final String message) { final String[] parts = message.split("-"); final String key, value; if (parts.length > 1) { key = parts[0]; value = parts[1]; } else { key = null; value = parts[0]; } final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value); producer.send(producerRecord, (recordMetadata, e) -> { if(e != null) { e.printStackTrace(); } else { System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset()); } } ); } public void shutdown() { producer.close(); } public static void main(String[] args) { final Properties props = new Properties(); props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); final String topic = "myTopic"; final Producer<String, String> producer = new KafkaProducer<>(props); final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic); String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt"; try { List<String> linesToProduce = Files.readAllLines(Paths.get(filePath)); linesToProduce.stream().filter(l -> !l.trim().isEmpty()) .forEach(producerApp::produce); System.out.println("Offsets and timestamps committed in batch from " + filePath); } catch (IOException e) { System.err.printf("Error reading file %s due to %s %n", filePath, e); } finally { producerApp.shutdown(); } }}
啟動生産者後,控制台輸出如下:
啟動消費者:
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
修改配置 acks
啟用幂等的情況下,調整 acks 配置,生産者啟動後結果是怎樣的:
- 修改配置 acks = 1
- 修改配置 acks = 0
會直接報錯:
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence.
修改配置 max.in.flight.requests.per.connection
啟用幂等的情況下,調整此配置,結果是怎樣的:
将 max.in.flight.requests.per.connection > 5 會怎樣?
當然會報錯:
Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.