天天看點

rabbitmq 工作模式 一

學習工作模式前,先看一下rabbitmq 給的helloworld案例

rabbitmq 工作模式 一

這是傳統的一對一,,,,  也就是一台機器生産,一台機器接收....

為了更好的了解代碼....我這裡示範的話用底層的代碼來示範....不整合架構了

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.3</version>
</dependency>
           

這是rabbitmq 提供的依賴......導入一下就可以測試了

下面是我寫的生産類...

        測試是沒問題的

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer01 {

    //隊列名稱
    private static final String QUEUE = "helloworld";

    public static void main(String[] args) throws IOException, TimeoutException {




        Connection connection = null;   //與
        Channel channel = null;


        try {

            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定端口
            factory.setPort(5672);

            //設定賬号密碼
            factory.setUsername("guest");  //預設賬号密碼都是guest
            factory.setPassword("guest");

            //設定虛拟空間
            factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連接配接
            connection = factory.newConnection();

            //建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
            channel = connection.createChannel();

            //這有五個參數
            /***
             * 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
             * param1:隊列名稱*
             * param2:是否持久化*  rabbit 關閉了該隊列是否存在..
             * param3:隊列是否獨占此連接配接*    如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
             * param4:隊列不再使用時是否自動删除此隊列*    該隊列不使用了就會删除該隊列
             *
             * param5:隊列參數*/
            channel.queueDeclare(QUEUE, true, false, false,null );

            String message = "你愛到極緻的人,不會愛你";


            //釋出消息
            /**
             * String exchange, String routingKey,  BasicProperties props, byte[] body
             *
             * param1  : 交換機 後面我會講這裡是指定交換機,使用預設的交換機
             * param2  :  路由key,這也是先不寫,,後面講   大概作用是用于Exchange(交換機)将消息轉發到指定的消息隊列
             *  param3 :消息包含的屬性
             *   消息體
             *
             */
            channel.basicPublish("", QUEUE,null ,message.getBytes() );

            System.out.println("Send Message is:'" + message + "'");
        }catch (Exception e){

        }finally {
            if (channel != null){
                channel.close();
            }
            
            if (connection != null){
                connection.close();
            }
        }
    }
}
           

然後下面是我寫的消費類 ,,  連接配接mq代碼都一樣來着...關注發送消息和接收消息的方法就行 ...

同樣測試過,代碼是可運作的

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer01 {

    //隊列名稱
    private static final String QUEUE = "helloworld";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定端口
            factory.setPort(5672);

            //設定賬号密碼
            factory.setUsername("guest");  //預設賬号密碼都是guest
            factory.setPassword("guest");

            //設定虛拟空間
            factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連接配接
            connection = factory.newConnection();

            //建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
            channel = connection.createChannel();

            //這有五個參數
            /***
             * 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
             * param1:隊列名稱*
             * param2:是否持久化*  rabbit 關閉了該隊列是否存在..
             * param3:隊列是否獨占此連接配接*    如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
             * param4:隊列不再使用時是否自動删除此隊列*    該隊列不使用了就會删除該隊列
             *
             * param5:隊列參數*/
            channel.queueDeclare(QUEUE, true, false, false,null );  //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯


//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽隊列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  隊列名稱
             * param2 :   是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
             * param3 : 消費對象,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE,true , consumer);
        }catch (Exception e){

        } 
    }
}
           

以上的話就是rabbitmq提供的案例  

 一台生産者 , 一台消費者 

emmm,,,

這裡講的工作的模式是 workqueues 

WorkQueues

rabbitmq 工作模式 一

對比helloword案例,這裡多了個消費者..

應用場景:對于 任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。

測試 

     我們啟動倆次消費者 

     然後用剛剛寫的生産者,發送五條資訊 

rabbitmq 工作模式 一

然後我們看消費者列印的資訊

rabbitmq 工作模式 一
rabbitmq 工作模式 一

結果 :

          mq workqueues 使用的是輪詢方式講資訊平均發給消費者 ,

         消費者會在處理完消息後 接收下一條消息 

 2.Publish/subscribe 釋出訂閱模式

