消息确認機制
生産端消息确認機制
消息确認,是指生産端投遞消息後,如果
Broker
收到消息,則會給我們生産端一個應答。生産端接收應答,用來确定這條消息是否正常發送到
Broker
,這種方式也是消息可靠性投遞的核心保障!
消息确認機制流程圖
如何實作消息确認機制
我們需要實作消息确認機制,首先我們必須要開啟這種機制。
// 開啟消息确認機制
channel.confirmSelect();
confirmSelect()
源碼如下:
/**
* Enables publisher acknowledgements on this channel.
* @see com.rabbitmq.client.AMQP.Confirm.Select
* @throws java.io.IOException if an error is encountered
*/
Confirm.SelectOk confirmSelect() throws IOException;
Enables publisher acknowledgements on this channel.
從注釋中也可以知道,通過
Channel
調用這個方法會開啟生産端消息确認機制。
現在我們開啟了生産端消息确認機制,但當
Broker
收到消息,給我們生産端一個應答,我們需要有處理這個應答的方法(異步回調方法處理消息發送成功或者失敗,這很像部落客之前用AIO模型實作的簡易多人聊天室,也是異步回調),是以需要一個處理應答的回調函數,根據具體的結果對消息進行重新發送、或記錄日志等後續處理,如下:
// 添加消息确認監聽(異步回調)
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------ack---------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------no ack---------------");
}
});
現在我們實作了消息确認機制,生産端投遞消息,
Broker
收到消息,給生産端一個應答,如果消息成功收到,回調函數會輸出
ack
,否則輸出
no ack
。
生産端
我們這裡使用的是預設交換機,它的路由規則可以看看下面這篇部落格。
RabbitMQ:交換機(default exchange)
package com.kaven.rabbitmq.api.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 開啟消息确認機制
channel.confirmSelect();
// 5 發送消息
for (int i = 0; i < 10; i++) {
String msg = "消息确認模式:RabbitMQ send confirm message "+i;
channel.basicPublish(exchange , routingKey , null , msg.getBytes());
}
// 6 添加消息确認監聽(異步回調)
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------ack---------------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------------no ack---------------");
}
});
}
}
消費端
package com.kaven.rabbitmq.api.confirm;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 隊列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 建立Queue
channel.queueDeclare(queueName , true , false , false , null);
// 5 建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName , true , consumer);
// 6 接收消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
測試
運作生産端會發現每次運作結果都可能不一樣,會有多種情況出現,因為
Broker
會進行優化,有時會批量一次性Confirm ,有時會分開幾條Confirm。
五條。
------------ack---------------
------------ack---------------
------------ack---------------
------------ack---------------
------------ack---------------
三條。
------------ack---------------
------------ack---------------
------------ack---------------
還有其他情況就不列舉了。
我們來看看
RabbitMQ Management
,有
40
條消息準備好了,因為我啟動了四次生産端來進行測試。
現在,我們啟動消費端,消費端也成功收到了這
40
條消息。
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9
消息确認模式:RabbitMQ send confirm message 0
消息确認模式:RabbitMQ send confirm message 1
消息确認模式:RabbitMQ send confirm message 2
消息确認模式:RabbitMQ send confirm message 3
消息确認模式:RabbitMQ send confirm message 4
消息确認模式:RabbitMQ send confirm message 5
消息确認模式:RabbitMQ send confirm message 6
消息确認模式:RabbitMQ send confirm message 7
消息确認模式:RabbitMQ send confirm message 8
消息确認模式:RabbitMQ send confirm message 9