前言
在上一个教程中,我们改进了日志记录系统,我们没有使用只能进行广播的fanout交换机,而是使用了direct交换机,并有可能的选择性接收日志。
尽管使用了direct交换机改进了我们的系统,但它仍然存在局限性,不能基于多个条件进行路由。为了实现这一点,我们需要学习更为复杂的topic交换。
发送到topic交换的消息不能具有任意的routing_key-它必须是单词列表,以点分隔,这些词可以是任何东西,但它通常指定与消息相关的功能,一些有效的路由关键示例:"log.orange.xxx", "quick.orange.rabbit"。路由密钥中最多包含任意多个单词,最多255字节。
绑定密钥也必须采用相同的形式。topic交换背后的逻辑类似direct交换的逻辑 -使用特定路由密钥发送的消息将被传递到所有使用匹配绑定密钥绑定的队列。但是,routing key有两个重要的特殊情况:
- *(星号)可以代替一个单词
- #(哈希)可以代替0个或者多个
我们可以通过一个简单的示例来解释:

比如routingKey设置为"quick.orange.rabbit"。那么消息会传到Q1和Q2中,"lazy.orange.xxxx"也会传给它们两个。而"quick.orange.fox"只会传给第一个队列Q1,"lazy.xxxx.aaa"只会传给第二个队列Q2。"quick.xxxx.aaa"与任何绑定都不匹配,因此将会被丢弃。
如果我们违反合同并发送一个或四个单词的消息,例如"orange"或"quick.orange.aaaa.rabbit",会发生什么?这些消息将不匹配任何绑定,并且将会丢失。
另一方面,"lazy.orange.male.rabbit"即使有四个单词,也将匹配最后一个绑定,并将其传送到第二个队列。
一、引入RabbitMQ开发包
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.6</version>
</dependency>
二、连接工具
package com.example.demo.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消息队列连接工具
*
*/
public class MQConnectionUtils {
public static Connection connection() throws IOException, TimeoutException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672);
return factory.newConnection();
}
}
三、生产者
package com.example.demo.produce;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题模式 - 生产者
*/
public class TopicProduce {
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = "", msg = "";
for (int i = 0; i < 20; i++){
if ((i % 2) == 0){
routingKey = "top.info.error";
msg = "hello,我是主题模式top" + (i+1);
}else {
routingKey = "log.info.error";
msg = "hello,我是主题模式log" + (i+1);
}
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
}
channel.close();
connection.close();
}
}
四、消费者
package com.example.demo.consume;
import com.example.demo.common.Constant;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题模式 - 消费者(top)
*/
public class TopicConsume1 {
private static final String TOPIC_QUEUE_TOP = "topic_queue_top";
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection= MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_TOP, false, false, false, null);
// 1.队列名称 2.交换机名称 3.路由规则
channel.queueBind(TOPIC_QUEUE_ERROR, EXCHANGE_NAME, "top.*.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取消息:<" + msg +">");
}
};
channel.basicConsume(TOPIC_QUEUE_TOP, true, consumer);
}
}
package com.example.demo.consume;
import com.example.demo.util.MQConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题模式 - 消费者(log)
*/
public class TopicConsume1 {
private static final String TOPIC_QUEUE_LOG = "topic_queue_log";
private static final String EXCHANGE_NAME = "my_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection= MQConnectionUtils.connection();
Channel channel = connection.createChannel();
channel.queueDeclare(TOPIC_QUEUE_LOG, false, false, false, null);
// 1.队列名称 2.交换机名称 3.路由规则
channel.queueBind(TOPIC_QUEUE_LOG, EXCHANGE_NAME, "log.*.*");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取消息:<" + msg +">");
}
};
channel.basicConsume(TOPIC_QUEUE_LOG, true, consumer);
}
}
五、启动服务