rabbitmq 工作模式 一

特點 

        生産者将消息發送給broker.由交換機将消息發給每個跟綁定了交換機綁定的消息隊列,每個隊列都能收到生産者發送的每一條消息

生産者 :

 聲明Exchange_fanout_inform交換機。

聲明兩個隊列并且綁定到此交換機,

綁定時不需要指定routingkey發送消息時不需要指定routingkey

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class Producer02 {

    //消息隊列名稱
    public static final String QUEUE_INFORM_Test1 = "queue_inform_1";
    public static final String QUEUE_INFORM_Test2 = "queue_inform_2";

    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";


    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;


        try {

            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定端口
            factory.setPort(5672);

            //設定賬号密碼
            factory.setUsername("guest");  //預設賬号密碼都是guest
            factory.setPassword("guest");

            //設定虛拟空間
            factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連接配接
            connection = factory.newConnection();

            //建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
            channel = connection.createChannel();


            //聲明交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 類型   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會将
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是通配符工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);


            //聲明隊列

            /**
             * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *
             * param1: 隊列名稱
             * param2: 是否持久化
             * param3 : 是否獨占此隊列
             * param4 : 隊列不用是否自動删除
             *  param5 : 參數
             */
            channel.queueDeclare(QUEUE_INFORM_Test1,true ,false ,false ,null );
            channel.queueDeclare(QUEUE_INFORM_Test2,true ,false ,false ,null );


            //交換機和隊列綁定
            /**
             * String queue, String exchange, String routingKey
             * param1 :  隊列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );
            channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );


//            發送消息
            String message = "";
            for (int i = 0; i < 9; i++) {
                message = "故事的開頭總是這樣,适逢其會,猝不及防。故事的結局總是這樣,花開兩朵,天各一方。"+ i;


                /**
                 * String exchange, String routingKey, BasicProperties props, byte[] body
                 *
                 *   param1  交換機名稱
                 *   param2 路由key,後面講,先用 "" 代替 ,
                 *   param3  參數
                 *   param4 傳遞的字元串
                 *
                 *
                 */
                channel.basicPublish(EXCHANGE_FANOUT_INFORM,"" , null, message.getBytes());

                System.out.println("Send Message is:'" + message + "'");
            }
        }catch (Exception e){

        }finally{
            if(channel!=null){
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }







        }
}
           

