天天看點

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

RabbitMQ工作隊列之Routing模式(四)

上一篇講了關于fanout釋出訂閱模式,允許系統推送消息至所有的訂閱者(消費者),擴充一下問題,如果将生産者生産的消息進行分類,那麼所有的消費者都會收到全部消息,這不是我們樂意見到的。如果我們希望可以對消費者進行分類,每個消費者能接收特定的消息,那該如何改造呢?

答案是肯定哒,标題已經說明了,本篇博文主要講解的是RabbitMQ工作隊列中的Routing路由密鑰模式。

RabbitMQ工作隊列中的Routing路由密鑰模式三種使用方式:

  • 基于原生client用戶端連接配接使用
  • 基于spring xml配置檔案內建使用
  • 基于spring boot內建使用

老規矩,來一張官網模型截圖:

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

目錄

[TOC]

來生成目錄:

文章目錄

  • RabbitMQ工作隊列之Routing模式(四)
      • 目錄
    • @[toc]
    • 1、基于原生client用戶端連接配接使用
        • 1.1、maven建構,pom檔案加入依賴
        • 1.2、建立連接配接工廠
        • 1.3、建立消息生産者
        • 1.4、定義info類型消費者,此消費者可以接收info、warning、error類型
        • 1.5、定義warning類型消費者,隻能接收warning類型消息
        • 1.6、定義Error類型消費者,隻能接收error類型消息
        • 1.7、運作效果
        • 1.8、RabbitMQ管控台
    • 2、基于spring xml配置檔案內建使用
        • 2.1、maven工廠引入依賴包:
        • 2.2、建立spring加載配置檔案context.xml
        • 2.3、建立rabbitMQ配置檔案rabbitmq-routing.xml
        • 2.4、建立生産者SendRouting
        • 2.5、info消費者
        • 2.6、error消費者
        • 2.7、warning消費者
        • 2.8、運作效果圖
    • 3、基于spring boot內建使用
        • 3.1、maven加入依賴包
        • 3.2、建立application.yml配置檔案
        • 3.3、建立擷取靜态參數類
        • 3.4、建立rabbitMQ注解配置
        • 3.5、建立controller層模拟生産者
        • 3.6、info消費者
        • 3.7、error消費者
        • 3.8、warning消費者
        • 3.9、運作效果
    • 4、小結
        • 小結

1、基于原生client用戶端連接配接使用

1.1、maven建構,pom檔案加入依賴

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <!--引入rbiitmq的連接配接工具包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.3.0</version>
        </dependency>
           

1.2、建立連接配接工廠

package com.edu.rabbitmq;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 22:01
 * @description :擷取連接配接
 * @note 注意事項
 */
public class ConnectionUtils {

    //擷取接連
    public static Connection getConnection() throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();

        //設定連接配接MQ的IP位址
        factory.setHost("192.168.199.128");
        //設定連接配接端口号
        factory.setPort(5672);
        //設定要接連MQ的庫(域)
        factory.setVirtualHost("/test_vh");
        //連接配接帳号
        factory.setUsername("root");
        //連接配接密碼
        factory.setPassword("123456");
        return factory.newConnection();
    }

}

           

1.3、建立消息生産者

package com.edu.routing;

import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 22:16
 * @description :消息生産者
 * @note 注意事項
 */
public class Sender {
    //定義交換器名稱
    public static  String EXCHANGE_NAME = "test_origin_routing_exchange";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        //擷取連接配接
        Connection connection = ConnectionUtils.getConnection();

        //從連接配接中擷取一個通道
        Channel channel = connection.createChannel();

        /**
         *
         * 聲明一個交換器
         *
         * 文檔位址:https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#exchangeDeclare-java.lang.String-com.rabbitmq.client.BuiltinExchangeType-boolean-boolean-java.util.Map-
         * 該方法定義了許多的重載,拿出完整參數的方法來講
         * AMQP.Exchange.DeclareOk exchangeDeclare​(String exchange,
         *                                         BuiltinExchangeType type,
         *                                         boolean durable,
         *                                         boolean autoDelete,
         *                                         Map<String,Object> arguments)
         *                                  throws IOException
         *
         *  @param exchange 交換器的名字
         *  @param type 交換器的類型:direct, topic, headers, fanout
         *  @param durable 是否持久化,true持久化,false不持久化
         *  @param autoDelete 伺服器不再使用該隊列時,是否自動删除,true删除,false不删除
         *  @param arguments 其他參數,其實是定義交換器的構造方法
         */
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");


