一、概述
1、什麼是MQ
消息中間件,通過消息的發送和消息的接收分離實作應用程式的解耦,但是這個是MQ的效果并非目的,真正的目的是為了通訊,屏蔽底層複雜的通訊協定,定義一套更加簡單的通訊,一個分布式系統中兩個子產品之間通訊要麼是HTTP,要麼是自己開發的(rpc) TCP,但是這兩種協定其實都是原始的協定。HTTP 協定很難實作兩端通訊——子產品 A 可以調用 B,B 也可以主動調用 A,如果要做到這個兩端都要背上WebServer,而且還不支援⻓連接配接(HTTP 2.0 的庫根本找不到)。TCP 就更加原始了,粘包、心跳、私有的協定,想一想頭皮就發麻。MQ 所要做的就是在這些協定之上建構一個簡單的“協定”——生産者/消費者模型。MQ 帶給我的“協定”不是具體的通訊協定,而是更高層次通訊模型。它定義了兩個對象——發送資料的叫生産者;接收資料的叫消費者, 提供一個SDK 讓我們可以定義自己的生産者和消費者實作消息通訊而無視底層通訊協定
2、MQ分類
2.1、有broker
又分為中topic和輕topic
kafka、JMS(ActiveMQ)就屬于這個流派,生産者會發送 key 和資料到 Broker,由 Broker比較 key 之後決定給哪個消費者。這種模式是我們最常⻅的模式,是我們對 MQ 最多的印象。在這種模式下一個 topic 往往是一個比較大的概念,甚至一個系統中就可能隻有一個topic,topic 某種意義上就是 queue,生産者發送 key 相當于說:“hi,把資料放到 key 的隊列中”
2.2、無broker
這種的代表是 RabbitMQ(或者說是 AMQP)。生産者發送 key 和資料,消費者定義訂閱的隊列,Broker 收到資料之後會通過一定的邏輯計算出 key 對應的隊列,然後把資料交給隊列
3、kafka
基于分布式的分布/訂閱消息隊列,主要用于資料實時處理領域
官網:Apache Kafka
4、消息隊列的兩種模式
點對點模式,一對一,消費者主動拉取資料,消息收到後消息清除
消息生産者生産消息發送到Queue中,然後消息消費者從Queue中取出并且消費消息。消息被消費以後, queue 中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。Queue 支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費。

