Return消息機制
什麼是Return消息機制
Return消息機制用于處理一些不可路由的消息。正常情況下,消息生産端通過指定一個Exchange和RoutingKey,把消息路由到某一個隊列中去,然後消費端監聽隊列,進行消費。但在某些情況下,如在發送消息的時候,目前的Exchange不存在或者指定的RoutingKey路由不到Queue,這個時候,如果我們需要監聽這種不可達的消息, 就要使用Return消息機制(
ReturnListener
)。
在基礎API中有一個關鍵的配置項
mandatory
:
- 如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理。
- 如果為false,那麼
會自動删除該消息。broker
// mandatory 為 true
channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());
生産端
我們這裡使用的是預設交換機,它的路由規則可以看看下面這篇部落格。
RabbitMQ:交換機(default exchange)
package com.kaven.rabbitmq.api.returnListener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
private static String routingKeyError = "test_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
//4 添加ReturnListener
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---------- handle return ------------");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
// 5 發送消息
String msg = "RabbitMQ: return message";
String msgError = "RabbitMQ: error return message";
// mandatory 為 true
channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());
channel.basicPublish(exchange , routingKeyError , true,null , msgError.getBytes());
}
}
消費端
package com.kaven.rabbitmq.api.returnListener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// 隊列名
private static String queueName = "test";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
// 4 建立Queue
channel.queueDeclare(queueName , true , false , false , null);
// 5 建立消費者
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName , true , consumer);
// 6 接收消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
測試
根據生産端和消費端的代碼可以知道,
RoutingKey = "test_1"
的消息是路由不到對應的
Queue
,因為我們這裡是沒有定義名稱為
test_1
的隊列,從
RabbitMQ Management
可以看到,目前已經定義的
Queue
。
啟動生産端,從
RabbitMQ Management
可以看到,名稱為
test
的隊列有一條消息已經準備好了。
生産端輸出:
---------- handle return ------------
replyCode:312
replyText:NO_ROUTE
exchange:
routingKey:test_1
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: error return message
顯然結果是正确的。
啟動消費端,輸出如下:
RabbitMQ: return message
顯然消費端已經收到消息了。
從
RabbitMQ Management
可以看到,名稱為
test
的隊列沒有準備好的消息,因為已經被消費端接收了。
再來試一下
mandatory
為
false
的情況,生産端需要修改代碼,消費端不需要修改代碼。
生産端:
package com.kaven.rabbitmq.api.returnListener;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
// 自己伺服器的IP
private static String ip = "IP";
// RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
private static int port = 5672;
// RabbitMQ有一個 "/" 的虛拟主機
private static String virtualHost = "/";
// default exchange
private static String exchange = "";
// default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
private static String routingKey = "test";
private static String routingKeyError = "test_1";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
// 2 建立Connection
Connection connection = connectionFactory.newConnection();
// 3 建立Channel
Channel channel = connection.createChannel();
//4 添加ReturnListener
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("---------- handle return ------------");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
// 5 發送消息
String msg = "RabbitMQ: return message";
String msgError = "RabbitMQ: error return message";
// mandatory 為 false
channel.basicPublish(exchange , routingKeyError , false,null , msgError.getBytes());
}
}
啟動生産端,生産端的
ReturnListener
并沒有被調用(沒有輸出),看看
RabbitMQ Management
,也沒有消息準備好,
broker
會自動删除這條消息。