        Integer log_count = 3;

        for (int i = 0; i < 50; i++) {
            Thread.sleep(500);//模拟耗時操作,别發那麼快
            //自定義消息
            String msg = "hello word " + i;

            Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey

            switch (qumo){
                case 0:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());
                    System.out.println("-->info send " + msg);
                    break;
                case 1:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes());
                    System.out.println("-->error send " + msg);
                    break;
                case 2:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    channel.basicPublish(EXCHANGE_NAME,"warning",null,msg.getBytes());
                    System.out.println("-->warning send " + msg);
                    break;
            }


        }

        channel.close();//關閉通道
        connection.close();//關閉連接配接


    }

}
           

1.4、定義info類型消費者,此消費者可以接收info、warning、error類型

package com.edu.routing;

import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 22:16
 * @description :消息消費者
 * @note 注意事項
 */
public class CustomeriInfo {

    //隊列名稱
    public static String QUEUE_NAME = "test_origin_routing_queue_info";

    //定義交換器名稱
    public static  String EXCHANGE_NAME = "test_origin_routing_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連接配接
        Connection connection = ConnectionUtils.getConnection();

        //擷取通道
        Channel channel = connection.createChannel();

        //擷取交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //建立消息聲明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        /**
         * 交換器綁定隊列
         * 該方法定義了2個重載
         * AMQP.Queue.BindOk queueBind​(String queue,
         *                             String exchange,
         *                             String routingKey,
         *                             Map<String,Object> arguments)
         *                      throws IOException
         *
         *  @param queue 隊列名稱
         *  @param exchange 交換器名稱
         *  @param routingKey 用于綁定的路由密鑰
         *  @param arguments 其他參數,其實是定義交換器的構造方法
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");//綁定routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//綁定routingKey
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");//綁定routingKey

        //定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //擷取并轉成String
                String message = new String(body, "UTF-8");
                System.out.println("-->info消費者收到消息,msg:"+message);
            }
        };

        //監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
           

1.5、定義warning類型消費者,隻能接收warning類型消息

package com.edu.routing;

import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 22:16
 * @description :消息消費者
 * @note 注意事項
 */
public class CustomerWarning {

    //隊列名稱
    public static String QUEUE_NAME = "test_origin_routing_queue_warning";

    //定義交換器名稱
    public static  String EXCHANGE_NAME = "test_origin_routing_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連接配接
        Connection connection = ConnectionUtils.getConnection();

        //擷取通道
        Channel channel = connection.createChannel();

        //擷取交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //建立消息聲明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        /**
         * 交換器綁定隊列
         * 該方法定義了2個重載
         * AMQP.Queue.BindOk queueBind​(String queue,
         *                             String exchange,
         *                             String routingKey,
         *                             Map<String,Object> arguments)
         *                      throws IOException
         *
         *  @param queue 隊列名稱
         *  @param exchange 交換器名稱
         *  @param routingKey 用于綁定的路由密鑰
         *  @param arguments 其他參數,其實是定義交換器的構造方法
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        //定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //擷取并轉成String
                String message = new String(body, "UTF-8");
                System.out.println("-->warning消費者收到消息,msg:"+message);
            }
        };

        //監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
           

1.6、定義Error類型消費者,隻能接收error類型消息

package com.edu.routing;

import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 22:16
 * @description :消息消費者
 * @note 注意事項
 */
public class CustomerError {

    //隊列名稱
    public static String QUEUE_NAME = "test_origin_routing_queue_error";

    //定義交換器名稱
    public static  String EXCHANGE_NAME = "test_origin_routing_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        //建立連接配接
        Connection connection = ConnectionUtils.getConnection();

        //擷取通道
        Channel channel = connection.createChannel();

        //擷取交換器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        //建立消息聲明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        /**
         * 交換器綁定隊列
         * 該方法定義了2個重載
         * AMQP.Queue.BindOk queueBind​(String queue,
         *                             String exchange,
         *                             String routingKey,
         *                             Map<String,Object> arguments)
         *                      throws IOException
         *
         *  @param queue 隊列名稱
         *  @param exchange 交換器名稱
         *  @param routingKey 用于綁定的路由密鑰
         *  @param arguments 其他參數,其實是定義交換器的構造方法
         */
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

        //定義消費者
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                //擷取并轉成String
                String message = new String(body, "UTF-8");
                System.out.println("-->error消費者收到消息,msg:"+message);
            }
        };

        //監聽隊列
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}
           

