天天看點

學習消息隊列RabbitMQ

文章目錄

    • 一、簡介
    • 二、導入RabbitMQ的依賴包:
    • 三、RabbitMQ幾種使用方式
      • 1、Hello World
        • 添加釋出者代碼:
        • 添加接收者代碼:
      • 2、工作隊列(work queue)
        • 釋出者代碼:
        • 消費者1代碼:
        • 消費者2代碼:
      • 3、訂閱模式:
        • (1)訂閱之Fanout模型
          • 生産者代碼:
          • 消費者1代碼:
          • 消費者2代碼:
        • (2)訂閱之Direct模型:
          • 釋出者代碼:
          • 消費者代碼:
        • (3)訂閱之Topic模型:
          • 釋出者代碼:
          • 消費者代碼:

一、簡介

RabbitMQ是一個消息代理。它的核心思想非常簡單:接收并轉發消息。你可以把它想象成一個郵局:當你把郵件丢進郵箱時,你非常确定郵差先生會把它送到收件人手中。在這個比喻中,RabbitMQ就是郵箱、郵局和郵差。

RabbitMQ和郵局的主要差別是它處理的不是紙張。它接收、存儲并轉發二進制資料塊,也就是message,消息。

RabbitMQ官網: https://www.rabbitmq.com/

RabbitMQ官方教程: https://www.rabbitmq.com/#getstarted

學習教程:https://www.rabbitmq.com/tutorials/tutorial-one-python.html

RabbitMQ常用的交換器類型有direct、topic、fanout三種。

二、導入RabbitMQ的依賴包:

<!--消息隊列RabbitMQ的依賴包-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
           

三、RabbitMQ幾種使用方式

1、Hello World

學習消息隊列RabbitMQ

P(producer/ publisher):生産者,發送消息的服務

C(consumer):消費者,接收消息的服務

紅色區域就是MQ中的Queue,可以把它了解成一個郵箱

  • 首先信件來了不強求必須馬上馬去拿
  • 其次,它是有最大容量的(受主機和磁盤的限制,是一個緩存區)
  • 允許多個消費者監聽同一個隊列,争搶消息

添加釋出者代碼:

@Slf4j
public class RabbitMQProducer {
    private final static String QUEEN_NAME = "Hello World";
    private final static String MESSAGE = "第一個RabbitMQ";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");//設定主機名
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEEN_NAME, false, false, false, null);
            channel.basicPublish("", QUEEN_NAME, null, MESSAGE.getBytes());
            log.info("發送消息:'{}'", MESSAGE);
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

}
           

添加接收者代碼:

@Slf4j
public class RabbitMQConsumer {
    private final static String QUEEN_NAME = "Hello World";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEEN_NAME, false, false, false, null);
            /**
             *  @Author: chen
             *  @Date: 2021/8/17 17:28
             *  @Description: 第一種方式利用匿名類建立
             */
//            Consumer consumer = new DefaultConsumer(channel){
//                @Override
//                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                    String message = new String(body, "UTF-8");
//                    log.info("消費者收到消息:'{}'", message);
//                }
//            };
//            channel.basicConsume(QUEEN_NAME, true, consumer);
            /**
             *  @Author: chen
             *  @Date: 2021/8/17 17:28
             *  @Description: 第二種方式利用Lambda建立
             */
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                log.info("消費者收到消息:'{}'", message);
            };
            channel.basicConsume(QUEEN_NAME, true, deliverCallback, consumerTag -> {});
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           

發送者會通過RabbitMQ發送一條資訊,而接收者會把它列印出來。接收者将會一直運作,等待接收消息。

布置成功之後就可以在浏覽器輸入:http://伺服器的ip:15672/ 可以找到登陸入口。

2、工作隊列(work queue)

學習消息隊列RabbitMQ

Worker模型中也隻有一個工作隊列。但它是一種競争消費模式。可以看到同一個隊列我們綁定上了多個消費者,消費者争搶着消費消息,這可以有效的避免消息堆積。

比如對于短信微服務叢集來說就可以使用這種消息模型,來了請求大家搶着消費掉。

如何實作這種架構:對于上面的HelloWorld這其實就是相同的服務我們啟動了多次罷了,自然就是這種架構。

釋出者代碼:

