天天看點

rebbitMQ Routing工作模式

rebbitMQ Routing工作模式

路由模式:

1 、每個消費者監聽自己的隊列,并且設定routingkey。

2 、生産者将消息發給交換機,由交換機根據routingkey來轉發消息到指定的隊列。

聲明exchange_routing_inform交換機。

聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey

發送消息時需要指定routingkey

生産者:

package cn.pika.mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 生産者Routing 
 */
public class ProducerRouting {

    //隊列的名稱
    private static final String QUEUE_INFORM_EMAIL="queue_inform_email"; //郵箱隊列名
    private static final String QUEUE_INFORM_SMS="queue_inform_sms";   //短信隊列名
    //交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";


    public static void main(String[] args) {
        Connection connection=null;
        Channel channel=null;
        try {
            //1,建立連結的工廠對象
            ConnectionFactory factory = new ConnectionFactory();
            //1.1 設定連結的參數
            factory.setHost("localhost");   //本地連結位址
            factory.setPort(5672);          //端口号
            factory.setUsername("guest");   //使用者名
            factory.setPassword("guest");   //密碼
            factory.setVirtualHost("/");    //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
            //1.2 建立連結對象
            connection =factory.newConnection();

            //2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
            channel =connection.createChannel();

            //3,聲明交換機及其類型
            /**
             * 參數1:交換機的名稱
             * 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
             *                          到綁定了該交換機的所有隊列中
             *                  topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
             *                  direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
             *                  headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

            //4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
            /**
             * 參數介紹:
             *      * param1:隊列名稱
             *      * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
             *      * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
             *      * param4:隊列不再使用時是否自動删除此隊列
             *      * param5:擴充參數
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null); //聲明一個郵件隊列
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);  //聲明一個消息隊列

            //5,綁定隊列
            /**
             * 參數1:隊列名稱
             * 參數2:交換機名稱
             * 參數3:路由鍵(路由key)
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //将email隊列綁定到交換機上
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);   //将sms隊列綁定到交換機上

            //6,發送消息
           for(int i=0;i<10;i++){
               //6.1定義發送的消息内容
               String message="郵箱----- inform to 本次消費99元"+i;

               //6.2 發送消息
               /**
                *       * param1:Exchange(交換機)的名稱,如果沒有指定,則使用Default Exchange 注意:不可以寫null
                *       * param2:routingKey,消息的路由Key,是用于Exchange(交換機)将消息轉發到指定的消息隊列
                *       * param3:消息包含的屬性,沒有可以寫null
                *       * param4:消息體,要發送的消息,位元組數組 byte【】類型
                */
               channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL,null,message.getBytes());
               System.out.println("發送成功,消息内容是:"+message);
           }
            for(int i=0;i<10;i++){
                //6.1定義發送的消息内容
                String message="sms----- inform to 本次消費99元"+i;

                //6.2 發送消息
                /**
                 *       * param1:Exchange(交換機)的名稱,如果沒有指定,則使用Default Exchange 注意:不可以寫null
                 *       * param2:routingKey,消息的路由Key,是用于Exchange(交換機)将消息轉發到指定的消息隊列
                 *       * param3:消息包含的屬性,沒有可以寫null
                 *       * param4:消息體,要發送的消息,位元組數組 byte【】類型
                 */
                channel.basicPublish(EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS,null,message.getBytes());
                System.out.println("發送成功,消息内容是:"+message);
            }


        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }finally {
                try {
                    //5,關閉連接配接對象,倒着關!
                    if(channel != null){
                        channel.close();
                    }
                    if(connection != null){
                        connection.close();
                    }

                } catch (IOException | TimeoutException e) {
                    e.printStackTrace();
                }
        }
    }
}
           

消費者email:

package cn.pika.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消費者--email
 */
public class ConsumerEmailRouting {
    //隊列的名稱
    private static final String QUEUE_INFORM_EMAIL="queue_inform_email"; //郵箱隊列名
    //交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //1,建立連結的工廠對象
            ConnectionFactory factory = new ConnectionFactory();
            //1.1 設定連結的參數
            factory.setHost("localhost");   //本地連結位址
            factory.setPort(5672);          //端口号
            factory.setUsername("guest");   //使用者名
            factory.setPassword("guest");   //密碼
            factory.setVirtualHost("/");    //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
            //1.2 建立連結對象
            connection =factory.newConnection();

            //2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
            channel =connection.createChannel();