1.7、運作效果

生産者生産消息

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

info

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

error

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

warning

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

1.8、RabbitMQ管控台

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)
RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

檢視exchange綁定的queue

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

2、基于spring xml配置檔案內建使用

2.1、maven工廠引入依賴包:

<!--引入rbiitmq的連接配接工具包-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>4.0.0</version>
        </dependency>
        <!-- 引入spring內建rabbit的包 -->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>1.7.3.RELEASE</version>
        </dependency>

        <!-- spring核心庫 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.0.7.RELEASE</version>
        </dependency>

        <!-- springbean庫 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.0.7.RELEASE</version>
        </dependency>

        <!-- 上下文 -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.0.7.RELEASE</version>
        </dependency>
           

2.2、建立spring加載配置檔案context.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <!--加載routing路由密鑰模式-->
    <import resource="classpath:rabbitmq-routing.xml" />

</beans>
           

2.3、建立rabbitMQ配置檔案rabbitmq-routing.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

    <!--配置connection-factory,指定連接配接rabbit server參數 -->
    <rabbit:connection-factory id="connectionFactory" virtual-host="/test_vh" username="root"
                               password="123456" host="192.168.199.128" port="5672" />


    <!--MQ的管理,包括隊列,交換器,聲明等-->
    <rabbit:admin connection-factory="connectionFactory"   />

    <!--定義rabbit模版,指定連接配接工廠以及定義exchange-->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchangeName"  />

    <!--定義隊列,自動聲明(可以用于發消息和監聽使用)-->
    <!--定義queue  說明:durable:是否持久化 exclusive: 僅建立者可以使用的私有隊列,斷開後自動删除 auto_delete: 當所有消費用戶端連接配接斷開後,是否自動删除隊列-->
    <rabbit:queue id="test_routing_spring_xml1" name="test_routing_spring_xml1" auto-declare="true" auto-delete="true" />
    <!--定義隊列-->
    <rabbit:queue id="test_routing_spring_xml2" name="test_routing_spring_xml2" auto-declare="true" auto-delete="true" />
    <!--定義隊列-->
    <rabbit:queue id="test_routing_spring_xml3" name="test_routing_spring_xml3" auto-declare="true" auto-delete="true" />


    <!--定義direct交換器,并且隊列綁定交換器,記住生産者發送消息的時候,使用的是這個交換器的name-->
    <rabbit:direct-exchange id="directExchangeName" name="test_routing_spring_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_routing_spring_xml1" key="info"  />
            <rabbit:binding queue="test_routing_spring_xml1" key="error"  />
            <rabbit:binding queue="test_routing_spring_xml1" key="warning"  />
            <rabbit:binding queue="test_routing_spring_xml2" key="warning"  />
            <rabbit:binding queue="test_routing_spring_xml3" key="error" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
    

    <!--定義消費者-->
    <bean id="myCustomer" class="routing.CustomerInfo"/>
    <bean id="myCustomer2" class="routing.CustomerWarning"/>
    <bean id="myCustomer3" class="routing.CustomerError"/>

    <!--隊列監聽 acknowledge應答方式:auto,manual,none -->
    <rabbit:listener-container connection-factory="connectionFactory" >
        <rabbit:listener ref="myCustomer" method="listen" queue-names="test_routing_spring_xml1"  />
        <rabbit:listener ref="myCustomer2" method="listen" queue-names="test_routing_spring_xml2"  />
        <rabbit:listener ref="myCustomer3" method="listen" queue-names="test_routing_spring_xml3"  />
    </rabbit:listener-container>



</beans>
           

2.4、建立生産者SendRouting

package com.rabbit;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * @author: Alex
 * @DateTime: 2018/8/22 19:40
 * @Description: 生産者
 * @Version: 1.0.0
 **/
public class SendRouting {

    //交換器名稱,這裡的exchange名稱要和rabbitmq-routing.xml裡面配置對應
    private static String EXCHANGE_NAME = "test_routing_spring_exchange";

    public static void main(String[] args) throws InterruptedException {


        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:context.xml");

        //擷取rabbit模版(等價于@Autowired)
        RabbitTemplate bean = context.getBean(RabbitTemplate.class);

        Integer log_count = 3;

        for (int i = 0; i < 30; i++) {
            Thread.sleep(500);//模拟耗時操作,别發那麼快
            //自定義消息
            String msg = "hello word " + i;

            Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey

            switch (qumo){
                case 0:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    bean.convertAndSend(EXCHANGE_NAME,"info","hello word"+i);
                    System.out.println("-->info send " + msg);
                    break;
                case 1:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    bean.convertAndSend(EXCHANGE_NAME,"error","hello word"+i);
                    System.out.println("-->error send " + msg);
                    break;
                case 2:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    bean.convertAndSend(EXCHANGE_NAME,"warning","hello word"+i);
                    System.out.println("-->warning send " + msg);
                    break;
            }


        }

        Thread.sleep(10000);//休眠2秒後,關閉spring容器
        context.close();

    }
}

           

