RabbitMQ的工作模式
RabbitMQ有一下四种工作模式:
- DIRECT(“direct”),定向
- FANOUT(“fanout”),发送到所有绑定的队列
- TOPIC(“topic”),通配符的方式
- HEADERS(“headers”);参数匹配
下面将逐个讲解各个模式的用法
Work queues 工作队列模式
模式说明

如图所示,一个队列对应多个消费者,C1和C2属于竞争关系,同一条消息要么被C1消费,要么被C2消费
总结:Work queues多个消费端共同消费同一队列中的消息
应用场景:对于任务过重或者任务较多情况下使用工作队列可以提高任务处理速度
生产者
public class MessageProducer {
public static void sendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列
/**
* 参数说明:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,数据依旧存在
* 3.exclusive:
* - 是否独占,只能有一个监听者该队列
* - 当connection关闭时,是否删除队列
* 4.autoDelete:是否 自动删除,当没有Consumer是,自动删除
* 5.arguments:参数
*/
// 如果没有名为“first_queue”的队列,则会创建,否则不会创建
channel.queueDeclare("work_queue",true,false,false,null);
// 6.发送消息
/**
* 参数:
* 1.exchange:交换机名称,简单模式下交换机默认为""
* 2.routing key:路由mingc
* 3.props:配置信息
* 4.body:发送的消息数据
*/
for (int i = 1; i < 21; i++) {
String message = "work queues 第" + i + "条消息";
channel.basicPublish("","work_queue",null,message.getBytes());
}
// 7.释放资源
channel.close();
connection.close();
}
}
为了测试出效果,循环发送20条消息
消费者
public class MessageConsumer {
public static void getMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列
/**
* 参数说明:
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,数据依旧存在
* 3.exclusive:
* - 是否独占,只能有一个监听者该队列
* - 当connection关闭时,是否删除队列
* 4.autoDelete:是否 自动删除,当没有Consumer是,自动删除
* 5.arguments:参数
*/
// 如果没有名为“first_queue”的队列,则会创建,否则不会创建
channel.queueDeclare("work_queue",true,false,false,null);
// 6.接收消息
/**
* handleDelivery()参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息,交换机,路由key
* 3.properties:配置信息
* 4.bady:数据
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("客户端1消费:" + new String(body));
}
};
channel.basicConsume("work_queue",true,consumer);
// 7.释放资源?不需要 消费者相当于一个监听者 时刻监听队列
// 此处只是不让程序结束,等待回调函数执行
while (true){}
}
}
同样的代码在复制一份,修改打印输出内容,作为客户端2,启动会看到work_queue有两个消费者
消费端启动后,然后运行生产客户端发送消息
消费客户端1控制台打印如下:
客户端1消费:work queues 第2条消息
客户端1消费:work queues 第4条消息
客户端1消费:work queues 第6条消息
客户端1消费:work queues 第8条消息
…
消费客户端2控制台打印如下:
客户端2消费:work queues 第1条消息
客户端2消费:work queues 第3条消息
客户端2消费:work queues 第5条消息
客户端2消费:work queues 第7条消息
客户端2消费:work queues 第9条消息
…
可以看出生产的20条消息,被两个消费端共同分担
Pub/Sub订阅模式
模式说明
该模式中多了Exchange,整个发送过程相较之前 有所变化,生产者生产的消息不再是发送到队列中去,而是发给交换机
Exchange:交换机(X),一方面接收生产者发送的消息,另一方面知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或者丢弃消息。到底怎么操作,取决于Exchange的类型,常见以下三种:
- Fanout:广播,将消息交给所有绑定的交换机队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与exchange绑定或者没有符合路由规则的队列,那么消息将会丢失
生产者
public class MessageProducer_PubSub {
public static void sendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.常见交换机
/**
* 参数说明:
* 1.exchange:交换机类型
* 2.type:交换机类型
* DIRECT("direct"),定向
* FANOUT("fanout"),发送到所有绑定的队列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");参数匹配
* 3.durable:是否持久化
* 4.autoDelete:是否自动删除
* 5.internal:内部使用,一般为false
* 6.arguments:参数
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.FANOUT,true,false,false,null);
// 6.创建队列
String queue1 = "test_fanout_queue1";
String queue2 = "test_fanout_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.绑定队列和交换机
/**
* 参数说明:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
channel.queueBind(queue1,exchangeName,"");
channel.queueBind(queue2,exchangeName,"");
// 8.发送消息到交换机
String message = "pubsub模式的消息";
channel.basicPublish(exchangeName,"", null,message.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
运行代码后观察rabbit控制台
交换机新增了自定义的交换机名
点击交换机名,查看详情,能看到如下图的绑定关系
队列新增了和交换机绑定的两个队列,并且队列中各有一条消息待消费
消费者
public class MessageConsumer_PubSub1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息,交换机,路由key
* 3.properties:配置信息
* 4.bady:数据
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("客户端1消费:" + new String(body));
}
};
channel.basicConsume("test_fanout_queue1",true,consumer);
// 6.释放资源?不需要 消费者相当于一个监听者 时刻监听队列
// 此处只是不让程序结束,等待回调函数执行
while (true){}
}
}
相同代码复制一份,修改监听队列为test_fanout_queue2,修改打印信息作为客户端2,分别启动消费客户端1和消费客户端2
控制台输出如下:
客户端1消费:pubsub模式的消息
客户端2消费:pubsub模式的消息
可以看出,同一条消息换发送到交换机,由交换机分发到两个队列中,两个客户端分别在对应的队列中进行消费
Routing 路由模式
模式说明:
图中表示不同的日志级别,由交换机分发到不同的队列中
- 队列与交换机绑定,不能是任意绑定了,而是指定一个RoutingKey(路由key)
- 消息发送到exchange时,必须指定消息的RoutingKey
- Exchange不在把消息发送到每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的Routingkey与消息的RoutingKey一致时,才会收到消息
- 交换机类型为Direct,绑定
生产者
public class MessageProducer_Routing {
public static void sendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.常见交换机
/**
* 参数说明:
* 1.exchange:交换机类型
* 2.type:交换机类型
* DIRECT("direct"),定向
* FANOUT("fanout"),发送到所有绑定的队列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");参数匹配
* 3.durable:是否持久化
* 4.autoDelete:是否自动删除
* 5.internal:内部使用,一般为false
* 6.arguments:参数
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
// 6.创建队列
String queue1 = "test_direct_queue1";
String queue2 = "test_direct_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.绑定队列和交换机
/**
* 参数说明:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
// error队列绑定
channel.queueBind(queue1,exchangeName,"error");
// info error warn队列绑定
channel.queueBind(queue2,exchangeName,"info");
channel.queueBind(queue2,exchangeName,"error");
channel.queueBind(queue2,exchangeName,"warning");
// 8.发送消息到交换机
String messageInfo = "日志信息:info";
String messageError = "日志信息:error";
String messageWarn = "日志信息:warning";
// 发送不同RoutingKey类型的消息,观察exchange会怎么处理
channel.basicPublish(exchangeName,"info", null,messageInfo.getBytes());
channel.basicPublish(exchangeName,"error", null,messageError.getBytes());
channel.basicPublish(exchangeName,"warning", null,messageWarn.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
运行生产者代码,观察rabbit控制台
Exchange交换机新增名称为test_direct的交换机
点击名称进入,可查看和队列绑定关系
在观察queues栏,可以看出消息根据不同的routingkey被分发到不同队列
队列1接收RoutingKey为error的消息,只有1条
队列2接收RoutingKey为info、error、warning的消息,有三天
因为两个队列都绑定了RoutingKey为error的消息,所以error消息会出现在两个队列中
消费者
public class MessageConsumer_Routing1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息,交换机,路由key
* 3.properties:配置信息
* 4.bady:数据
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("客户端1消费:" + new String(body));
}
};
channel.basicConsume("test_direct_queue1",true,consumer);
// 6.释放资源?不需要 消费者相当于一个监听者 时刻监听队列
// 此处只是不让程序结束,等待回调函数执行
while (true){}
}
}
复制相同代码,修改监听队列名称,修改打印信息作为客户端2
启动消费者,控制台打印如下:
客户端1消费:日志信息:error
客户端2消费:日志信息:info
客户端2消费:日志信息:error
客户端2消费:日志信息:warning
总结:生产者发送的消息经过exchange处理,根据RoutingKey进行匹配,分发到不同的队列中,由该队列对应的消费者去消费
Topic通配符模式
模式说明:
- 交换机类型为Topic,通配符模式
- 例RoutingKey为Usa.news 可以匹配usa.#的队列,也可以匹配#.news的队列
- 过程:生产者将消息发送给交换机,由交换机根据队列RoutingKey设定的匹配规则进行分发到不同队列
生产者:
public class MessageProducer_Topic {
public static void sendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.常见交换机
/**
* 参数说明:
* 1.exchange:交换机类型
* 2.type:交换机类型
* DIRECT("direct"),定向
* FANOUT("fanout"),发送到所有绑定的队列
* TOPIC("topic"),通配符的方式
* HEADERS("headers");参数匹配
* 3.durable:是否持久化
* 4.autoDelete:是否自动删除
* 5.internal:内部使用,一般为false
* 6.arguments:参数
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName,BuiltinExchangeType.TOPIC,true,false,false,null);
// 6.创建队列
String queue1 = "test_topic_queue1";
String queue2 = "test_topic_queue2";
channel.queueDeclare(queue1,true,false,false,null);
channel.queueDeclare(queue2,true,false,false,null);
// 7.绑定队列和交换机
/**
* 参数说明:
* 1.queue:队列名称
* 2.exchange:交换机名称
* 3.routingKey:路由键,绑定规则
* 如果交换机的类型为fanout,routingKey设置为""
*/
// 订单相关信息、info信息存入队列1
channel.queueBind(queue1,exchangeName,"*.info.*");
channel.queueBind(queue1,exchangeName,"order.#");
// 订单错误信息、支付信息存入队列2
channel.queueBind(queue2,exchangeName,"order.error.*");
channel.queueBind(queue2,exchangeName,"pay.#");
// 8.发送消息到交换机
String message = "Topic模式消息";
// 发送不同RoutingKey类型的消息,观察exchange会怎么处理
channel.basicPublish(exchangeName,"test.info.test", null,message.getBytes());
channel.basicPublish(exchangeName,"order.error.select", null,message.getBytes());
channel.basicPublish(exchangeName,"order.error.select.success", null,message.getBytes());
channel.basicPublish(exchangeName,"pay.test", null,message.getBytes());
// 9.释放资源
channel.close();
connection.close();
}
}
启动生产者,观察rabbit控制台
Exchange新增名称为test_topic的交换机
点击名称查看队列及相关绑定的通配符
队列情况
队列1绑定通配符*.info.*,order.#,和它匹配的RoutingKey有test.info.test,order.error.select,order.error.select.success,因此它有三条消息
队列2绑定通配符order.error.*,pay.#,和它匹配的RoutingKey有order.error.select,pay.test,因此只有两个
为什么order.error.*不能匹配order.error.select.success
因为*表示一个单词,#表示多个单词,而单词界定以.为界,所以不能匹配
消费者
public class MessageConsumer_Topic1 {
public static void getMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置参数
// ip,默认为本地localhost
connectionFactory.setHost("127.0.0.1");
// 端口,默认值5672
connectionFactory.setPort(5672);
// 虚拟机,默认值/
connectionFactory.setVirtualHost("my_vhost");
// 账号,默认值guest
connectionFactory.setUsername("admin");
// 密码,默认值guest
connectionFactory.setPassword("admin");
// 3.创建连接 Connection
Connection connection = connectionFactory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.接收消息
/**
* handleDelivery()参数:
* 1.consumerTag:标识
* 2.envelope:获取一些信息,交换机,路由key
* 3.properties:配置信息
* 4.bady:数据
*/
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("客户端1消费:" + new String(body));
}
};
channel.basicConsume("test_topic_queue1",true,consumer);
// 6.释放资源?不需要 消费者相当于一个监听者 时刻监听队列
// 此处只是不让程序结束,等待回调函数执行
while (true){}
}
}
复制代码修改队列名,修改打印内容作为客户端2
运行两个消费者客户端,控制台分别打印
客户端1:
客户端1消费:Topic模式消息
客户端1消费:Topic模式消息
客户端1消费:Topic模式消息
客户端2:
客户端1消费:Topic模式消息
客户端1消费:Topic模式消息
总结:Topic模式和Routing模式很类似,不过Routing模式属于值绑定,topic属于通配符绑定,应用范围更广