目錄
- 一、什麼是 RabbitMQ
- 二、RabbitMQ 相關概念
- 1、RabbitMQ 核心概念
- 2、RabbitMQ 運作流程
- 3、RabbitMQ 支援消息的模式
- 4、RabbitMQ 使用場景
- 三、以代碼入門
- 1、簡單模式
- 2、釋出訂閱模式
- 3、路由模式
- 4、主題Topic模式
- 5、工作模式
- 輪詢模式
- 公平分發模式
- SpringBoot 中使用 RabbitMQ
- 1、釋出訂閱模式
- 2、路由模式
- 3、主題模式
- 4、過期隊列 TTL
- 5、死信隊列 DLX
- 6、延時隊列
- 7、消息确認機制的配置
- 8、消息重發次數與手動應答
RabbitMQ是一個開源的遵循AMQP協定實作的基于Erlang語言編寫,支援多種用戶端(語言)。用于在分布式系統中存儲消息,轉發消息,具有高可用,高可擴性,易用性等特征的中間件。

- Server:又稱Broker ,接受用戶端的連接配接,實作AMQP實體服務。 安裝rabbitmq-server
- Connection:連接配接,應用程式與Broker的網絡連接配接 TCP/IP/ 三次握手和四次揮手
- Channel:網絡信道,幾乎所有的操作都在Channel中進行,Channel是進行消息讀寫的通道,用戶端可以建立對各Channel,每個Channel代表一個會話任務。
- Message :消息,服務與應用程式之間傳送的資料,由Properties和body組成,Properties可是對消息進行修飾,比如消息的優先級,延遲等進階特性,Body則就是消息體的内容。
- Virtual Host 虛拟位址,用于進行邏輯隔離,最上層的消息路由,一個虛拟主機裡可以有若幹個Exhange和- Queueu,同一個虛拟主機裡面不能有相同名字的Exchange
- Exchange:交換機,接受消息,根據路由鍵發送消息到綁定的隊列。(不具備消息存儲的能力)
- Bindings:Exchange和Queue之間的虛拟連接配接,binding中可以儲存多個routing key.
- Routing key:是一個路由規則,虛拟機可以用它來确定如何路由一個特定消息。
- Queue:隊列:也成為Message Queue,消息隊列,儲存消息并将它們轉發給消費者。
RabbitMQ 的管理界面中就可以看到這些相關的概念:
參考官網:https://www.rabbitmq.com/getstarted.html
- 簡單模式 Simple:不指定交換機,會使用預設交換機
RabbitMQ -
工作模式 Work
類型:無 ;特點:分發機制
-
釋出訂閱模式
類型:fanout;特點:Fanout—釋出與訂閱模式,是一種廣播機制,它是沒有路由key的模式。
-
路由模式
類型:direct;特點:有routing-key的比對模式
-
主題Topic模式
類型:topic;特點:模糊的routing-key的比對模式
-
參數模式
類型:headers;特點:參數比對模式
解耦:RabbitMQ可以實作不同應用之間的解耦,應用之間不直接進行通信,而是通過MQ來建立橋接。
削峰:以秒殺場景為例,會瞬時産生大量的請求,應用本身一時無法處理得完,是以可以先将請求都放入消息隊列,引用再去慢慢處理。
異步:将比較耗時而且不需要即時(同步)傳回結果的操作,作為消息放入消息隊列,以此減少請求響應時間,提高系統性能。
先建立一個maven工程,pom 引入RabbitMQ:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
使用MQ一般主要就是兩個角色,生産者(producer)和消費者(consumer),RabbitMQ建立生産者和消費者步驟類似,主要分為一下幾步:
- 1、建立連接配接工廠
- 2、建立連接配接Connection
- 3、通過連結擷取通道channel
- 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
- 5、主備消息内容
- 6、發送消息給隊列queue
- 7、關閉連接配接
- 8、關閉通道
下面講解下各個模式下得代碼實作。
不聲明交換機(exchange),使用預設交換機
生産者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class Producer {
public static void main(String[] args) {
// 1、建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、建立連接配接 Connection
connection = factory.newConnection("生産者");
// 3、通過連結擷取通道 Channel
channel = connection.createChannel();
// 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
// 這裡我們聲明一個隊列
String queueName = "queue-jinsh";
/*
@params1:隊列名稱,
@params2:是否持久化,就是消息是否存盤。其實非持久化也會存盤,但會伴随服務重新開機丢失
@params3:排他性,是否是一個獨占隊列
@params4:是否自動删除,随着最後一個消費者消費完畢消息後是否把隊列删除
@params5:攜帶一些附加參數
*/
channel.queueDeclare(queueName, false, false, false, null);
// 5、準備消息内容
String message = "信其雌蛙一次莫黑多刺";
// 6、發送消息給隊列queue
/*
@params1:交換機,這裡沒有指定,會使用預設的交換機
@params2:隊列、路由key
@params3:消息的狀态控制
@params4:消息主題
*/
channel.basicPublish("", queueName, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接配接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消費者
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
/**
* 1、建立連接配接工廠
* 2、建立連接配接Connection
* 3、通過連結擷取通道channel
* 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
* 5、主備消息内容
* 6、發送消息給隊列queue
* 7、關閉連接配接
* 8、關閉通道
*/
public static void main(String[] args) {
// 1、建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、建立連接配接 Connection
connection = factory.newConnection("消費者");
// 3、通過連結擷取通道 Channel
channel = connection.createChannel();
// 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
// 消費消息
String queueName = "queue1";
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("收到消息是:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收失敗");
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
// 8、關閉連接配接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
釋出訂閱模式在聲明交換機(exchange)時需要指定交換機類型(type)為fanout,當生産者釋出消息,綁定(binding)得所有消費者都将收到消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生産者");
channel = connection.createChannel();
channel.queueDeclare("queue4", false, false, false, null);
channel.queueDeclare("queue5", false, false, false, null);
String message = "信其雌蛙一次莫黑多刺";
// 準備交換機
String exchangeName = "fanout-exchange";
// 定義路由key
String routeKey = "";
// 交換機類型
String type = "fanout";
channel.exchangeDeclare(exchangeName, type, false, false, false, null);
channel.queueBind("queue4", exchangeName, "");
channel.queueBind("queue5", exchangeName, "");
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer{
private static Runnable runnable = () -> {
// 1、建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、建立連接配接 Connection
connection = factory.newConnection("消費者");
// 3、通過連結擷取通道 Channel
channel = connection.createChannel();
// 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
// 消費消息
String queueName = Thread.currentThread().getName();
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收失敗");
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、關閉連接配接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) {
new Thread(runnable, "queue4").start();
new Thread(runnable, "queue5").start();
}
}
路由模式在聲明交換機(exchange)時需要指定交換機類型(type)為direct,每個消費者和生産者綁定時需要指定一個路由key(routing-key),當生産者釋出消息時也需要指定路由key,隻有與釋出時的路由key相同的綁定時的路由key對應的消費者才能消費到消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生産者");
channel = connection.createChannel();
// 申明queue
channel.queueDeclare("queue2", false, false, false, null);
channel.queueDeclare("queue3", false, false, false, null);
String message = "信其雌蛙一次莫黑多刺";
// 準備交換機
String exchangeName = "direct-exchange";
// 定義路由key
String routeKey = "email";
// 交換機類型
String type = "direct";
// 申明exchange
channel.exchangeDeclare(exchangeName, type, false, false, false, null);
channel.queueBind("queue2", exchangeName, "email");
channel.queueBind("queue3", exchangeName, "sms");
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer{
private static Runnable runnable = () -> {
// 1、建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、建立連接配接 Connection
connection = factory.newConnection("消費者");
// 3、通過連結擷取通道 Channel
channel = connection.createChannel();
// 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
// 消費消息
String queueName = Thread.currentThread().getName();
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收失敗");
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、關閉連接配接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) {
new Thread(runnable, "queue2").start();
new Thread(runnable, "queue3").start();
}
}
路由模式在聲明交換機(exchange)時需要指定交換機類型(type)為topic,與路由模式相同,也需要routing-key的比對,隻不過主題模式的routing-key是模糊比對,比對規則如下:
-
:必須比對一個單詞*
-
:比對0個或1個或多個單詞#
例如生産者和消費者綁定時的routing-key="#.aaa.*" ,那麼生産者發送消息時的routing-key的值為"dd.aaa.b"可以比對上,或"dd.cc.aaa.b"也可以比對上,但"dd.aaa.b.cc"是比對不上的。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection("生産者");
channel = connection.createChannel();
channel.queueDeclare("queue6", false, false, false, null);
channel.queueDeclare("queue7", false, false, false, null);
channel.queueDeclare("queue8", false, false, false, null);
String message = "信其雌蛙一次莫黑多刺";
// 準備交換機
String exchangeName = "topic-exchange";
// 定義路由key
String routeKey = "com.jinsh.user";
// 交換機類型
String type = "topic";
channel.exchangeDeclare(exchangeName, type, false, false, false, null);
channel.queueBind("queue6", exchangeName, "#.jinsh.#");
channel.queueBind("queue7", exchangeName, "com.*");
channel.queueBind("queue8", exchangeName, "com.#");
channel.basicPublish(exchangeName, routeKey, null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
} finally {
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer{
private static Runnable runnable = () -> {
// 1、建立連接配接工程
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 2、建立連接配接 Connection
connection = factory.newConnection("消費者");
// 3、通過連結擷取通道 Channel
channel = connection.createChannel();
// 4、通過通道,可以建立交換機、聲明隊列、綁定關系、路由key、發送消息、接收消息
// 消費消息
String queueName = Thread.currentThread().getName();
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println(queueName + ":收到消息是:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("接收失敗");
}
});
} catch (Exception e) {
e.printStackTrace();
} finally {
// 7、關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
// 8、關閉連接配接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
};
public static void main(String[] args) {
new Thread(runnable, "queue6").start();
new Thread(runnable, "queue7").start();
new Thread(runnable, "queue8").start();
}
}
當有多個消費者時,我們的消息會被哪個消費者消費呢,我們又該如何均衡消費者消費資訊的多少呢?
主要有兩種模式:
1、輪詢模式的分發:一個消費者一條,按均配置設定;
2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少;按勞配置設定;
public class Producer {
public static void main(String[] args) {
// 1: 建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2: 設定連接配接屬性
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接配接工廠中擷取連接配接
connection = factory.newConnection("生産者");
// 4: 從連接配接中擷取通道channel
channel = connection.createChannel();
// 6: 準備發送消息的内容
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "學相伴:" + i;
// 7: 發送消息給中間件rabbitmq-server
// @params1: 交換機exchange
// @params2: 隊列名稱/routingkey
// @params3: 屬性配置
// @params4: 發送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息發送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發送消息出現異常...");
} finally {
// 7: 釋放連接配接關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消費者1
public class Work1 {
public static void main(String[] args) {
// 1: 建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2: 設定連接配接屬性
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接配接工廠中擷取連接配接
connection = factory.newConnection("消費者-Work1");
// 4: 從連接配接中擷取通道channel
channel = connection.createChannel();
// 5: 申明隊列queue存儲消息
// 這裡如果queue已經被建立過一次了,可以不需要定義
// channel.queueDeclare("queue1", false, false, false, null);
// 同一時刻,伺服器隻會推送一條消息給消費者
// 6: 定義接受消息的回調
Channel finalChannel = channel;
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-開始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發送消息出現異常...");
} finally {
// 7: 釋放連接配接關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消費者2
public class Work2 {
public static void main(String[] args) {
// 1: 建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2: 設定連接配接屬性
factory.setHost("192.168.10.136");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
// 3: 從連接配接工廠中擷取連接配接
connection = factory.newConnection("消費者-Work2");
// 4: 從連接配接中擷取通道channel
channel = connection.createChannel();
// 5: 申明隊列queue存儲消息
// 這裡如果queue已經被建立過一次了,可以不需要定義
//channel.queueDeclare("queue1", false, true, false, null);
// 同一時刻,伺服器隻會推送一條消息給消費者
//channel.basicQos(1);
// 6: 定義接受消息的回調
Channel finalChannel = channel;
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-開始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("發送消息出現異常...");
} finally {
// 7: 釋放連接配接關閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
公平分發模式生産者和消費者代碼與輪詢模式一樣,隻是消費者的代碼略有修改:
- 新增
,表示每次消息的消費數量,即一次消費1個消息finalChannel.basicQos(1);
- basicConsume方法autoAck參數改為false,不自定應答
首先pom檔案引入
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
其次配置檔案配置連接配接
spring:
rabbitmq:
host: 192.168.10.136
port: 5672
username: guest
password: guest
virtual-host: /
聲明交換機、聲明隊列、交換機綁定隊列
這個類是即可以放在生産者項目這邊,也可以放在消費者項目那邊,一般選擇放消費者項目那邊寫。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class rabbitMqConfiguration {
// 1、聲明交換機類型
@Bean
public Exchange exchange() {
return new Exchange("order_exchange", true, false);
}
// 2、聲明隊列
@Bean
public Queue emailQueue() {
return new Queue("email.queue", true);
}
// 3、綁定交換機和隊列
@Bean
public Binding binding() {
return BindingBuilder.bind(emailQueue()).to(exchange());
}
}
@Service
public class ProduceService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
rabbitTemplate.convertAndSend("order_exchange", "", orderId, "hahahahahahahaha");
}
}
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"email.queue"})
@Component
public class SmsDirectConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("收到消息:" + message);
}
}
對于上面“聲明交換機、聲明隊列、交換機綁定隊列”這一步,也可以通過注解的方式聲明,寫在消費類裡:
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
key = "*.email.#"
))
@Component
public class EmailTopicConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("email topic 收到訂單消息:" + message);
}
}
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitMqConfiguration {
// 1、聲明交換機類型
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
// 2、聲明隊列
@Bean
public Queue fanoutEmailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue fanoutSmsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue fanoutWxQueue() {
return new Queue("wx.fanout.queue", true);
}
// 3、綁定交換機和隊列
@Bean
public Binding fanoutEmailBinding() {
return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
}
@Bean
public Binding fanoutSmsBinding() {
return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange());
}
@Bean
public Binding fanoutWxBinding() {
return BindingBuilder.bind(fanoutWxQueue()).to(fanoutExchange());
}
}
消費者這裡隻展示了1個其餘兩個類似
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@RabbitListener(queues = {"email.fanout.queue"})
@Component
public class EmailFanoutConsumer {
@RabbitHandler
public void receiveMessage(String message) {
System.out.println("email fanout 收到訂單消息:" + message);
}
}
public void makeOrderFanout() {
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生産成功:" + orderId);
// 3、通過rabbitmq完成消息分發
String exchange = "fanout_order_exchange";
// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
rabbitTemplate.convertAndSend(exchange, "", orderId, postProcessor);
}
@Configuration
public class DirectRabbitMqConfiguration {
// 1、聲明交換機類型
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}
// 2、聲明隊列
@Bean
public Queue directEmailQueue() {
return new Queue("email.direct.queue", true, );
}
@Bean
public Queue directSmsQueue() {
return new Queue("sms.direct.queue", true);
}
@Bean
public Queue directWxQueue() {
return new Queue("wx.direct.queue", true);
}
// 3、綁定交換機和隊列
@Bean
public Binding directEmailBinding() {
return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding directSmsBinding() {
return BindingBuilder.bind(directSmsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding directWxBinding() {
return BindingBuilder.bind(directWxQueue()).to(directExchange()).with("wx");
}
}
消費者與上面類似這裡不展示了
public void makeOrderDirect() {
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生産成功:" + orderId);
// 3、通過rabbitmq完成消息分發
String exchange = "direct_order_exchange";
// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
rabbitTemplate.convertAndSend(exchange, "email", orderId);
rabbitTemplate.convertAndSend(exchange, "sms", orderId);
}
@Configuration
public class TopicRabbitMqConfiguration {
// 1、聲明交換機類型
@Bean
public DirectExchange topicExchange() {
return new DirectExchange("topic_order_exchange", true, false);
}
// 2、聲明隊列
@Bean
public Queue topicEmailQueue() {
return new Queue("email.topic.queue", true, );
}
@Bean
public Queue directSmsQueue() {
return new Queue("sms.topic.queue", true);
}
@Bean
public Queue directWxQueue() {
return new Queue("wx.topic.queue", true);
}
// 3、綁定交換機和隊列
@Bean
public Binding topicEmailBinding() {
return BindingBuilder.bind(topicEmailQueue()).to(topicExchange()).with("#.email.#");
}
@Bean
public Binding topicSmsBinding() {
return BindingBuilder.bind(topicSmsQueue()).to(topicExchange()).with("#.sms.*");
}
@Bean
public Binding topicWxBinding() {
return BindingBuilder.bind(topicWxQueue()).to(topicExchange()).with("*.wx.*");
}
}
public void makeOrderTopic() {
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生産成功:" + orderId);
// 3、通過rabbitmq完成消息分發
String exchange = "topic_order_exchange";
// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
rabbitTemplate.convertAndSend(exchange, "com.sms.aa", orderId);
rabbitTemplate.convertAndSend(exchange, "aa.email.cc.vv", orderId);
}
過期隊列要在隊列聲明時設定
@Bean
public Queue ttlQueue() {
Map<String, Object> arg = new HashMap<String, Object>();
// 設定過期時間5秒
arg.put("x-message-ttl", 5000);
// 設定隊列最多容納幾個消息
// arg.put("x-max-length", 5);
return new Queue("ttl.direct.queue", true, false, false, arg);
}
即消息釋出到隊列中5秒内還沒有被消費,則将廢棄。
以上時對整個隊列中的消息過期設定,還有一種是對單個消息的過期時間設定,在生産者端消息發送時配置:
public void makeOrderFanout() {
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生産成功:" + orderId);
String exchange = "fanout_order_exchange";
// 給消息設定過期時間
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 設定5秒過期
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
// 參數1:交換機,參數2:路由key/queue隊列名,參數3:消息内容
rabbitTemplate.convertAndSend(exchange, "", orderId, postProcessor);
}
當兩種方式同時存在時,過期時間短的生效。
所謂死信隊列就是一個接盤用的隊列,聲明方式與普通隊列一樣
@Configuration
public class DeadQueueConfiguration {
// 1、聲明交換機類型
@Bean
public DirectExchange deadExchange() {
return new DirectExchange("dead_order_exchange", true, false);
}
// 2、聲明隊列
@Bean
public Queue deadQueue() {
return new Queue("dead.direct.queue", true);
}
// 3、綁定交換機和隊列
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("dead");
}
}
死信隊列的作用就是用于存放失效的消息,以便再次業務處理。例如過期未消費的消息,過期後會被從原隊列移動到死信隊列裡,需要在原隊列裡配置死信隊列:
@Bean
public Queue ttlQueue() {
Map<String, Object> arg = new HashMap<String, Object>();
// 設定過期時間5秒
arg.put("x-message-ttl", 5000);
// 過期則或超過消息最大個數則進入死信隊列
arg.put("x-dead-letter-exchange", "dead_order_exchange");
arg.put("x-dead-letter-routing-key", "dead");
return new Queue("ttl.direct.queue", true, false, false, arg);
}
延時隊列其實就是過期隊列與死信隊列的配合使用,達到消息延時處理的目的。例如,消息延時一分鐘後處理,那麼消息先釋出到逾時隊列中,逾時時間設定為60000毫秒,時間到了,消息就會進入死信隊列,此時我們再在死信隊列裡處理這條消息就行了。
消息确認就是當生産者釋出消息成功後,可以收到确認回調。
配置檔案中開啟publisher-confirm-type:
spring:
rabbitmq:
host: 192.168.10.136
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-confirm-type:
- NONE值是禁用釋出确認模式,是預設值
- CORRELATED值是釋出消息成功到交換器後會觸發回調方法,如1示例
- SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在釋出消息成功後使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點傳回發送結果,根據傳回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果傳回false則會關閉channel,則接下來無法發送消息到broker;
确認回調類
package com.xuexiangban.rabbitmq.springbootorderrabbitmqproducer.callback;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息确認成功!!!!");
}else{
System.out.println("消息确認失敗!!!!");
}
}
}
生産者釋出消息時設定
public void makeOrderTopic(){
String orderId = UUID.randomUUID().toString();
System.out.println("儲存訂單成功:id是:" + orderId);
// 設定消息确認機制
rabbitTemplate.setConfirmCallback(new MessageConfirmCallback());
rabbitTemplate.convertAndSend("topic_order_ex","com.email.sms.xxx",orderId);
}
消息消費過程中出現異常預設會再次發送消息,繼續消費,然後繼續異常,以此類推發生死循環。
解決這個問題主要有兩種方案:
- 配置消息的重發次數
- try-catch捕捉異常,然後手動應答
spring:
rabbitmq:
host: 192.168.10.136
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
retry:
enabled: true # 開啟重試
max-attempts: 3 # 最大重試次數
initial-interval: 2000ms # 重試間隔時間
# acknowledge-mode: manual # 手動應答
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class EmailDirectConsumer {
@RabbitHandler
public void receiveMessage(String message, Channel channel, CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
try {
System.out.println("email direct 收到訂單消息:" + message);
// 消息消費過程中出現異常,會發生死循環
int a = 1/0;
channel.basicAck(tag, false);
} catch (Exception e) {
// 如果出現異常情況,根據實際情況去進行重發
// 參數1:消息的tag,參數2:多條處理,參數3:requeue 重發
// requeue = false,消息不會重發,會把消息打入死信隊列
// requeue = true,會死循環重發,如果使用true的話建議使用“解決方案1:控制重發次數”,通過重發次數去限制循環,
// 不要使用try-catch方案,try-catch方案配置acknowledge-mode: manual,手動形式會使重發次數的配置失效。
channel.basicNack(tag, false, false);
}
}
}