2.5、info消費者

package routing;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 23:39
 * @description :我的消費者info
 * @note 注意事項
 */
public class CustomerInfo {

    public void listen(String foo){
        System.out.println("消費者消費info,擷取消息msg:"+foo);
    }

}
           

2.6、error消費者

package routing;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 23:39
 * @description :我的消費者error
 * @note 注意事項
 */
public class CustomerError {

    public void listen(String foo){
        System.out.println("消費者消費error,擷取消息msg:"+foo);
    }

}
           

2.7、warning消費者

package routing;

/**
 * @author : alex
 * @version :1.0.0
 * @Date : create by 2018/7/19 23:39
 * @description :我的消費者warning
 * @note 注意事項
 */
public class CustomerWarning {

    public void listen(String foo){
        System.out.println("消費者消費warning,擷取消息msg:"+foo);
    }

}
           

2.8、運作效果圖

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

由此圖我們可以看出,我們的

info消費者綁定了info,error,warning三個類型的消息

error消費者綁定error消息

warning消費者綁定warning消息

每次info都會列印消息,

error和warning隻有接收屬于自身的消息才會列印消息

3、基于spring boot內建使用

3.1、maven加入依賴包

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
           

3.2、建立application.yml配置檔案

spring:
  rabbitmq:
    username: root
    password: 123456
    host: 192.168.199.128
    port: 5672
    virtual-host: /test_vh


rabbitMQconfig:
  queueName:
    info: test_spring_boot_routing_info
    error: test_spring_boot_routing_error
    warning: test_spring_boot_routing_warning
  exchangeName:
    directName: test_spring_boot_exchange_routing
           

3.3、建立擷取靜态參數類

package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * @author: Alex
 * @DateTime: 2018/8/23 11:19
 * @Description: 靜态參數類
 * @Version: 1.0.0
 **/
@Component
public class ParamUtil {

    @Value("${rabbitMQconfig.queueName.info}")
    public String queueNameInfo;

    @Value("${rabbitMQconfig.queueName.error}")
    public String queueNameError;

    @Value("${rabbitMQconfig.queueName.warning}")
    public String queueNameWarning;

    @Value("${rabbitMQconfig.exchangeName.directName}")
    public String directName;
}

           

3.4、建立rabbitMQ注解配置

package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author: Alex
 * @DateTime: 2018/8/23 10:52
 * @Description: rabbitMQ配置聲明
 * @Version: 1.0.0
 **/
@Configuration
public class RabbitMQDeclareUtil {

    @Autowired
    private ParamUtil paramUtil;

    @Bean
    Queue getQueue1(){
        //定義第一個隊列隊列,Ctrl+滑鼠左鍵,點選Queue可以看到定義
        return new Queue(paramUtil.queueNameInfo);
    }

    @Bean
    Queue getQueue2(){
        //定義第二個隊列
        return new Queue(paramUtil.queueNameError);
    }

    @Bean
    Queue getQueue3(){
        //定義第三個隊列
        return new Queue(paramUtil.queueNameWarning);
    }

    @Bean
    DirectExchange getExchange() {
        //定義一個DirectExchange交換器
        return new DirectExchange(paramUtil.directName);
    }


    /**
     * info隊列與交換器綁定,指定routingKey為info
     * @param getQueue1     定義的第一個隊列
     * @param getExchange   定義的交換器
     * @return
     */
    @Bean
    Binding bindingInfo(Queue getQueue1, DirectExchange getExchange) {
        return BindingBuilder.bind(getQueue1).to(getExchange).with("info");
    }

    /**
     * info隊列與交換器綁定,指定routingKey為error
     * @param getQueue1     定義的第一個隊列
     * @param getExchange   定義的交換器
     * @return
     */
    @Bean
    Binding bindingInfo1(Queue getQueue1, DirectExchange getExchange) {
        return BindingBuilder.bind(getQueue1).to(getExchange).with("error");
    }
    /**
     * info隊列與交換器綁定,指定routingKey為warning
     * @param getQueue1     定義的第一個隊列
     * @param getExchange   定義的交換器
     * @return
     */
    @Bean
    Binding bindingInfo2(Queue getQueue1, DirectExchange getExchange) {
        return BindingBuilder.bind(getQueue1).to(getExchange).with("warning");
    }

