天天看點

RabbitMQ:路由模式

✨ RabbitMQ:路由模式

  • ​​1.基本介紹​​
  • ​​2.生産者​​
  • ​​3.消費者​​
  • ​​4.測試​​

📃個人首頁:​​不斷前進的皮卡丘​​​

🌞部落格描述:夢想也許遙不可及,但重要的是追夢的過程,用部落格記錄自己的成長,記錄自己一步一步向上攀登的印記

🔥個人專欄:​​消息中間件​​

1.基本介紹

RabbitMQ:路由模式

在路由工作模式中,我們需要配置一個類型為direct的交換機,并且需要指定不同的路由鍵(routing key),把對應的消息從交換機路由到不同的消息隊列進行存儲,由消費者進行消費。

  • P:生産者,向交換機發送消息的時候,會指定一個routing key
  • X:Exchange(交換機),接收生産者的消息,然後把消息傳遞給和routing key完全比對的隊列
  • C1:消費者,它所在隊列指定了需要routing key為error的資訊
  • C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

路由模式的特點

隊列和交換機的綁定是需要指定routing key的,不可以随意綁定

消息的發送方向交換機發送消息的時候,也需要指定消息的routing key

交換機不再把消息交給每一個綁定的隊列,而是根據消息的routing key來進行判斷,隻有隊列的routing key和消息的routing key完全一樣才會接收到消息。

2.生産者

public class Producer {
    public static String DIRECT_EXCHANGE = " direct_exchange";
    public static String DIRECT_QUEUE_1 = "direct_queue_1";
    public static String DIRECT_QUEUE_2 = "direct_queue_2";

    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //聲明交換機(交換機名稱,交換機類型)
            channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);
            //聲明隊列
            channel.queueDeclare(DIRECT_QUEUE_1,true,false,false,null);
            channel.queueDeclare(DIRECT_QUEUE_2,true,false,false,null);
            //把交換機和隊列1進行綁定
            channel.queueBind(DIRECT_QUEUE_1,DIRECT_EXCHANGE,"error");
            //把交換機和隊列2進行綁定
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"info");
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"error");
            channel.queueBind(DIRECT_QUEUE_2,DIRECT_EXCHANGE,"warning");
            //發送消息
            String msg="日志資訊:調用了xxx方法,日志級别是info";
             channel.basicPublish(DIRECT_EXCHANGE,"info",null,msg.getBytes());
            System.out.println("消息發送成功");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }


}      

3.消費者

消費者1

public class Consumer1 {
    public static void main(String[] args) {

        try {
            //擷取信道對象
            Channel channel = ConnectUtil.getChannel();
            //消費消息
            DefaultConsumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費者1接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消費者1把日志資訊儲存到資料庫");
                }
            };
            channel.basicConsume(Producer.DIRECT_QUEUE_1,true,consumer);



        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}      

消費者2

public class Consumer2 {
    public static void main(String[] args) {

        try {
            //擷取信道對象
            Channel channel = ConnectUtil.getChannel();
            //消費消息
            DefaultConsumer consumer=new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消費者2接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消費者2把日志資訊輸出到控制台");
                }
            };
            channel.basicConsume(Producer.DIRECT_QUEUE_2,true,consumer);



        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}      

4.測試

RabbitMQ:路由模式
RabbitMQ:路由模式

第一次測試,發送日志級别為info的資訊

RabbitMQ:路由模式
RabbitMQ:路由模式
RabbitMQ:路由模式

第二次測試,發送日志級别為error的資訊

RabbitMQ:路由模式