/**
 *  @Author: chen
 *  @Date: 2021/8/18 10:53
 *  @Description: 學習位址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
 */
public class NewTask {
    //定義一個消息隊列的名稱
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //開啟連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名稱或IP
        factory.setHost("localhost");
        try {
            //開啟連接配接,抛出異常
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //設定隊列參數
            boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            //釋出消息
            String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
            String message = String.join("-", msg);//從控制台編譯
            System.out.println(message);
            //我們需要将我們的消息标記為持久性 - 通過将MessageProperties(實作BasicProperties)設定為值PERSISTENT_TEXT_PLAIN
            channel.basicPublish("", QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            //關閉通道
            channel.close();
            //關閉連接配接
            connection.close();
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           

消費者1代碼:

@Slf4j
public class WorkConsumer {
    //定義消息隊列名稱
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //開啟連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名稱或IP
        factory.setHost("localhost");
        try {
            //開啟連接配接,抛出異常
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //設定隊列參數
            boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicQos(1);    //公平排程,一次隻接受一條未确認的消息

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("接收到的資訊:" + message);
                try {
                    //do somethings
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    //當您的用戶端退出時,消息将被重新傳送(這可能看起來像随機重新傳送),但 RabbitMQ 将消耗越來越多的記憶體,因為它無法釋放任何未确認的消息。是以利用basicAck()必須在接收傳遞的同一通道上發送确認
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确認消息,設定為false
                }
            };
            channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    //模拟執行時間的假任務
    private static void doWork(String message) {
        int count = 0;
        for (char ch
        : message.toCharArray()){
            //遇到'-'符号耗時1s
            if (ch == '-'){
                try {
                    log.info("等待時間:{}秒" , ++count);
                    Thread.sleep(1000);
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
}
           

消費者2代碼:

@Slf4j
public class WorkConsumer2 {
    //定義消息隊列名稱
    private static final String QUEEN_NAME = "work";

    public static void main(String[] args) {
        //開啟連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名稱或IP
        factory.setHost("localhost");
        try {
            //開啟連接配接,抛出異常
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //設定隊列參數
            boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
            channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            channel.basicQos(1);    //公平排程,一次隻接受一條未确認的消息

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("接收到的資訊:" + message);
                try {
                    //do somethings
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    //當您的用戶端退出時,消息将被重新傳送(這可能看起來像随機重新傳送),但 RabbitMQ 将消耗越來越多的記憶體,因為它無法釋放任何未确認的消息。是以利用basicAck()必須在接收傳遞的同一通道上發送确認
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确認消息,設定為false
                }
            };
            channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

    //模拟執行時間的假任務
    private static void doWork(String message) {
        int count = 0;
        for (char ch
                : message.toCharArray()){
            //遇到'-'符号耗時1s
            if (ch == '-'){
                try {
                    log.info("勞工做這項工作時間:{}秒" , ++count);
                    Thread.sleep(1000);
                } catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
    }
}
           

總結:在工作隊列當中,假如将消費者比作勞工,隊清單示勞工需要被分派的任務,那麼當勞工沒有完成一項工作的時候,是不會重新擷取其他任務,同時如果因為某個原因導緻任務中斷,那麼RabbitMQ就不會将這項任務在消息隊列中去除,而是會被分派給空閑的勞工,直到這項任務被完成,也可稱為消息應答或消息确認。

3、訂閱模式:

訂閱模型借助一個新的概念:Exchange(交換機)實作,不同的訂閱模型本質上是根據交換機(Exchange)的類型劃分的。

訂閱模型有三種

Fanout(廣播模型): 将消息發送給綁定給交換機的所有隊列(因為他們使用的是同一個RoutingKey)。

Direct(定向): 把消息發送給擁有指定Routing Key (路由鍵)的隊列。

Topic(通配符): 把消息傳遞給擁有 符合Routing Patten(路由模式)的隊列。

(1)訂閱之Fanout模型

這個模型的特點就是它在發送消息的時候,并沒有指明Rounting Key , 或者說他指定了Routing Key,但是所有的消費者都知道,大家都能接收到消息,就像聽廣播。

臨時隊列:

在 Java 用戶端中,當我們不向queueDeclare()提供參數時, 我們會建立一個具有生成名稱的非持久、獨占、自動删除隊列:

此時queueName包含一個随機隊列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

綁定:

學習消息隊列RabbitMQ

我們已經建立了一個fanout交換和一個隊列。現在我們需要告訴交換器向我們的隊列發送消息。交換和隊列之間的這種關系稱為綁定。

從現在開始,logs交換會将消息附加到我們的隊列中。

學習消息隊列RabbitMQ

發出日志消息的生産者程式與之前的教程看起來沒有太大差別。最重要的變化是我們現在想要将消息釋出到我們的logs交換而不是無名的交換。我們需要在發送時提供一個routingKey,但它的值在fanout交換時被忽略。

生産者代碼:
/**
 *  @Author: chen
 *  @Date: 2021/8/18 15:02
 *  @Description: RabbitMQ訂閱模型:Fanout
 *  學習位址:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
 */
public class EmitLog {
    //建立交換機名稱
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或者IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //定義交換機類型為Fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //設定釋出的消息
            String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
            for (int i = 0; i < 10; i++) {
                channel.basicPublish(EXCHANGE_NAME, "", null, msg[i].getBytes("UTF-8"));
            }
            System.out.println(msg);
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           
消費者1代碼:
public class LogsConsumer {
    //建立交換機名稱
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟通道
            Channel channel = connection.createChannel();
            //定義交換機名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //建立随機隊列
            String queueName = channel.queueDeclare().getQueue();
            //綁定随機隊列與交換機
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message =  new String(delivery.getBody(),"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           
消費者2代碼:
public class LogsConsumer2 {
    //建立交換機名稱
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟通道
            Channel channel = connection.createChannel();
            //定義交換機名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //建立随機隊列
            String queueName = channel.queueDeclare().getQueue();
            //綁定随機隊列與交換機
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message =  new String(delivery.getBody(),"UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}
           

總結:這種釋出/訂閱模式是一種廣播機制,并沒有設定路由key模式。它在釋出者代碼中建立一個交換機,并沒有沒有在釋出者代碼中建立隊列,而是在消費者中建立臨時隊列,通過臨時隊列與交換機相連,進而達到廣播效果。

(2)訂閱之Direct模型:

我們使用的是fanout交換,它沒有給我們很大的靈活性——它隻能進行無意識的廣播。

我們将改用direct交換。direct交換背後的路由算法很簡單 - 消息進入其綁定密鑰與消息的路由密鑰完全比對的隊列 。

學習消息隊列RabbitMQ

P:生産者,向Exchange發送消息,發送消息時,會指定一個routing key。

X:Exchange(交換機),接收生産者的消息,然後把消息遞交給 與routing key完全比對的隊列

C1:消費者,其所在隊列指定了需要routing key 為 error 的消息

C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

擁有不同的RoutingKey的消費者,會收到來自交換機的不同資訊,而不是大家都使用同一個Routing Key 和廣播模型區分開來。

釋出者代碼:
/**
 *  @Author: chen
 *  @Date: 2021/8/18 16:14
 *  @Description: 訂閱模式:Direct
 */
public class EmitLogDirect {
    //建立交換機名稱
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //定義交換機類型與名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //設定綁定key與資訊
            String []bindKey = {"info","warn", "error", "test"};
            String []message = {"Hello","Warning","Error","ok"};
            //釋出消息
            for (int i = 0; i < bindKey.length; i++) {
                channel.basicPublish(EXCHANGE_NAME, bindKey[i], null, message[i].getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + bindKey[i] + "':'" + message[i] + "'");
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }

    }
}
           
消費者代碼:
public class LogsDirectConsumer {
    //建立交換機名稱
    private static final String EXCHANGE_NAME = "direct_logs";

    //建立線程訂閱'info','warn','error','test'
    public static Runnable runnable = () -> {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //定義交換機類型與名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //建立臨時隊列
            String queueName = channel.queueDeclare().getQueue();
            //綁定隊列、交換機和路由密鑰
            String []bindKey = {"info","warn", "error", "test"};
            for (String severity :
                    bindKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    };

    //建立線程隻訂閱'info','warn','error'
    public static Runnable runnable1 = () -> {
        //建立連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定主機名或IP
        factory.setHost("localhost");
        try {
            //建立連接配接
            Connection connection = factory.newConnection();
            //開啟連接配接通道
            Channel channel = connection.createChannel();
            //定義交換機類型與名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //建立臨時隊列
            String queueName = channel.queueDeclare().getQueue();
            //綁定隊列、交換機和路由密鑰
            String []bindKey = {"info","warn", "error"};
            for (String severity :
                    bindKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received queue2 '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    };

    public static void main(String[] args) {
        new Thread(runnable, "queue1").start();//訂閱'info','warn','error','test'
        new Thread(runnable1, "queue2").start();//隻訂閱'info','warn','error'
        new Thread(runnable, "queue3").start();//訂閱'info','warn','error','test'
    }
}
           

總結:在direct訂閱模型中,生産者訂閱交換機釋出消息,産生routingKey(用于消費者綁定);而在消費者中,根據消費者的需要,可以随意訂閱自己所需的資訊(通過綁定routingKey實作),即選擇性接收。

(3)訂閱之Topic模型:

學習消息隊列RabbitMQ

類似于Direct模型。差別是Topic的Routing Key支援通配符。

topic交換功能強大,可以像其他交換一樣運作。

當隊列與“ # ”(散列)綁定鍵綁定時——它将接收所有消息,而不管路由鍵——就像在fanout交換中一樣。

當綁定中不使用特殊字元“ * ”(星号)和“ # ”(散列)時,主題交換的行為就像direct交換一樣。

通配符如下:

  • (星号)可以正好代替一個詞。
  • # (hash) 可以代替零個或多個單詞。

根據上圖所示,以下的代碼是根據上圖進行編寫的。

我們發送描述動物的資訊。這是由三個字元以及兩個點組合的路由密鑰進行發送。消費者那邊用*.orange.*、*.*.rabbit以及lazy.#進行适配;生産者那邊釋出動物的消息以及設定路由key:quick.orange.rabbit、lazy.orange.elephant、quick.orange.fox、lazy.brown.fox、lazy.pink.rabbit、quick.brown.fox、quick.orange.male.rabbit、lazy.orange.male.rabbit

釋出者代碼:
/**
 * @Author: chen
 * @Date: 2021/8/19 9:42
 * @Description: RabbitMQ訂閱模型: Topics
 * 學習網址: https://www.rabbitmq.com/tutorials/tutorial-five-java.html
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String[] routingKey = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox",
                "lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"};

        String[] message = {"快.橙子.兔子", "慢.橙色.大象", "快.橙色.狐狸", "慢.棕色.狐狸",
                "慢.粉色.兔子", "快.棕色.狐狸", "快.橙色.雄性.兔子", "慢.橙色.雄性.兔子"};

        for (int i = 0; i < routingKey.length; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingKey[i], null, message[i].getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey[i] + "':'" + message[i] + "'");
        }
        channel.close();
        connection.close();
    }

}
           
消費者代碼:
public class LogsTopicsConsumer {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void consumer(String []routingKey){
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();

            for (String key :
                    routingKey) {
                channel.queueBind(queueName, EXCHANGE_NAME, key);
            }

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) {
        String[] routingKey1 = {"*.orange.*"};
        String[] routingKey2 = {"*.*.rabbit"};
        String[] routingKey3 = {"lazy.#"};
        consumer(routingKey1);
        consumer(routingKey2);
        consumer(routingKey3);
    }

}
           

結果分析:

路由鍵設定為“ quick.orange.rabbit ”的消息将發送到兩個隊列。

消息“ lazy.orange.elephant ”也會發給他們兩個。

“ quick.orange.fox ”隻會進入第一個隊列,而“ lazy.brown.fox ”隻會進入第二個隊列。“ lazy.pink.rabbit ”隻會被傳送到第二個隊列一次,即使它比對了兩個綁定。“ quick.brown.fox ”不比對任何綁定,是以将被丢棄。

如果我們違反合同并發送一到四個字的消息,例如 “ quick.orange.male.rabbit ”,這些消息不會比對任何綁定并且會丢失。

另一方面,“ lazy.orange.male.rabbit ”,即使它有四個單詞,也會比對最後一個綁定,并将被傳遞到第二個隊列。

下一篇blog文章:SpringBoot + RabbitMQ整合