RabbitMQ工作队列之Routing模式(四)
上一篇讲了关于fanout发布订阅模式,允许系统推送消息至所有的订阅者(消费者),扩展一下问题,如果将生产者生产的消息进行分类,那么所有的消费者都会收到全部消息,这不是我们乐意见到的。如果我们希望可以对消费者进行分类,每个消费者能接收特定的消息,那该如何改造呢?
答案是肯定哒,标题已经说明了,本篇博文主要讲解的是RabbitMQ工作队列中的Routing路由密钥模式。
RabbitMQ工作队列中的Routing路由密钥模式三种使用方式:
- 基于原生client客户端连接使用
- 基于spring xml配置文件集成使用
- 基于spring boot集成使用
老规矩,来一张官网模型截图:

目录
用
[TOC]
来生成目录:
文章目录
- RabbitMQ工作队列之Routing模式(四)
-
-
- 目录
- @[toc]
- 1、基于原生client客户端连接使用
-
-
- 1.1、maven构建,pom文件加入依赖
- 1.2、创建连接工厂
- 1.3、创建消息生产者
- 1.4、定义info类型消费者,此消费者可以接收info、warning、error类型
- 1.5、定义warning类型消费者,只能接收warning类型消息
- 1.6、定义Error类型消费者,只能接收error类型消息
- 1.7、运行效果
- 1.8、RabbitMQ管控台
- 2、基于spring xml配置文件集成使用
-
-
- 2.1、maven工厂引入依赖包:
- 2.2、创建spring加载配置文件context.xml
- 2.3、创建rabbitMQ配置文件rabbitmq-routing.xml
- 2.4、创建生产者SendRouting
- 2.5、info消费者
- 2.6、error消费者
- 2.7、warning消费者
- 2.8、运行效果图
- 3、基于spring boot集成使用
-
-
- 3.1、maven加入依赖包
- 3.2、创建application.yml配置文件
- 3.3、创建获取静态参数类
- 3.4、创建rabbitMQ注解配置
- 3.5、创建controller层模拟生产者
- 3.6、info消费者
- 3.7、error消费者
- 3.8、warning消费者
- 3.9、运行效果
- 4、小结
-
-
- 小结
-
- 目录
- @[toc]
- 1、基于原生client客户端连接使用
-
-
- 1.1、maven构建,pom文件加入依赖
- 1.2、创建连接工厂
- 1.3、创建消息生产者
- 1.4、定义info类型消费者,此消费者可以接收info、warning、error类型
- 1.5、定义warning类型消费者,只能接收warning类型消息
- 1.6、定义Error类型消费者,只能接收error类型消息
- 1.7、运行效果
- 1.8、RabbitMQ管控台
-
- 2、基于spring xml配置文件集成使用
-
-
- 2.1、maven工厂引入依赖包:
- 2.2、创建spring加载配置文件context.xml
- 2.3、创建rabbitMQ配置文件rabbitmq-routing.xml
- 2.4、创建生产者SendRouting
- 2.5、info消费者
- 2.6、error消费者
- 2.7、warning消费者
- 2.8、运行效果图
-
- 3、基于spring boot集成使用
-
-
- 3.1、maven加入依赖包
- 3.2、创建application.yml配置文件
- 3.3、创建获取静态参数类
- 3.4、创建rabbitMQ注解配置
- 3.5、创建controller层模拟生产者
- 3.6、info消费者
- 3.7、error消费者
- 3.8、warning消费者
- 3.9、运行效果
-
- 4、小结
-
-
- 小结
-
1、基于原生client客户端连接使用
1.1、maven构建,pom文件加入依赖
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<!--引入rbiitmq的连接工具包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
1.2、创建连接工厂
package com.edu.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:01
* @description :获取连接
* @note 注意事项
*/
public class ConnectionUtils {
//获取接连
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//设置连接MQ的IP地址
factory.setHost("192.168.199.128");
//设置连接端口号
factory.setPort(5672);
//设置要接连MQ的库(域)
factory.setVirtualHost("/test_vh");
//连接帐号
factory.setUsername("root");
//连接密码
factory.setPassword("123456");
return factory.newConnection();
}
}
1.3、创建消息生产者
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息生产者
* @note 注意事项
*/
public class Sender {
//定义交换器名称
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//获取连接
Connection connection = ConnectionUtils.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
/**
*
* 声明一个交换器
*
* 文档地址:https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html#exchangeDeclare-java.lang.String-com.rabbitmq.client.BuiltinExchangeType-boolean-boolean-java.util.Map-
* 该方法定义了许多的重载,拿出完整参数的方法来讲
* AMQP.Exchange.DeclareOk exchangeDeclare(String exchange,
* BuiltinExchangeType type,
* boolean durable,
* boolean autoDelete,
* Map<String,Object> arguments)
* throws IOException
*
* @param exchange 交换器的名字
* @param type 交换器的类型:direct, topic, headers, fanout
* @param durable 是否持久化,true持久化,false不持久化
* @param autoDelete 服务器不再使用该队列时,是否自动删除,true删除,false不删除
* @param arguments 其他参数,其实是定义交换器的构造方法
*/
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Integer log_count = 3;
for (int i = 0; i < 50; i++) {
Thread.sleep(500);//模拟耗时操作,别发那么快
//自定义消息
String msg = "hello word " + i;
Integer qumo =i % log_count;//取余,模拟分发不同类型的routingKey
switch (qumo){
case 0:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
channel.basicPublish(EXCHANGE_NAME,"info",null,msg.getBytes());
System.out.println("-->info send " + msg);
break;
case 1:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
channel.basicPublish(EXCHANGE_NAME,"error",null,msg.getBytes());
System.out.println("-->error send " + msg);
break;
case 2:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
channel.basicPublish(EXCHANGE_NAME,"warning",null,msg.getBytes());
System.out.println("-->warning send " + msg);
break;
}
}
channel.close();//关闭通道
connection.close();//关闭连接
}
}
1.4、定义info类型消费者,此消费者可以接收info、warning、error类型
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消费者
* @note 注意事项
*/
public class CustomeriInfo {
//队列名称
public static String QUEUE_NAME = "test_origin_routing_queue_info";
//定义交换器名称
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//获取交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//创建消息声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交换器绑定队列
* 该方法定义了2个重载
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 队列名称
* @param exchange 交换器名称
* @param routingKey 用于绑定的路由密钥
* @param arguments 其他参数,其实是定义交换器的构造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");//绑定routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//绑定routingKey
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");//绑定routingKey
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//获取并转成String
String message = new String(body, "UTF-8");
System.out.println("-->info消费者收到消息,msg:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.5、定义warning类型消费者,只能接收warning类型消息
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消费者
* @note 注意事项
*/
public class CustomerWarning {
//队列名称
public static String QUEUE_NAME = "test_origin_routing_queue_warning";
//定义交换器名称
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//获取交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//创建消息声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交换器绑定队列
* 该方法定义了2个重载
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 队列名称
* @param exchange 交换器名称
* @param routingKey 用于绑定的路由密钥
* @param arguments 其他参数,其实是定义交换器的构造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//获取并转成String
String message = new String(body, "UTF-8");
System.out.println("-->warning消费者收到消息,msg:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.6、定义Error类型消费者,只能接收error类型消息
package com.edu.routing;
import com.edu.rabbitmq.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 22:16
* @description :消息消费者
* @note 注意事项
*/
public class CustomerError {
//队列名称
public static String QUEUE_NAME = "test_origin_routing_queue_error";
//定义交换器名称
public static String EXCHANGE_NAME = "test_origin_routing_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
//获取通道
Channel channel = connection.createChannel();
//获取交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//创建消息声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
/**
* 交换器绑定队列
* 该方法定义了2个重载
* AMQP.Queue.BindOk queueBind(String queue,
* String exchange,
* String routingKey,
* Map<String,Object> arguments)
* throws IOException
*
* @param queue 队列名称
* @param exchange 交换器名称
* @param routingKey 用于绑定的路由密钥
* @param arguments 其他参数,其实是定义交换器的构造方法
*/
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
//定义消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//获取并转成String
String message = new String(body, "UTF-8");
System.out.println("-->error消费者收到消息,msg:"+message);
}
};
//监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
1.7、运行效果
生产者生产消息
info
error
warning
1.8、RabbitMQ管控台
查看exchange绑定的queue
2、基于spring xml配置文件集成使用
2.1、maven工厂引入依赖包:
<!--引入rbiitmq的连接工具包-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.0</version>
</dependency>
<!-- 引入spring集成rabbit的包 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.7.3.RELEASE</version>
</dependency>
<!-- spring核心库 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<!-- springbean库 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
<!-- 上下文 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.7.RELEASE</version>
</dependency>
2.2、创建spring加载配置文件context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!--加载routing路由密钥模式-->
<import resource="classpath:rabbitmq-routing.xml" />
</beans>
2.3、创建rabbitMQ配置文件rabbitmq-routing.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">
<!--配置connection-factory,指定连接rabbit server参数 -->
<rabbit:connection-factory id="connectionFactory" virtual-host="/test_vh" username="root"
password="123456" host="192.168.199.128" port="5672" />
<!--MQ的管理,包括队列,交换器,声明等-->
<rabbit:admin connection-factory="connectionFactory" />
<!--定义rabbit模版,指定连接工厂以及定义exchange-->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" exchange="directExchangeName" />
<!--定义队列,自动声明(可以用于发消息和监听使用)-->
<!--定义queue 说明:durable:是否持久化 exclusive: 仅创建者可以使用的私有队列,断开后自动删除 auto_delete: 当所有消费客户端连接断开后,是否自动删除队列-->
<rabbit:queue id="test_routing_spring_xml1" name="test_routing_spring_xml1" auto-declare="true" auto-delete="true" />
<!--定义队列-->
<rabbit:queue id="test_routing_spring_xml2" name="test_routing_spring_xml2" auto-declare="true" auto-delete="true" />
<!--定义队列-->
<rabbit:queue id="test_routing_spring_xml3" name="test_routing_spring_xml3" auto-declare="true" auto-delete="true" />
<!--定义direct交换器,并且队列绑定交换器,记住生产者发送消息的时候,使用的是这个交换器的name-->
<rabbit:direct-exchange id="directExchangeName" name="test_routing_spring_exchange">
<rabbit:bindings>
<rabbit:binding queue="test_routing_spring_xml1" key="info" />
<rabbit:binding queue="test_routing_spring_xml1" key="error" />
<rabbit:binding queue="test_routing_spring_xml1" key="warning" />
<rabbit:binding queue="test_routing_spring_xml2" key="warning" />
<rabbit:binding queue="test_routing_spring_xml3" key="error" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!--定义消费者-->
<bean id="myCustomer" class="routing.CustomerInfo"/>
<bean id="myCustomer2" class="routing.CustomerWarning"/>
<bean id="myCustomer3" class="routing.CustomerError"/>
<!--队列监听 acknowledge应答方式:auto,manual,none -->
<rabbit:listener-container connection-factory="connectionFactory" >
<rabbit:listener ref="myCustomer" method="listen" queue-names="test_routing_spring_xml1" />
<rabbit:listener ref="myCustomer2" method="listen" queue-names="test_routing_spring_xml2" />
<rabbit:listener ref="myCustomer3" method="listen" queue-names="test_routing_spring_xml3" />
</rabbit:listener-container>
</beans>
2.4、创建生产者SendRouting
package com.rabbit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* @author: Alex
* @DateTime: 2018/8/22 19:40
* @Description: 生产者
* @Version: 1.0.0
**/
public class SendRouting {
//交换器名称,这里的exchange名称要和rabbitmq-routing.xml里面配置对应
private static String EXCHANGE_NAME = "test_routing_spring_exchange";
public static void main(String[] args) throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:context.xml");
//获取rabbit模版(等价于@Autowired)
RabbitTemplate bean = context.getBean(RabbitTemplate.class);
Integer log_count = 3;
for (int i = 0; i < 30; i++) {
Thread.sleep(500);//模拟耗时操作,别发那么快
//自定义消息
String msg = "hello word " + i;
Integer qumo =i % log_count;//取余,模拟分发不同类型的routingKey
switch (qumo){
case 0:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
bean.convertAndSend(EXCHANGE_NAME,"info","hello word"+i);
System.out.println("-->info send " + msg);
break;
case 1:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
bean.convertAndSend(EXCHANGE_NAME,"error","hello word"+i);
System.out.println("-->error send " + msg);
break;
case 2:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
bean.convertAndSend(EXCHANGE_NAME,"warning","hello word"+i);
System.out.println("-->warning send " + msg);
break;
}
}
Thread.sleep(10000);//休眠2秒后,关闭spring容器
context.close();
}
}
2.5、info消费者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消费者info
* @note 注意事项
*/
public class CustomerInfo {
public void listen(String foo){
System.out.println("消费者消费info,获取消息msg:"+foo);
}
}
2.6、error消费者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消费者error
* @note 注意事项
*/
public class CustomerError {
public void listen(String foo){
System.out.println("消费者消费error,获取消息msg:"+foo);
}
}
2.7、warning消费者
package routing;
/**
* @author : alex
* @version :1.0.0
* @Date : create by 2018/7/19 23:39
* @description :我的消费者warning
* @note 注意事项
*/
public class CustomerWarning {
public void listen(String foo){
System.out.println("消费者消费warning,获取消息msg:"+foo);
}
}
2.8、运行效果图
由此图我们可以看出,我们的
info消费者绑定了info,error,warning三个类型的消息
error消费者绑定error消息
warning消费者绑定warning消息
每次info都会打印消息,
error和warning只有接收属于自身的消息才会打印消息
3、基于spring boot集成使用
3.1、maven加入依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2、创建application.yml配置文件
spring:
rabbitmq:
username: root
password: 123456
host: 192.168.199.128
port: 5672
virtual-host: /test_vh
rabbitMQconfig:
queueName:
info: test_spring_boot_routing_info
error: test_spring_boot_routing_error
warning: test_spring_boot_routing_warning
exchangeName:
directName: test_spring_boot_exchange_routing
3.3、创建获取静态参数类
package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* @author: Alex
* @DateTime: 2018/8/23 11:19
* @Description: 静态参数类
* @Version: 1.0.0
**/
@Component
public class ParamUtil {
@Value("${rabbitMQconfig.queueName.info}")
public String queueNameInfo;
@Value("${rabbitMQconfig.queueName.error}")
public String queueNameError;
@Value("${rabbitMQconfig.queueName.warning}")
public String queueNameWarning;
@Value("${rabbitMQconfig.exchangeName.directName}")
public String directName;
}
3.4、创建rabbitMQ注解配置
package cn.rabbitmq.edu.rabbitmq_spring_boot.utils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: Alex
* @DateTime: 2018/8/23 10:52
* @Description: rabbitMQ配置声明
* @Version: 1.0.0
**/
@Configuration
public class RabbitMQDeclareUtil {
@Autowired
private ParamUtil paramUtil;
@Bean
Queue getQueue1(){
//定义第一个队列队列,Ctrl+鼠标左键,点击Queue可以看到定义
return new Queue(paramUtil.queueNameInfo);
}
@Bean
Queue getQueue2(){
//定义第二个队列
return new Queue(paramUtil.queueNameError);
}
@Bean
Queue getQueue3(){
//定义第三个队列
return new Queue(paramUtil.queueNameWarning);
}
@Bean
DirectExchange getExchange() {
//定义一个DirectExchange交换器
return new DirectExchange(paramUtil.directName);
}
/**
* info队列与交换器绑定,指定routingKey为info
* @param getQueue1 定义的第一个队列
* @param getExchange 定义的交换器
* @return
*/
@Bean
Binding bindingInfo(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("info");
}
/**
* info队列与交换器绑定,指定routingKey为error
* @param getQueue1 定义的第一个队列
* @param getExchange 定义的交换器
* @return
*/
@Bean
Binding bindingInfo1(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("error");
}
/**
* info队列与交换器绑定,指定routingKey为warning
* @param getQueue1 定义的第一个队列
* @param getExchange 定义的交换器
* @return
*/
@Bean
Binding bindingInfo2(Queue getQueue1, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue1).to(getExchange).with("warning");
}
/**
* error队列与交换器绑定,指定routingKey为error
* @param getQueue2
* @param getExchange
* @return
*/
@Bean
Binding bindingError(Queue getQueue2, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue2).to(getExchange).with("error");
}
/**
* warning队列与交换器绑定,指定routingKey为warning
* @param getQueue3
* @param getExchange
* @return
*/
@Bean
Binding bindingWarning(Queue getQueue3, DirectExchange getExchange) {
return BindingBuilder.bind(getQueue3).to(getExchange).with("warning");
}
}
3.5、创建controller层模拟生产者
package cn.rabbitmq.edu.rabbitmq_spring_boot.controller;
import cn.rabbitmq.edu.rabbitmq_spring_boot.utils.ParamUtil;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:36
* @Description: web访问模拟发送
* @Version: 1.0.0
**/
@RestController
public class IndexController {
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private ParamUtil paramUtil;
/**
* 使用AmqpTemplate
* @return
* @throws Exception
*/
@PostMapping("/amqpSend")
public String amqpSend() throws Exception{
Integer log_count = 3;
for (int i = 0; i < 30; i++) {
Thread.sleep(500);//模拟耗时操作,别发那么快
String msg = "hello amqp " + i;//自定义消息
Integer qumo =i % log_count;//取余,模拟分发不同类型的routingKey
switch (qumo){
case 0:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
amqpTemplate.convertAndSend(paramUtil.directName,"info",msg);//根据指定的exchange发送数据
System.out.println("-->info send " + msg);
break;
case 1:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
amqpTemplate.convertAndSend(paramUtil.directName,"error",msg);//根据指定的exchange发送数据
System.out.println("-->error send " + msg);
break;
case 2:
//发布消息,通过交换器名称进行发布消息,指定routingKey路由密钥
amqpTemplate.convertAndSend(paramUtil.directName,"warning",msg);//根据指定的exchange发送数据
System.out.println("-->warning send " + msg);
break;
}
}
return "success";
}
}
3.6、info消费者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消费者1
* @Version: 1.0.0
**/
@Component
//监听的队列
@RabbitListener(queues = "test_spring_boot_routing_info")
public class CustomerMsg {
/**
* 进行接收处理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消费者info,接收时间:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手动确认
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.7、error消费者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消费者2
* @Version: 1.0.0
**/
@Component
//监听的队列
@RabbitListener(queues = "test_spring_boot_routing_error")
public class CustomerMsg2 {
/**
* 进行接收处理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消费者error,接收时间:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手动确认
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.8、warning消费者
package cn.rabbitmq.edu.rabbitmq_spring_boot.customer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: Alex
* @DateTime: 2018/8/17 16:35
* @Description: 模拟消费者2
* @Version: 1.0.0
**/
@Component
//监听的队列
@RabbitListener(queues = "test_spring_boot_routing_warning")
public class CustomerMsg3 {
/**
* 进行接收处理
* @param string
*/
@RabbitHandler
public void onMessage(String string,Channel channel, Message message) throws IOException, InterruptedException {
System.out.println("消费者warning,接收时间:"+System.currentTimeMillis()+",收到消息,消息: " + string);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//手动确认
//丢弃这条消息
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
}
}
3.9、运行效果
4、小结
小结
通过本章的学习,学到了routing模式,也就是exchange为direct类型的定义,routing模式为路由密钥模式,通过给定的routingKey进行与队列的匹配,匹配绑定后,发送消息就到达指定的队列啦。
回头再看上一章,上一章开始接触exchange,目前学习了2类exchange的类型,fanout和direct模式。
fanout为扇形广播消息
direct为匹配路由密钥广播消息