下面是我寫的倆個消費者

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest1 {

    //隊列名稱
    private static final String QUEUE_INFORM_Test1 = "queue_inform_1";
    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定端口
            factory.setPort(5672);

            //設定賬号密碼
            factory.setUsername("guest");  //預設賬号密碼都是guest
            factory.setPassword("guest");

            //設定虛拟空間
            factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連接配接
            connection = factory.newConnection();

            //建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
            channel = connection.createChannel();

            //這有五個參數
            /***
             * 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
             * param1:隊列名稱*
             * param2:是否持久化*  rabbit 關閉了該隊列是否存在..
             * param3:隊列是否獨占此連接配接*    如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
             * param4:隊列不再使用時是否自動删除此隊列*    該隊列不使用了就會删除該隊列
             *
             * param5:隊列參數*/
            channel.queueDeclare(QUEUE_INFORM_Test1, true, false, false,null );  //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯




            //聲明交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 類型   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會将
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是通配符工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);

            //交換機和隊列綁定
            /**
             * String queue, String exchange, String routingKey
             * param1 :  隊列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test1,EXCHANGE_FANOUT_INFORM ,"" );




//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽隊列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  隊列名稱
             * param2 :   是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
             * param3 : 消費對象,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_Test1,true , consumer);
        }catch (Exception e){

        }
    }
}
           

消費者二

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.*;

import java.io.IOException;

public class ConsumerTest2 {

    //隊列名稱

    public static final String QUEUE_INFORM_Test2 = "queue_inform_2";

    //交換機名稱
    public static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

    public static void main(String[] args){

        Connection connection = null;
        Channel channel = null;

        try {



            ConnectionFactory factory = new ConnectionFactory();

            //設定ip
            factory.setHost("localhost");

            //設定端口
            factory.setPort(5672);

            //設定賬号密碼
            factory.setUsername("guest");  //預設賬号密碼都是guest
            factory.setPassword("guest");

            //設定虛拟空間
            factory.setVirtualHost("/");//虛拟機預設的虛拟名稱為/ , 虛拟機相當于一個獨立的伺服器

            //建立與RabbitMQ服務的TCP連接配接
            connection = factory.newConnection();

            //建立連接配接通道 ,每個連接配接可以建立多個通道,每個通道隻有一個會話
            channel = connection.createChannel();

            //這有五個參數
            /***
             * 聲明隊列,如果Rabbit中沒有此隊列将自動建立*
             * param1:隊列名稱*
             * param2:是否持久化*  rabbit 關閉了該隊列是否存在..
             * param3:隊列是否獨占此連接配接*    如果參數是true,那麼一個連接配接connection 隻能存在這一個channel,除非關閉程式
             * param4:隊列不再使用時是否自動删除此隊列*    該隊列不使用了就會删除該隊列
             *
             * param5:隊列參數*/
            channel.queueDeclare(QUEUE_INFORM_Test2, true, false, false,null );  //這裡其實可以不用聲明隊列的,因為 生産者已經聲明過了,但是如果生産者後釋出服務,隊列沒有聲明,消費者去監聽隊列..會報錯




            //聲明交換機

            /*
                String exchange, String type
                param1 :  交換機
                param2 : 交換機 類型   fanout 、 topic、direct、headers

                FANOUT 對應的模式是 釋出訂閱模式  publish/subscribe 模式

                其他的工作模式以後會将
                DIRECT  對應的是路由的工作模式
                TOPIC  對應的是通配符工作模式
                HEADERS   對應了 headers 的工作模式
             */
            channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM,BuiltinExchangeType.FANOUT);





            //交換機和隊列綁定
            /**
             * String queue, String exchange, String routingKey
             * param1 :  隊列名稱
             * exchange :  交換機
             * routingKey : 路由key  後面講,先 用 ""代替
             */
            channel.queueBind(QUEUE_INFORM_Test2,EXCHANGE_FANOUT_INFORM ,"" );




//            建立預設消費方法
            DefaultConsumer consumer = new DefaultConsumer(channel) {

//                重寫監聽方法


                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {


                    System.out.println("receive message.."+new String(body,"utf-8"));
                }
            };


//            監聽隊列
            /**
             * String queue, boolean autoAck, Consumer callback
             *
             * param1 :  隊列名稱
             * param2 :   是否自動回複,接收到消息會自動恢複mq收到了,mq會删除消息,如果拒絕的話需要手動回複,不回複的話會導緻mq不删除被消費過的消息,一直存在
             * param3 : 消費對象,,包含消費方法
             *
             */
            channel.basicConsume(QUEUE_INFORM_Test2,true , consumer);
        }catch (Exception e){

        }
    }
}
           

細心的同學可能發現了,,生産者跟消費者的不同代碼其實就是發送消息,接收消息的方法而已 ....

生産者聲明隊列跟交換機,消費者也聲明各自的隊列跟交換機...  其實就是為了怕先啟動消費者沒有發現隊列跟交換機報錯而已...

核心代碼的話  其實  就是 生成交換機,生成隊列 綁定交換機跟隊列  

測試的話

我們先啟動倆個消費者

rabbitmq 工作模式 一

然後我們啟動生産者

rabbitmq 工作模式 一
rabbitmq 工作模式 一

我們看列印結果,,倆台消費者各自都處理了9條資訊 

其實這種方法比WORKQUEUES 工作模式強,因為多台機器可以監聽一個隊列,也就是下圖所示,我們可以要倆個隊列,當然也可以建立一個隊列...建立多少隊列跟一個隊列多少消費者完全取決與我們

rabbitmq 工作模式 一

因為時間問題,,,,還有4種工作模式下次寫- -