            //3,聲明交換機及其類型
            /**
             * 參數1:交換機的名稱
             * 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
             *                          到綁定了該交換機的所有隊列中
             *                  topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
             *                  direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
             *                  headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

            //4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
            /**
             * 參數介紹:
             *      * param1:隊列名稱
             *      * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
             *      * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
             *      * param4:隊列不再使用時是否自動删除此隊列
             *      * param5:擴充參數
             */
            channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);

            //5,綁定隊列
            /**
             * 參數1:隊列名稱
             * 參數2:交換機名稱
             * 參數3:路由鍵(路由key)
             */
            channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); //将email隊列綁定到交換機上

            //6,定義消費方法,監聽到隊列中有消息以後要做什麼
            DefaultConsumer consumer = new DefaultConsumer(channel){

                //4.1重寫handleDelivery()方法,消費着接受消息以後會自動調用此方法
                /**
                 *
                 * @param consumerTag 消費者的标簽,在channel.basicConsume()去指定
                 * @param envelope    消息包的内容,可從中擷取消息id,消息routingkey,交換機,消息和重傳标志(收到消息失敗後是否需要重新發送)
                 * @param properties
                 * @param body 消息體(接受到的消息内容)
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //4.2 擷取接收到的消息
                    String message=new String(body);
                    System.out.println("接收到的消息是:"+message);
                }
            };

            //5,監聽隊列
            /**
             * 參數1:String queue:隊列名稱
             * 參數2:boolean autoAck:是否自動回複,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動回複
             * 參數3:Consumer callback:消費方法,消費者接收到消息後調用此方法
             */
            channel.basicConsume(QUEUE_INFORM_EMAIL,true,consumer);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
           

消費者sms:

package cn.pika.mq;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消費者--email
 */
public class ConsumerSmsRouting {
    //隊列的名稱
    private static final String QUEUE_INFORM_SMS="queue_inform_sms";   //短信隊列名
    //交換機名稱
    private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";

    public static void main(String[] args) {
        Connection connection = null;
        Channel channel = null;
        try {
            //1,建立連結的工廠對象
            ConnectionFactory factory = new ConnectionFactory();
            //1.1 設定連結的參數
            factory.setHost("localhost");   //本地連結位址
            factory.setPort(5672);          //端口号
            factory.setUsername("guest");   //使用者名
            factory.setPassword("guest");   //密碼
            factory.setVirtualHost("/");    //rabbitmq預設虛拟機名稱為“/”,虛拟機相當于一個獨立的mq伺服器
            //1.2 建立連結對象
            connection =factory.newConnection();

            //2,建立與Exchange(交換機)的通道,每個連接配接可以建立多個通道,每個通道代表一個會話任務
            channel =connection.createChannel();

            //3,聲明交換機及其類型
            /**
             * 參數1:交換機的名稱
             * 參數2:交換機的類型:fanout:不處理路由鍵,隊列在綁定交換機的時候,不需要指定路由鍵,寫空字元串即可,交換機在接受到資訊以後會轉發
             *                          到綁定了該交換機的所有隊列中
             *                  topic:将路由鍵和某模式進行比對,不需要綁定交換機,可以使用通配符
             *                  direct:處理路由鍵,隊列在綁定到交換機上的時候,需要指定路由鍵,交換機在接受到資訊以後隻會轉發到和該路由鍵比對的隊列中
             *                  headers:不處理路由鍵。而是根據發送的消息内容中的headers屬性進行比對
             */
            channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

            //4,聲明一個隊列,如果Rabbit中沒有此隊列将自動建立
            /**
             * 參數介紹:
             *      * param1:隊列名稱
             *      * param2:是否持久化,若持久化,mq重新開機後該隊列仍然存在
             *      * param3:隊列是否獨占此連接配接,隊列隻允許在該連接配接中通路,如果連接配接關閉,則隊列自動删除
             *      * param4:隊列不再使用時是否自動删除此隊列
             *      * param5:擴充參數
             */
            channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);

            //5,綁定隊列
            /**
             * 參數1:隊列名稱
             * 參數2:交換機名稱
             * 參數3:路由鍵(路由key)
             */
            channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); //将email隊列綁定到交換機上

            //6,定義消費方法,監聽到隊列中有消息以後要做什麼
            DefaultConsumer consumer = new DefaultConsumer(channel){

                //4.1重寫handleDelivery()方法,消費着接受消息以後會自動調用此方法
                /**
                 *
                 * @param consumerTag 消費者的标簽,在channel.basicConsume()去指定
                 * @param envelope    消息包的内容,可從中擷取消息id,消息routingkey,交換機,消息和重傳标志(收到消息失敗後是否需要重新發送)
                 * @param properties
                 * @param body 消息體(接受到的消息内容)
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    //4.2 擷取接收到的消息
                    String message=new String(body);
                    System.out.println("接收到的消息是:"+message);
                }
            };

            //5,監聽隊列
            /**
             * 參數1:String queue:隊列名稱
             * 參數2:boolean autoAck:是否自動回複,設定為true為表示消息接收到自動向mq回複接收到了,mq接收到回複會删除消息,設定為false則需要手動回複
             * 參數3:Consumer callback:消費方法,消費者接收到消息後調用此方法
             */
            channel.basicConsume(QUEUE_INFORM_SMS,true,consumer);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}