RabbitMQ的安裝可以看看之前的文章:Windows安裝RabbitMQ詳細教程 接下來我們通過程式向RabbitMQ 發送消息,通過消費端消費消息
建立一個maven工程,添加amqp-client依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
</dependency>
打開背景建立一個vhost
建立一個使用者
設定使用者有操作vhost的權限
結果如圖
寫一個連接配接工具類:
package com.test.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接配接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定服務位址
factory.setHost("192.168.13.25");
//端口
factory.setPort(5672);
//設定賬号資訊,使用者名、密碼、vhost
factory.setVirtualHost("kavito");//設定虛拟機,一個mq服務可以設定多個虛拟機,每個虛拟機就相當于一個獨立的mq
factory.setUsername("test");
factory.setPassword("123456");
// 通過工廠擷取連接配接
Connection connection = factory.newConnection();
return connection;
}
}
生産者發送消息:
package com.test.util;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 1、擷取到連接配接
Connection connection = ConnectionUtil.getConnection();
// 2、從連接配接中建立通道,使用通道才能完成消息相關的操作
Channel channel = connection.createChannel();
// 3、聲明(建立)隊列
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
* 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
* 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
* 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息内容
String message = "Hello World!";
// 向指定的隊列中發送消息
//參數:String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數明細:
* 1、exchange,交換機,如果不指定将使用mq的預設交換機(設定為"")
* 2、routingKey,路由key,交換機根據路由key來将消息轉發到指定的隊列,如果使用預設交換機,routingKey設定為隊列的名稱
* 3、props,消息的屬性
* 4、body,消息内容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關閉通道和連接配接(資源關閉最好用try-catch-finally語句處理)
channel.close();
connection.close();
}
}
消費者接收消息:
package com.test.util;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 擷取到連接配接
Connection connection = ConnectionUtil.getConnection();
//建立會話通道,生産者和mq服務所有通信都在channel通道中完成
Channel channel = connection.createChannel();
// 聲明隊列
//參數:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數明細
* 1、queue 隊列名稱
* 2、durable 是否持久化,如果持久化,mq重新開機後隊列還在
* 3、exclusive 是否獨占連接配接,隊列隻允許在該連接配接中通路,如果connection連接配接關閉隊列則自動删除,如果将此參數設定true可用于臨時隊列的建立
* 4、autoDelete 自動删除,隊列不再使用時是否自動删除此隊列,如果将此參數和exclusive參數設定為true就可以實作臨時隊列(隊列不用了就自動删除)
* 5、arguments 參數,可以設定一個隊列的擴充參數,比如:可設定存活時間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//實作消費方法
DefaultConsumer consumer = new DefaultConsumer(channel){
// 擷取消息,并且處理,這個方法類似事件監聽,如果有消息的時候,會被自動調用
/**
* 當接收到消息後此方法将被調用
* @param consumerTag 消費者标簽,用來辨別消費者的,在監聽隊列時設定channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息内容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交換機
String exchange = envelope.getExchange();
//消息id,mq在channel中用來辨別消息的id,可用于确認消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息體
String msg = new String(body,"utf-8");
System.out.println(" [x] received : " + msg + "!");
}
};
// 監聽隊列,第二個參數:是否自動進行消息确認。
//參數:String queue, boolean autoAck, Consumer callback
/**
* 參數明細:
* 1、queue 隊列名稱
* 2、autoAck 自動回複,當消費者接收到消息後要告訴mq消息已接收,如果将此參數設定為tru表示會自動回複mq,如果設定為false要通過程式設計實作回複
* 3、callback,消費方法,當消費者接收到消息要執行的方法
*/
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
先運作消費端,消費端啟動并阻塞
再執行生産端發送消息