釋出訂閱模式,一對多,消費者消費資料之後不會清除消息
消息生産者(釋出)将消息釋出到 topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,釋出到 topic 的消息會被所有訂閱者消費。
二、基本架構和概念
- Producer: 消息生産者,就是向 Kafka發消息 ;
- Consumer: 消息消費者,向 Kafka 取消息的用戶端;
- Consumer Group (CG): 消費者組,由多個 consumer 組成。 消費者組内每個消費者負責消費不同分區的資料,一個分區隻能由一個組内消費者消費;消費者組之間互不影響。 所有的消費者都屬于某個消費者組,即消費者組是邏輯上的一個訂閱者。
- Broker:經紀人 一台 Kafka 伺服器就是一個 broker。一個叢集由多個 broker 組成。一個 broker可以容納多個 topic。
- Topic: 話題,可以了解為一個隊列, 生産者和消費者面向的都是一個 topic;
- Partition: 為了實作擴充性,一個非常大的 topic 可以分布到多個 broker(即伺服器)上,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊列;
- Replica: 副本(Replication),為保證叢集中的某個節點發生故障時, 該節點上的 partition 資料不丢失,且 Kafka仍然能夠繼續工作, Kafka 提供了副本機制,一個 topic 的每個分區都有若幹個副本,一個 leader 和若幹個 follower。
- Leader: 每個分區多個副本的“主”,生産者發送資料的對象,以及消費者消費資料的對象都是 leader。
- Follower: 每個分區多個副本中的“從”,實時從 leader 中同步資料,保持和 leader 資料的同步。 leader 發生故障時,某個 Follower 會成為新的 leader。
三、安裝
1、window上安裝
kafka(3版本之後不需要安裝zk)
在Windows系統上安裝消息隊列kafka
2、Linux安裝
2.1、安裝
需要提前安裝好java和zk
下載下傳kafka的壓縮包:http://kafka.apache.org/downloads
解壓到路徑 /usr/local/kafka
修改配置檔案/usr/local/kafka/kafka2.11-2.4/config/server.properties
#broker.id屬性在kafka叢集中必須要是唯一
broker.id= 0
#kafka部署的機器ip和提供服務的端口号
listeners=PLAINTEXT://192.168.65.60:9092
#kafka的消息存儲檔案
log.dir=/usr/local/data/kafka-logs
#kafka連接配接zookeeper的位址
zookeeper.connect= 192.168.65.60:2181
啟動kafka
./kafka-server-start.sh -daemon../config/server.properties
驗證是否啟動成功:
進入到zk中的節點看id是 0 的broker有沒有存在(上線)
ls /brokers/ids/
2.2、server.properties核心配置詳解:
Property | Default | Description |
broker.id | 每個broker都可以⽤⼀個唯⼀的⾮負整數id進⾏辨別;這個id可以作為broker的“名字”,你可以選擇任意你喜歡的數字作為id,隻要id是唯⼀的即可。 | |
log.dirs | /tmp/kafka-logs | kafka存放資料的路徑。這個路徑并不是唯⼀的,可以是多個,路徑之間隻需要使⽤逗号分隔即可;每當建立新partition時,都會選擇在包含最少partitions的路徑下進⾏。 |
listeners | PLAINTEXT://192.168.65.60:9092 | server接受用戶端連接配接的端⼝,ip配置kafka本機ip即可 |
zookeeper.connect | localhost:2181 | zooKeeper連接配接字元串的格式為:hostname:port,此處hostname和port分别是ZooKeeper叢集中某個節點的host和port;zookeeper如果是叢集,連接配接⽅式為hostname1:port1, hostname2:port2,hostname3:port3 |
log.retention.hours | 168 | 每個⽇志⽂件删除之前儲存的時間。預設資料儲存時間對所有topic都⼀樣。 |
num.partitions | 1 | 建立topic的預設分區數 |
default.replication.factor | 1 | ⾃動建立topic的預設副本數量,建議設定為⼤于等于2 |
min.insync.replicas | 1 | 當producer設定acks為-1時,min.insync.replicas指定replicas的最⼩數⽬(必須确認每⼀個repica的寫資料都是成功的),如果這個數⽬沒有達到,producer發送消息會産⽣異常 |
delete.topic.enable | false | 是否允許删除主題 |
四、基本知識
1、建立主題、生産消息、消費消息
執行以下指令建立名為“test”的topic,這個topic隻有一個partition,并且備份因子也設定為1
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test
檢視目前kafka内有哪些topic
./kafka-topics.sh --list --zookeeper 172.16.253.35:2181
kafka自帶了一個producer指令用戶端,可以從本地檔案中讀取内容,或者我們也可以以指令行中直接輸入内容,并将這些内容以消息的形式發送到kafka叢集中。在預設情況下,每一個行會被當做成一個獨立的消息。使用kafka的發送消息的用戶端,指定發送到的kafka伺服器位址和topic
./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test
對于consumer,kafka同樣也攜帶了一個指令行用戶端,會将擷取到内容在指令中進行輸 出, 預設是消費最新的消息 。使用kafka的消費者消息的用戶端,從指定kafka伺服器的指定 topic中消費消息
方式一:從最後一條消息的偏移量+1開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic testCopy to clipboardErrorCopied
方式二:從頭開始消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test
2、單點傳播消息、多點傳播消息
單點傳播場景:同一個消費組裡的多個消費者隻有一個消費者能消費到某一個topic消息
多點傳播場景:一個topic中的一條消息要被多個消費者消費,需要讓不同的消費者處于不同的消費組
3、消費組詳細資訊
# 檢視目前主題下有哪些消費組
./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list
# 檢視消費組中的具體資訊:比如目前偏移量、最後一條消息的偏移量、堆積的消息數量
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup
● current-offset:最後被消費的消息的偏移量
● Log-end-offset:消息總量(最後一條消息的偏移量)
● Lag:積壓了多少條消息
4、小結
- 消息是被存儲的,發送給broker儲存至本地的日志檔案中(配置檔案中配置的目錄)
- 消息的儲存是有順序的,通過offset偏移量來描述消息的有序性
- 消費者消費消息時也是通過offset描述目前要消費的那條消息的位置
五、主題、分區、_comsumer_offsets
一個topic中的檔案可以非常多,多到幾個g,存在log檔案中,為了解決這個檔案問題,引入了partition分區,分開存儲一個topic中的消息,比如一個topic三個partition,,這個topic中的消息可以分三個地方部署,partition不僅解決了檔案存儲過大的問題,同時支援讀寫多個分區
給主題建立多個分區
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1
檢視topic主題的分區資訊
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1
實際上是存在data/kafka-logs/test1-0 和 test1-1中的0000000.log檔案中
000.index:相當于一個稀疏索引檔案
000.timeindex:相當于一個時間索引檔案
000.log就是消息
kafka内部自己建立了一個_consumer_offsets主題,這個主題有49個分區,在data/log目錄下可以檢視,kafka會定期将分區的offset送出給這個主題,即這個主題主要用來儲存消費者消費消息的偏移量,比如一個test主題有100條消息,一個消費者兩給消費者a和b,a從1消費到50突然挂掉了,此時kafka會将偏移量50儲存到_consumer_offsets主題中,b消費的時候從51消費
儲存到_consumer_offsets時,key是consumerGroupId+topic+分區号,value就是目前的offset,
因為要處理高并發,是以預設有50個分區,可以通過offset.num.topic.partitions設定,這樣通過加機器抗大并發,通過如下公式計算要送出到_consumer_offset中的哪個分區
公式:hash(consumerGroupId)%_consumer_offsets分區數
檔案中儲存的消息,預設儲存7天,7天到期自動清除
六、kafka叢集、副本
1、搭建
略
2、副本
在一個叢集即3個broker,同一個主題建立2個分區 3個人副本
./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic
檢視詳情
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic my-replicated-topic
kafka叢集中由多個broker組成
一個broker中存放一個topic的不同partition——副本
leader:副本裡的概念
- 每個partition都有一個broker作為leader。
- 消息發送方要把消息發給哪個broker?就看副本的leader是在哪個broker上面。副本裡的leader專⻔用來接收消息。
- 接收到消息,其他follower通過poll的方式來同步資料。
通過kill掉leader之後檢視
# kill掉leader
ps -aux | grep server.properties
kill 17631
# 檢視topic情況
./kafka-topics.sh --describe --zookeeper 172.16.253.35:2181 --topic my-replicated-topic
follower:leader處理所有針對這個partition的讀寫請求,而follower被動複制leader,不提供讀寫(主要是為了保證多副本資料與消費的一緻性),如果leader所在的broker挂掉,那麼就會進行新leader的選舉,至于怎麼選,在之後的controller的概念中介紹。
isr: 可以同步的broker節點和已同步的broker節點,存放在isr集合中。
3、消息的生産和發送
#生産
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic
#消費
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
- 一個partitions隻能被一個消費組裡的一個消費者消費目的是為了保證消費的順序性,但是多個partition的多個消費者消費的消息順序性是沒法保證的,如何保證了?後續
- partition的數量決定了消費者的數量,建議同一個消費組裡的消費者數量不要大于partition數量,多的消費者消費不到消息
- 如果消費者挂了,會觸發rebalance機制,讓其他消費者消費該分區
七、kafka java生産者和消費者
1、引入依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
2、生産消息
#### //消息的發送方
public class MyProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws ExecutionException,InterruptedException {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
//把發送的key從字元串序列化為位元組數組
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
//把發送消息value從字元串序列化為位元組數組
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String,String>(props);
Order order = new Order((long) i, i);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
RecordMetadata metadata = producer.send(producerRecord).get();
//=====阻塞=======
System.out.println("同步方式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
發送到指定分區
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));
未指定分區 根據業務key計算分區
//未指定發送分區,具體發送的分區計算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
同步發消息
在收到kafka的ack告知發送成功之前一直處于阻塞狀态
//等待消息發送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
異步發送,發送完不用等boker回複,直接執行下面的業務邏輯,可以提供callback,讓broker異步調用callback,告訴消息發送的結果
//要發送 5 條消息
Order order = new Order((long) i, i);
//指定發送分區
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));
//異步回調方式發送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("發送消息失敗:" +
exception.getStackTrace());
}
if (metadata != null) {
System.out.println("異步方式發送消息結果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
}
}
});
ack參數配置,在同步調用情況下,消息發送時ack有三種配置
- ( 1 )acks=0: 表示producer不需要等待任何broker确認收到消息的回複,就可以繼續發送下一條消息。性能最高,但是最容易丢消息。
- ( 2 )acks=1: 至少要等待leader已經成功将資料寫入本地log,但是不需要等待所有follower是否成功寫入。就可以繼續發送下一條消息。這種情況下,如果follower沒有成功備份資料,而此時leader又挂掉,則消息會丢失。
- ( 3 )acks=-1或all: 需要等待 min.insync.replicas(預設為 1 ,推薦配置大于等于2) 這個參數配置的副本個數都成功寫入日志,這種政策會保證隻要有一個備份存活就不會丢失資料。這是最強的資料保證。一般除非是金融級别,或跟錢打交道的場景才會使用這種配置。
props.put(ProducerConfig.ACKS_CONFIG, "1");
其他
- 發送會預設會重試 3 次,每次間隔100ms
- 發送的消息會先進入到本地緩沖區(32mb),kakfa會跑一個線程,該線程去緩沖區中取16k的資料,發送到kafka,如果到 10 毫秒資料沒取滿16k,也會發送一次。
3、消費消息
3.1、基本實作
public class MyConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
// 消費分組名
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
//建立一個消費者的用戶端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
// 消費者訂閱主題清單
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* poll() API 是拉取消息的⻓輪詢
*/
ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 ));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key =%s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
}
}
}
}
3.2、設定自動送出offset——預設
// 是否自動送出offset,預設就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動送出offset的間隔時間
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
消費者poll到消息後預設情況下,會自動向broker的_consumer_offsets主題送出目前主題-分區消費的偏移量。
自動送出會丢消息: 因為如果消費者還沒消費完poll下來的消息就自動送出了偏移量,那麼此 時消費者挂了,于是下一個消費者會從已送出的offset的下一個位置開始消費消息。之前未被消費的消息就丢失掉了。
3.3、手動送出offset
設定手動送出參數
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
消費完消息進行手動送出
手動同步送出
if (records.count() > 0 ) {
// 手動同步送出offset,目前線程會阻塞直到offset送出成功
// 一般使用同步送出,因為送出之後一般也沒有什麼邏輯代碼了
consumer.commitSync();
}
手動異步送出
if (records.count() > 0 ) {
// 手動異步送出offset,目前線程送出offset不會阻塞,可以繼續處理後面的程式邏輯
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata>offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " +exception.getStackTrace());
}
}
});
}
3.4、消費者poll消息過程
- 消費者建立了與broker之間的⻓連接配接,開始poll消息。
- 預設一次poll 500條消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500 );
可以根據消費速度的快慢來設定,因為如果兩次poll的時間如果超出了30s的時間間隔,kafka會認為其消費能力過弱,将其踢出消費組。将分區配置設定給其他消費者。
可以通過這個值進行設定:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000 );
如果每隔1s内沒有poll到任何消息,則繼續去poll消息,循環往複,直到poll到消息。如果超出了1s,則此次⻓輪詢結束。
ConsumerRecords<String, String> records =consumer.poll(Duration.ofMillis( 1000 ));
消費者發送心跳的時間間隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000 );
kafka如果超過 10 秒沒有收到消費者的心跳,則會把消費者踢出消費組,進行rebalance,把分區配置設定給其他消費者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000 );
3.5、指定分區消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
3.6、消息回溯消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,0 )));
3.7、指定offset消費
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0 )));
consumer.seek(new TopicPartition(TOPIC_NAME, 0 ), 10 );
3.8、從指定時間點消費
List<PartitionInfo> topicPartitions =consumer.partitionsFor(TOPIC_NAME);
//從 1 小時前開始消費
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60 ;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()),fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap =consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +"|offset-" + offset);
System.out.println();
//根據消費裡的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}
3.9、新消費組的消費偏移量
當消費主題的是一個新的消費組,或者指定offset的消費方式,offset不存在,那麼應該如何消費?
- latest(預設) :隻消費自己啟動之後發送到主題的消息
- earliest:第一次從頭開始消費,以後按照消費offset記錄繼續消費,這個需要差別于consumer.seekToBeginning(每次都從頭開始消費)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
八、Springboot整合kafka待補充
九、深入了解&生産優化
1、controller
建立一個主題多個分區時候,Kafka叢集中的broker在zk中建立臨時序号節點,序号最小的節點(最先建立的節點)将作為叢集的controller,負責管理整個叢集中的所有分區和副本的狀态:
- 當某個分區的leader副本出現故障時,由控制器負責為該分區選舉新的leader副本,選舉的規則是從ISR集合最左邊獲得。
- 當檢測到某個分區的ISR集合發生變化時,由控制器負責通知所有broker更新其中繼資料資訊。
- 當使用kafka-topics.sh腳本為某個topic增加分區數量時,同樣還是由控制器負責讓新分區被其他節點感覺到。
2、rebalance機制
前提是:消費者沒有指明分區消費。
觸發條件:當消費組裡消費者和分區的關系發生變化,那麼就會觸發rebalance機制。這個機制會重新調整消費者消費哪個分區。在觸發rebalance機制之前,消費者消費哪個分區有三種政策:
- range:通過公示來計算某個消費者消費哪個分區,前面的消費者是分區總數/消費者數量+1 之後的消費者是分區政策/消費者數量
- 輪詢:大家輪着消費
- sticky:在觸發了rebalance後,在消費者消費的原分區不變的基礎上進行調整。如果這個政策沒有打開那麼就要全部重新配置設定,建議開啟
3、HW和LEO
HW:已完成同步的位置,消息在寫入broker時,且每個broker完成這條消息的變化,hw才發生變化,在這之前消費者是消費不到這條消息的
4、如何防止消息丢失
- 發送方: ack是 1 或者-1/all 可以防止消息丢失,如果要做到99.9999%,ack設成all,把min.insync.replicas配置成分區備份數
- 消費方:把自動送出改為手動送出。