文章目錄
-
- 一、簡介
- 二、導入RabbitMQ的依賴包:
- 三、RabbitMQ幾種使用方式
-
- 1、Hello World
-
- 添加釋出者代碼:
- 添加接收者代碼:
- 2、工作隊列(work queue)
-
- 釋出者代碼:
- 消費者1代碼:
- 消費者2代碼:
- 3、訂閱模式:
-
- (1)訂閱之Fanout模型
-
- 生産者代碼:
- 消費者1代碼:
- 消費者2代碼:
- (2)訂閱之Direct模型:
-
- 釋出者代碼:
- 消費者代碼:
- (3)訂閱之Topic模型:
-
- 釋出者代碼:
- 消費者代碼:
一、簡介
RabbitMQ是一個消息代理。它的核心思想非常簡單:接收并轉發消息。你可以把它想象成一個郵局:當你把郵件丢進郵箱時,你非常确定郵差先生會把它送到收件人手中。在這個比喻中,RabbitMQ就是郵箱、郵局和郵差。
RabbitMQ和郵局的主要差別是它處理的不是紙張。它接收、存儲并轉發二進制資料塊,也就是message,消息。
RabbitMQ官網: https://www.rabbitmq.com/
RabbitMQ官方教程: https://www.rabbitmq.com/#getstarted
學習教程:https://www.rabbitmq.com/tutorials/tutorial-one-python.html
RabbitMQ常用的交換器類型有direct、topic、fanout三種。
二、導入RabbitMQ的依賴包:
<!--消息隊列RabbitMQ的依賴包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
三、RabbitMQ幾種使用方式
1、Hello World
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZuBnLyEWNwMDMjhjMzQ2YjRWO5UGO1QjNzgzN1gTM0kTZlNzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
P(producer/ publisher):生産者,發送消息的服務
C(consumer):消費者,接收消息的服務
紅色區域就是MQ中的Queue,可以把它了解成一個郵箱
- 首先信件來了不強求必須馬上馬去拿
- 其次,它是有最大容量的(受主機和磁盤的限制,是一個緩存區)
- 允許多個消費者監聽同一個隊列,争搶消息
添加釋出者代碼:
@Slf4j
public class RabbitMQProducer {
private final static String QUEEN_NAME = "Hello World";
private final static String MESSAGE = "第一個RabbitMQ";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");//設定主機名
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEEN_NAME, false, false, false, null);
channel.basicPublish("", QUEEN_NAME, null, MESSAGE.getBytes());
log.info("發送消息:'{}'", MESSAGE);
channel.close();
connection.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
添加接收者代碼:
@Slf4j
public class RabbitMQConsumer {
private final static String QUEEN_NAME = "Hello World";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEEN_NAME, false, false, false, null);
/**
* @Author: chen
* @Date: 2021/8/17 17:28
* @Description: 第一種方式利用匿名類建立
*/
// Consumer consumer = new DefaultConsumer(channel){
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// String message = new String(body, "UTF-8");
// log.info("消費者收到消息:'{}'", message);
// }
// };
// channel.basicConsume(QUEEN_NAME, true, consumer);
/**
* @Author: chen
* @Date: 2021/8/17 17:28
* @Description: 第二種方式利用Lambda建立
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
log.info("消費者收到消息:'{}'", message);
};
channel.basicConsume(QUEEN_NAME, true, deliverCallback, consumerTag -> {});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
發送者會通過RabbitMQ發送一條資訊,而接收者會把它列印出來。接收者将會一直運作,等待接收消息。
布置成功之後就可以在浏覽器輸入:http://伺服器的ip:15672/ 可以找到登陸入口。
2、工作隊列(work queue)
Worker模型中也隻有一個工作隊列。但它是一種競争消費模式。可以看到同一個隊列我們綁定上了多個消費者,消費者争搶着消費消息,這可以有效的避免消息堆積。
比如對于短信微服務叢集來說就可以使用這種消息模型,來了請求大家搶着消費掉。
如何實作這種架構:對于上面的HelloWorld這其實就是相同的服務我們啟動了多次罷了,自然就是這種架構。
釋出者代碼:
/**
* @Author: chen
* @Date: 2021/8/18 10:53
* @Description: 學習位址:https://www.rabbitmq.com/tutorials/tutorial-two-java.html
*/
public class NewTask {
//定義一個消息隊列的名稱
private static final String QUEEN_NAME = "work";
public static void main(String[] args) {
//開啟連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名稱或IP
factory.setHost("localhost");
try {
//開啟連接配接,抛出異常
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//設定隊列參數
boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
//釋出消息
String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
String message = String.join("-", msg);//從控制台編譯
System.out.println(message);
//我們需要将我們的消息标記為持久性 - 通過将MessageProperties(實作BasicProperties)設定為值PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEEN_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
//關閉通道
channel.close();
//關閉連接配接
connection.close();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消費者1代碼:
@Slf4j
public class WorkConsumer {
//定義消息隊列名稱
private static final String QUEEN_NAME = "work";
public static void main(String[] args) {
//開啟連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名稱或IP
factory.setHost("localhost");
try {
//開啟連接配接,抛出異常
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//設定隊列參數
boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1); //公平排程,一次隻接受一條未确認的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到的資訊:" + message);
try {
//do somethings
doWork(message);
} finally {
System.out.println(" [x] Done");
//當您的用戶端退出時,消息将被重新傳送(這可能看起來像随機重新傳送),但 RabbitMQ 将消耗越來越多的記憶體,因為它無法釋放任何未确認的消息。是以利用basicAck()必須在接收傳遞的同一通道上發送确認
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确認消息,設定為false
}
};
channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
//模拟執行時間的假任務
private static void doWork(String message) {
int count = 0;
for (char ch
: message.toCharArray()){
//遇到'-'符号耗時1s
if (ch == '-'){
try {
log.info("等待時間:{}秒" , ++count);
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
消費者2代碼:
@Slf4j
public class WorkConsumer2 {
//定義消息隊列名稱
private static final String QUEEN_NAME = "work";
public static void main(String[] args) {
//開啟連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名稱或IP
factory.setHost("localhost");
try {
//開啟連接配接,抛出異常
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//設定隊列參數
boolean autoAck = true; //消息确認與持久性,消費者發回确認消息,告訴 RabbitMQ 特定消息已被接收、處理,并且 RabbitMQ 可以自由删除它。
channel.queueDeclare(QUEEN_NAME, autoAck, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1); //公平排程,一次隻接受一條未确認的消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到的資訊:" + message);
try {
//do somethings
doWork(message);
} finally {
System.out.println(" [x] Done");
//當您的用戶端退出時,消息将被重新傳送(這可能看起來像随機重新傳送),但 RabbitMQ 将消耗越來越多的記憶體,因為它無法釋放任何未确認的消息。是以利用basicAck()必須在接收傳遞的同一通道上發送确認
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//确認消息,設定為false
}
};
channel.basicConsume(QUEEN_NAME, false, deliverCallback, consumerTag -> {});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
//模拟執行時間的假任務
private static void doWork(String message) {
int count = 0;
for (char ch
: message.toCharArray()){
//遇到'-'符号耗時1s
if (ch == '-'){
try {
log.info("勞工做這項工作時間:{}秒" , ++count);
Thread.sleep(1000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
}
總結:在工作隊列當中,假如将消費者比作勞工,隊清單示勞工需要被分派的任務,那麼當勞工沒有完成一項工作的時候,是不會重新擷取其他任務,同時如果因為某個原因導緻任務中斷,那麼RabbitMQ就不會将這項任務在消息隊列中去除,而是會被分派給空閑的勞工,直到這項任務被完成,也可稱為消息應答或消息确認。
3、訂閱模式:
訂閱模型借助一個新的概念:Exchange(交換機)實作,不同的訂閱模型本質上是根據交換機(Exchange)的類型劃分的。
訂閱模型有三種
Fanout(廣播模型): 将消息發送給綁定給交換機的所有隊列(因為他們使用的是同一個RoutingKey)。
Direct(定向): 把消息發送給擁有指定Routing Key (路由鍵)的隊列。
Topic(通配符): 把消息傳遞給擁有 符合Routing Patten(路由模式)的隊列。
(1)訂閱之Fanout模型
這個模型的特點就是它在發送消息的時候,并沒有指明Rounting Key , 或者說他指定了Routing Key,但是所有的消費者都知道,大家都能接收到消息,就像聽廣播。
臨時隊列:
在 Java 用戶端中,當我們不向queueDeclare()提供參數時, 我們會建立一個具有生成名稱的非持久、獨占、自動删除隊列:
此時queueName包含一個随機隊列名稱。例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
綁定:
我們已經建立了一個fanout交換和一個隊列。現在我們需要告訴交換器向我們的隊列發送消息。交換和隊列之間的這種關系稱為綁定。
從現在開始,logs交換會将消息附加到我們的隊列中。
發出日志消息的生産者程式與之前的教程看起來沒有太大差別。最重要的變化是我們現在想要将消息釋出到我們的logs交換而不是無名的交換。我們需要在發送時提供一個routingKey,但它的值在fanout交換時被忽略。
生産者代碼:
/**
* @Author: chen
* @Date: 2021/8/18 15:02
* @Description: RabbitMQ訂閱模型:Fanout
* 學習位址:https://www.rabbitmq.com/tutorials/tutorial-three-java.html
*/
public class EmitLog {
//建立交換機名稱
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或者IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//定義交換機類型為Fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//設定釋出的消息
String []msg = {"one", "two", "three", "four", "five", "six", "seven", "eight", "night", "ten"};
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, msg[i].getBytes("UTF-8"));
}
System.out.println(msg);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消費者1代碼:
public class LogsConsumer {
//建立交換機名稱
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟通道
Channel channel = connection.createChannel();
//定義交換機名稱
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//建立随機隊列
String queueName = channel.queueDeclare().getQueue();
//綁定随機隊列與交換機
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消費者2代碼:
public class LogsConsumer2 {
//建立交換機名稱
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟通道
Channel channel = connection.createChannel();
//定義交換機名稱
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//建立随機隊列
String queueName = channel.queueDeclare().getQueue();
//綁定随機隊列與交換機
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(),"UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
總結:這種釋出/訂閱模式是一種廣播機制,并沒有設定路由key模式。它在釋出者代碼中建立一個交換機,并沒有沒有在釋出者代碼中建立隊列,而是在消費者中建立臨時隊列,通過臨時隊列與交換機相連,進而達到廣播效果。
(2)訂閱之Direct模型:
我們使用的是fanout交換,它沒有給我們很大的靈活性——它隻能進行無意識的廣播。
我們将改用direct交換。direct交換背後的路由算法很簡單 - 消息進入其綁定密鑰與消息的路由密鑰完全比對的隊列 。
P:生産者,向Exchange發送消息,發送消息時,會指定一個routing key。
X:Exchange(交換機),接收生産者的消息,然後把消息遞交給 與routing key完全比對的隊列
C1:消費者,其所在隊列指定了需要routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息
擁有不同的RoutingKey的消費者,會收到來自交換機的不同資訊,而不是大家都使用同一個Routing Key 和廣播模型區分開來。
釋出者代碼:
/**
* @Author: chen
* @Date: 2021/8/18 16:14
* @Description: 訂閱模式:Direct
*/
public class EmitLogDirect {
//建立交換機名稱
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//定義交換機類型與名稱
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//設定綁定key與資訊
String []bindKey = {"info","warn", "error", "test"};
String []message = {"Hello","Warning","Error","ok"};
//釋出消息
for (int i = 0; i < bindKey.length; i++) {
channel.basicPublish(EXCHANGE_NAME, bindKey[i], null, message[i].getBytes("UTF-8"));
System.out.println(" [x] Sent '" + bindKey[i] + "':'" + message[i] + "'");
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
消費者代碼:
public class LogsDirectConsumer {
//建立交換機名稱
private static final String EXCHANGE_NAME = "direct_logs";
//建立線程訂閱'info','warn','error','test'
public static Runnable runnable = () -> {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//定義交換機類型與名稱
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//建立臨時隊列
String queueName = channel.queueDeclare().getQueue();
//綁定隊列、交換機和路由密鑰
String []bindKey = {"info","warn", "error", "test"};
for (String severity :
bindKey) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
};
//建立線程隻訂閱'info','warn','error'
public static Runnable runnable1 = () -> {
//建立連接配接工廠
ConnectionFactory factory = new ConnectionFactory();
//設定主機名或IP
factory.setHost("localhost");
try {
//建立連接配接
Connection connection = factory.newConnection();
//開啟連接配接通道
Channel channel = connection.createChannel();
//定義交換機類型與名稱
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//建立臨時隊列
String queueName = channel.queueDeclare().getQueue();
//綁定隊列、交換機和路由密鑰
String []bindKey = {"info","warn", "error"};
for (String severity :
bindKey) {
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received queue2 '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
};
public static void main(String[] args) {
new Thread(runnable, "queue1").start();//訂閱'info','warn','error','test'
new Thread(runnable1, "queue2").start();//隻訂閱'info','warn','error'
new Thread(runnable, "queue3").start();//訂閱'info','warn','error','test'
}
}
總結:在direct訂閱模型中,生産者訂閱交換機釋出消息,産生routingKey(用于消費者綁定);而在消費者中,根據消費者的需要,可以随意訂閱自己所需的資訊(通過綁定routingKey實作),即選擇性接收。
(3)訂閱之Topic模型:
類似于Direct模型。差別是Topic的Routing Key支援通配符。
topic交換功能強大,可以像其他交換一樣運作。
當隊列與“ # ”(散列)綁定鍵綁定時——它将接收所有消息,而不管路由鍵——就像在fanout交換中一樣。
當綁定中不使用特殊字元“ * ”(星号)和“ # ”(散列)時,主題交換的行為就像direct交換一樣。
通配符如下:
- (星号)可以正好代替一個詞。
- # (hash) 可以代替零個或多個單詞。
根據上圖所示,以下的代碼是根據上圖進行編寫的。
我們發送描述動物的資訊。這是由三個字元以及兩個點組合的路由密鑰進行發送。消費者那邊用*.orange.*、*.*.rabbit以及lazy.#進行适配;生産者那邊釋出動物的消息以及設定路由key:quick.orange.rabbit、lazy.orange.elephant、quick.orange.fox、lazy.brown.fox、lazy.pink.rabbit、quick.brown.fox、quick.orange.male.rabbit、lazy.orange.male.rabbit
釋出者代碼:
/**
* @Author: chen
* @Date: 2021/8/19 9:42
* @Description: RabbitMQ訂閱模型: Topics
* 學習網址: https://www.rabbitmq.com/tutorials/tutorial-five-java.html
*/
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String[] routingKey = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox",
"lazy.pink.rabbit", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"};
String[] message = {"快.橙子.兔子", "慢.橙色.大象", "快.橙色.狐狸", "慢.棕色.狐狸",
"慢.粉色.兔子", "快.棕色.狐狸", "快.橙色.雄性.兔子", "慢.橙色.雄性.兔子"};
for (int i = 0; i < routingKey.length; i++) {
channel.basicPublish(EXCHANGE_NAME, routingKey[i], null, message[i].getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey[i] + "':'" + message[i] + "'");
}
channel.close();
connection.close();
}
}
消費者代碼:
public class LogsTopicsConsumer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void consumer(String []routingKey){
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = null;
try {
connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
for (String key :
routingKey) {
channel.queueBind(queueName, EXCHANGE_NAME, key);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
String[] routingKey1 = {"*.orange.*"};
String[] routingKey2 = {"*.*.rabbit"};
String[] routingKey3 = {"lazy.#"};
consumer(routingKey1);
consumer(routingKey2);
consumer(routingKey3);
}
}
結果分析:
路由鍵設定為“ quick.orange.rabbit ”的消息将發送到兩個隊列。
消息“ lazy.orange.elephant ”也會發給他們兩個。
“ quick.orange.fox ”隻會進入第一個隊列,而“ lazy.brown.fox ”隻會進入第二個隊列。“ lazy.pink.rabbit ”隻會被傳送到第二個隊列一次,即使它比對了兩個綁定。“ quick.brown.fox ”不比對任何綁定,是以将被丢棄。
如果我們違反合同并發送一到四個字的消息,例如 “ quick.orange.male.rabbit ”,這些消息不會比對任何綁定并且會丢失。
另一方面,“ lazy.orange.male.rabbit ”,即使它有四個單詞,也會比對最後一個綁定,并将被傳遞到第二個隊列。
下一篇blog文章:SpringBoot + RabbitMQ整合