前言
在上一個教程中,我們改進了日志記錄系統,我們沒有使用隻能進行廣播的fanout交換機,而是使用了direct交換機,并有可能的選擇性接收日志。
盡管使用了direct交換機改進了我們的系統,但它仍然存在局限性,不能基于多個條件進行路由。為了實作這一點,我們需要學習更為複雜的topic交換。
發送到topic交換的消息不能具有任意的routing_key-它必須是單詞清單,以點分隔,這些詞可以是任何東西,但它通常指定與消息相關的功能,一些有效的路由關鍵示例:"log.orange.xxx", "quick.orange.rabbit"。路由密鑰中最多包含任意多個單詞,最多255位元組。
綁定密鑰也必須采用相同的形式。topic交換背後的邏輯類似direct交換的邏輯 -使用特定路由密鑰發送的消息将被傳遞到所有使用比對綁定密鑰綁定的隊列。但是,routing key有兩個重要的特殊情況:
- *(星号)可以代替一個單詞
- #(哈希)可以代替0個或者多個
我們可以通過一個簡單的示例來解釋:

比如routingKey設定為"quick.orange.rabbit"。那麼消息會傳到Q1和Q2中,"lazy.orange.xxxx"也會傳給它們兩個。而"quick.orange.fox"隻會傳給第一個隊列Q1,"lazy.xxxx.aaa"隻會傳給第二個隊列Q2。"quick.xxxx.aaa"與任何綁定都不比對,是以将會被丢棄。
如果我們違反合同并發送一個或四個單詞的消息,例如"orange"或"quick.orange.aaaa.rabbit",會發生什麼?這些消息将不比對任何綁定,并且将會丢失。
另一方面,"lazy.orange.male.rabbit"即使有四個單詞,也将比對最後一個綁定,并将其傳送到第二個隊列。
一、引入RabbitMQ開發包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.6</version>
</dependency>
二、連接配接工具
package com.example.demo.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息隊列連接配接工具
*
*/
public class MQConnectionUtils {
public static Connection connection() throws IOException, TimeoutException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
return factory.newConnection();
}
}
三、生産者
package com.example.demo.produce;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主題模式 - 生産者
*/
public class TopicProduce {
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "", msg = "";
for (int i = 0; i < 20; i++){
if ((i % 2) == 0){
routingKey = "top.info.error";
msg = "hello,我是主題模式top" + (i+1);
}else {
routingKey = "log.info.error";
msg = "hello,我是主題模式log" + (i+1);
}
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
}
channel.close();
connection.close();
}
}
四、消費者
package com.example.demo.consume;
import com.example.demo.common.Constant;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主題模式 - 消費者(top)
*/
public class TopicConsume1 {
private static final String TOPIC_QUEUE_TOP = "topic_queue_top";
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection= MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_TOP, false, false, false, null);
// 1.隊列名稱 2.交換機名稱 3.路由規則
channel.queueBind(TOPIC_QUEUE_ERROR, EXCHANGE_NAME, "top.*.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消費者擷取消息:<" + msg +">");
}
};
channel.basicConsume(TOPIC_QUEUE_TOP, true, consumer);
}
}
package com.example.demo.consume;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主題模式 - 消費者(log)
*/
public class TopicConsume1 {
private static final String TOPIC_QUEUE_LOG = "topic_queue_log";
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection= MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_LOG, false, false, false, null);
// 1.隊列名稱 2.交換機名稱 3.路由規則
channel.queueBind(TOPIC_QUEUE_LOG, EXCHANGE_NAME, "log.*.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消費者擷取消息:<" + msg +">");
}
};
channel.basicConsume(TOPIC_QUEUE_LOG, true, consumer);
}
}
五、啟動服務