    /**
     * error隊列與交換器綁定,指定routingKey為error
     * @param getQueue2
     * @param getExchange
     * @return
     */
    @Bean
    Binding bindingError(Queue getQueue2, DirectExchange getExchange) {
        return BindingBuilder.bind(getQueue2).to(getExchange).with("error");
    }

    /**
     * warning隊列與交換器綁定,指定routingKey為warning
     * @param getQueue3
     * @param getExchange
     * @return
     */
    @Bean
    Binding bindingWarning(Queue getQueue3, DirectExchange getExchange) {
        return BindingBuilder.bind(getQueue3).to(getExchange).with("warning");
    }
}

           

3.5、建立controller層模拟生産者

package cn.rabbitmq.edu.rabbitmq_spring_boot.controller;

import cn.rabbitmq.edu.rabbitmq_spring_boot.utils.ParamUtil;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: Alex
 * @DateTime: 2018/8/17 16:36
 * @Description: web通路模拟發送
 * @Version: 1.0.0
 **/
@RestController
public class IndexController {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Autowired
    private ParamUtil paramUtil;

    /**
     * 使用AmqpTemplate
     * @return
     * @throws Exception
     */
    @PostMapping("/amqpSend")
    public String amqpSend() throws Exception{
        Integer log_count = 3;
        for (int i = 0; i < 30; i++) {
            Thread.sleep(500);//模拟耗時操作,别發那麼快
            String msg = "hello amqp " + i;//自定義消息
            Integer qumo =i % log_count;//取餘,模拟分發不同類型的routingKey
            switch (qumo){
                case 0:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    amqpTemplate.convertAndSend(paramUtil.directName,"info",msg);//根據指定的exchange發送資料
                    System.out.println("-->info send " + msg);
                    break;
                case 1:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    amqpTemplate.convertAndSend(paramUtil.directName,"error",msg);//根據指定的exchange發送資料
                    System.out.println("-->error send " + msg);
                    break;
                case 2:
                    //釋出消息,通過交換器名稱進行釋出消息,指定routingKey路由密鑰
                    amqpTemplate.convertAndSend(paramUtil.directName,"warning",msg);//根據指定的exchange發送資料
                    System.out.println("-->warning send " + msg);
                    break;
            }
        }
        return "success";
    }
}

           

3.6、info消費者

package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author: Alex
 * @DateTime: 2018/8/17 16:35
 * @Description: 模拟消費者1
 * @Version: 1.0.0
 **/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_info")
public class CustomerMsg {

    /**
     * 進行接收處理
     * @param string
     */
    @RabbitHandler
    public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("消費者info,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
        //丢棄這條消息
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
    }

}

           

3.7、error消費者

package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author: Alex
 * @DateTime: 2018/8/17 16:35
 * @Description: 模拟消費者2
 * @Version: 1.0.0
 **/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_error")
public class CustomerMsg2 {

    /**
     * 進行接收處理
     * @param string
     */
    @RabbitHandler
    public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("消費者error,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
        //丢棄這條消息
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
    }

}

           

3.8、warning消費者

package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author: Alex
 * @DateTime: 2018/8/17 16:35
 * @Description: 模拟消費者2
 * @Version: 1.0.0
 **/
@Component
//監聽的隊列
@RabbitListener(queues = "test_spring_boot_routing_warning")
public class CustomerMsg3 {

    /**
     * 進行接收處理
     * @param string
     */
    @RabbitHandler
    public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
        System.out.println("消費者warning,接收時間:"+System.currentTimeMillis()+",收到消息,消息: " + string);
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手動确認
        //丢棄這條消息
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
    }

}

           

3.9、運作效果

RabbitMQ工作隊列之Routing模式(四)RabbitMQ工作隊列之Routing模式(四)

4、小結

小結

通過本章的學習,學到了routing模式,也就是exchange為direct類型的定義,routing模式為路由密鑰模式,通過給定的routingKey進行與隊列的比對,比對綁定後,發送消息就到達指定的隊列啦。

回頭再看上一章,上一章開始接觸exchange,目前學習了2類exchange的類型,fanout和direct模式。

fanout為扇形廣播消息

direct為比對路由密鑰廣播消息