天天看點

RabbitMQ:Return消息機制

Return消息機制

什麼是Return消息機制

Return消息機制用于處理一些不可路由的消息。正常情況下,消息生産端通過指定一個Exchange和RoutingKey,把消息路由到某一個隊列中去,然後消費端監聽隊列,進行消費。但在某些情況下,如在發送消息的時候,目前的Exchange不存在或者指定的RoutingKey路由不到Queue,這個時候,如果我們需要監聽這種不可達的消息, 就要使用Return消息機制(​

​ReturnListener​

​)。

RabbitMQ:Return消息機制

在基礎API中有一個關鍵的配置項​

​mandatory​

​ :

  • 如果為true,則監聽器會接收到路由不可達的消息,然後進行後續處理。
  • 如果為false,那麼​

    ​broker​

    ​會自動删除該消息。
// mandatory 為 true
        channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());      

生産端

我們這裡使用的是預設交換機,它的路由規則可以看看下面這篇部落格。

​RabbitMQ:交換機(default exchange)​​

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";
    private static String routingKeyError = "test_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        //4 添加ReturnListener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------- handle return ------------");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        });

        // 5 發送消息
        String msg = "RabbitMQ: return message";
        String msgError = "RabbitMQ: error return message";
        // mandatory 為 true
        channel.basicPublish(exchange , routingKey , true , null , msg.getBytes());
        channel.basicPublish(exchange , routingKeyError , true,null , msgError.getBytes());
    }
}      

消費端

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // 隊列名
    private static String queueName = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 建立Queue
        channel.queueDeclare(queueName , true , false , false , null);

        // 5 建立消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName , true , consumer);

        // 6 接收消息
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}      

測試

根據生産端和消費端的代碼可以知道,​

​RoutingKey = "test_1"​

​​的消息是路由不到對應的​

​Queue​

​​,因為我們這裡是沒有定義名稱為 ​

​test_1​

​​的隊列,從​

​RabbitMQ Management​

​​可以看到,目前已經定義的​

​Queue​

​。

啟動生産端,從​

​RabbitMQ Management​

​​可以看到,名稱為​

​test​

​的隊列有一條消息已經準備好了。

RabbitMQ:Return消息機制

生産端輸出:

---------- handle return ------------
replyCode:312
replyText:NO_ROUTE
exchange:
routingKey:test_1
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
body:RabbitMQ: error return message      

顯然結果是正确的。

啟動消費端,輸出如下:

RabbitMQ: return message      

顯然消費端已經收到消息了。

從​

​RabbitMQ Management​

​​可以看到,名稱為​

​test​

​的隊列沒有準備好的消息,因為已經被消費端接收了。

RabbitMQ:Return消息機制

再來試一下​

​mandatory​

​​ 為 ​

​false​

​的情況,生産端需要修改代碼,消費端不需要修改代碼。

生産端:

package com.kaven.rabbitmq.api.returnListener;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // default exchange
    private static String exchange = "";
    // default exchange 的路由規則: routingKey(test) 将比對同名的 queue(test)
    private static String routingKey = "test";
    private static String routingKeyError = "test_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立Connection
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        //4 添加ReturnListener
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("---------- handle return ------------");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        });

        // 5 發送消息
        String msg = "RabbitMQ: return message";
        String msgError = "RabbitMQ: error return message";
        // mandatory 為 false
        channel.basicPublish(exchange , routingKeyError , false,null , msgError.getBytes());
    }
}      

啟動生産端,生産端的​

​ReturnListener​

​​并沒有被調用(沒有輸出),看看​

​RabbitMQ Management​

​​,也沒有消息準備好,​

​broker​

​會自動删除這條消息。