天天看點

RabbitMQ 通過生産端發送消息,消費端接收消息

RabbitMQ的安裝可以看看之前的文章:Windows安裝RabbitMQ詳細教程​ 接下來我們通過程式向RabbitMQ 發送消息,通過消費端消費消息

建立一個maven工程,添加amqp-client依賴

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.1</version>
</dependency>      

打開背景建立一個vhost

RabbitMQ 通過生産端發送消息,消費端接收消息

建立一個使用者

RabbitMQ 通過生産端發送消息,消費端接收消息

設定使用者有操作vhost的權限

RabbitMQ 通過生産端發送消息,消費端接收消息

結果如圖

RabbitMQ 通過生産端發送消息,消費端接收消息

寫一個連接配接工具類:

package com.test.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
    /**
     * 建立與RabbitMQ的連接配接
     * @return
     * @throws Exception
     */
    public static Connection getConnection() throws Exception {
        //定義連接配接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設定服務位址
        factory.setHost("192.168.13.25");
        //端口
        factory.setPort(5672);
        //設定賬号資訊,使用者名、密碼、vhost
        factory.setVirtualHost("kavito");//設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
        factory.setUsername("test");
        factory.setPassword("123456");
        // 通過工廠擷取連接配接
        Connection connection = factory.newConnection();
        return connection;
    }


}      

生産者發送消息:

package com.test.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 1、擷取到連接配接
        Connection connection = ConnectionUtil.getConnection();
        // 2、從連接配接中建立通道,使用通道才能完成消息相關的操作
        Channel channel = connection.createChannel();
        // 3、聲明(建立)隊列
        //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
         * 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
         * 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
         * 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 4、消息内容
        String message = "Hello World!";
        // 向指定的隊列中發送消息
        //參數:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 參數明細:
         * 1、exchange,交換機,如果不指定将使用mq的預設交換機(設定為"")
         * 2、routingKey,路由key,交換機根據路由key來将消息轉發到指定的隊列,如果使用預設交換機,routingKey設定為隊列的名稱
         * 3、props,消息的屬性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //關閉通道和連接配接(資源關閉最好用try-catch-finally語句處理)
        channel.close();
        connection.close();
    }
}      

消費者接收消息:

package com.test.util;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Recv {
    private final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] argv) throws Exception {
        // 擷取到連接配接
        Connection connection = ConnectionUtil.getConnection();
        //建立會話通道,生産者和mq服務所有通信都在channel通道中完成
        Channel channel = connection.createChannel();
        // 聲明隊列
        //參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 參數明細
         * 1、queue 隊列名稱
         * 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
         * 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
         * 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
         * 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //實作消費方法
        DefaultConsumer consumer = new DefaultConsumer(channel){
            // 擷取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用
            /**
             * 當接收到消息後此方法将被調用
             * @param consumerTag  消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
             * @param envelope 信封,通過envelope
             * @param properties 消息屬性
             * @param body 消息内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //交換機
                String exchange = envelope.getExchange();
                //消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                // body 即消息體
                String msg = new String(body,"utf-8");
                System.out.println(" [x] received : " + msg + "!");
            }
        };

        // 監聽隊列,第二個參數:是否自動進行消息确認。
        //參數:String queue, boolean autoAck, Consumer callback
        /**
         * 參數明細:
         * 1、queue 隊列名稱
         * 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
         * 3、callback,消費方法,當消費者接收到消息要執行的方法
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}      

先運作消費端,消費端啟動并阻塞

RabbitMQ 通過生産端發送消息,消費端接收消息

再執行生産端發送消息

RabbitMQ 通過生産端發送消息,消費端接收消息