天天看點

RabbitMQ的工作模式Topics  通配符,test測試代

RabbitMQ有以下幾種工作模式 :

1、Work queues  工作隊列

2、Publish/Subscribe 釋出訂閱

3、Routing      路由

4、Topics        通配符

5、Header      Header 轉發器

6、RPC     遠端調用

進入浏覽器,輸入:http://localhost:15672

初始賬号和密碼:guest/guest

RabbitMQ的工作模式Topics  通配符,test測試代

通配符工作模式:                                                                                                       

1、一個交換機可以綁定多個隊列,每個隊列可以設定一個或多個帶通配符的    routingkey.

 2、生産者将消息發給交換機,交換機根據routingkey的值來比對隊列,比對時采用 統配符的方式,比對成功的将消息轉發到指定隊列.

生産者代碼:

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 * @version 1.0
 * @create 2018-06-17 19:23
 **/
public class Producer04_topics {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";
    public static void main(String[] args) {
        //通過連接配接工廠建立新的連接配接和mq建立連接配接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
        connectionFactory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //建立新連接配接
            connection = connectionFactory.newConnection();
            //建立會話通道,生産者和mq服務所有通信都在channel通道中完成
            channel = connection.createChannel();
            //聲明隊列,如果隊列在mq 中沒有則要建立
            //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            /**
             * 參數明細
             * 1、queue 隊列名稱
             * 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
             * 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
             * 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
             * 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
            //聲明一個交換機
            //參數:String exchange, String type
            /**
             * 參數明細:
             * 1、交換機的名稱
             * 2、交換機的類型
             * fanout:對應的rabbitmq的工作模式是 publish/subscribe
             * direct:對應的Routing	工作模式
             * topic:對應的Topics工作模式
             * headers: 對應的headers工作模式
             */
            channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
            //進行交換機和隊列綁定
            //參數:String queue, String exchange, String routingKey
            /**
             * 參數明細:
             * 1、queue 隊列名稱
             * 2、exchange 交換機名稱
             * 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);
            //發送消息
            //參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * 參數明細:
             * 1、exchange,交換機,如果不指定将使用mq的預設交換機(設定為"")
             * 2、routingKey,路由key,交換機根據路由key來将消息轉發到指定的隊列,如果使用預設交換機,routingKey設定為隊列的名稱
             * 3、props,消息的屬性
             * 4、body,消息内容
             */
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
                System.out.println("send to mq "+message);
            }
            for(int i=0;i<5;i++){
                //發送消息的時候指定routingKey
                String message = "send sms and email inform message to user";
                channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
                System.out.println("send to mq "+message);
            }


        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //關閉連接配接
            //先關閉通道
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }


    }
}
           

郵件消費者代碼:

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 * @version 1.0
 * @create 2018-06-17 18:22
 **/
public class Consumer04_topics_email {
    //隊列名稱
    private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_EMAIL="inform.#.email.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接配接工廠建立新的連接配接和mq建立連接配接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接配接
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生産者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
         * 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
         * 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
         * 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
         */
        channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

        //實作消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法将被調用
             * @param consumerTag  消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

    }
}
           

短信消費者代碼:

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author Administrator
 * @version 1.0
 * @create 2018-06-17 18:22
 **/
public class Consumer04_topics_sms {
    //隊列名稱
    private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    private static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    private static final String ROUTINGKEY_SMS="inform.#.sms.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        //通過連接配接工廠建立新的連接配接和mq建立連接配接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);//端口
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
        connectionFactory.setVirtualHost("/");

        //建立新連接配接
        Connection connection = connectionFactory.newConnection();
        //建立會話通道,生産者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();

        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
         * 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
         * 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
         * 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);
        //聲明一個交換機
        //參數:String exchange, String type
        /**
         * 參數明細:
         * 1、交換機的名稱
         * 2、交換機的類型
         * fanout:對應的rabbitmq的工作模式是 publish/subscribe
         * direct:對應的Routing	工作模式
         * topic:對應的Topics工作模式
         * headers: 對應的headers工作模式
         */
        channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
        //進行交換機和隊列綁定
        //參數:String queue, String exchange, String routingKey
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、exchange 交換機名稱
         * 3、routingKey 路由key,作用是交換機根據路由key的值将消息轉發到指定的隊列中,在釋出訂閱模式中調協為空字元串
         */
        channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);

        //實作消費方法
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

            /**
             * 當接收到消息後此方法将被調用
             * @param consumerTag  消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                //消息内容
                String message= new String(body,"utf-8");
                System.out.println("receive message:"+message);
            }
        };

        //監聽隊列
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

    }
}
           

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>xc-framework-parent</artifactId>
        <groupId>com.xuecheng</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../xc-framework-parent/pom.xml</relativePath>
    </parent>
    <artifactId>test-rabbitmq-producer</artifactId>

<dependencies>
<!--    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.3</version>&lt;!&ndash;此版本與spring boot 1.5.9版本比對&ndash;&gt;
</dependency>-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
    </dependency>
</dependencies>
</project>