1.Message Queue(MQ)
消息队列Message Queue
是一种跨进程的通信机制,用于在系统之间进行传递消息。
MQ作为消息中间件,可以进行异步处理请求,从而减少请求响应时间和解耦。
2.消息队列的使用场景
-
应用解耦:
多应用间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败;
系统间消息传递通过MQ进行解耦,“消息队列”是在消息的传输过程中保存消息的容器
RabbitMq简介及使用 -
异步处理:
应用间并发处理消息,相比串行处理,减少处理时间;
比如用户为了使用某个应用,进行注册,系统需要发送注册邮件,验证短信
RabbitMq简介及使用 -
限流削峰:
广泛应用于秒杀或抢购活动中,避免流量过大导致处理请求阻塞的情况
RabbitMq简介及使用
3.RabbitMQ简介
RabbitMQ是支持多种消息协议,易于部署和使用的开源消息代理服务器,用于在分布式系统中存储转发消息
由以高性能、健壮以及可伸缩性出名的Erlang语言编写;提供了成熟的高并发,高可用的解决方案
- 可以根据实际业务情况动态地扩展集群节点
- 在集群中的机器上设置镜像,使得在部分节点出现问题的情况下仍然可用
- 支持多种客户端语言,如:Python、Ruby、.NET、Java等
- RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
4.Window环境下安装与使用RabbitMQ
-
安装Erlang程序运行环境
下载地址http://www.erlang.org/downloads
-
安装RabbitMQ服务器
下载地址http://www.rabbitmq.com/
启动RabbitMQ服务
-
激活RabbitMQ管理控制台
cd sbin
rabbitmq-plugins.bat enable rabbitmq_management
-
通过浏览器进行访问
http://localhost:15672/
用户名:guest 密码:guest
5.Linux环境下安装
-
下载Erlang运行环境RPM包
https://www.erlang-solutions.com/resources/download.html
-
下载RabbitMQ服务器安装包
http://www.rabbitmq.com/install-rpm.html#downloads
-
安装rpm包
rpm -ivh --nodeps esl-erlang_21.2.6-1_centos_7_amd64.rpm
rpm -ivh --nodeps rabbitmq-server-3.7.13-1.el7.noarch.rpm
-
启用服务
rabbitmq-server
-
启用控制台
rabbitmq-plugins enable rabbitmq_management
6.RabbitMQ相关概念
-
Producer
生产者,即消息的提供者
-
Consumer
消费者,即消息的使用者
-
Message
消息,即进程之间进行通信的数据
-
Queue
队列,即消息存放的容器,消息以先进先出的方式进行存储
-
Vhost
虚拟主机,用于存储消息队列
7. Java 客户端访问RabbitMQ
- maven工程的pom文件中添加依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
- 创建消息生产者
- 创建消息消费者
8.RabbitMQ消息的状态
-
Ready
消息已经被送到队列,等待被消费
-
Unacked
消息已经被消费者认领,但是还没有被确认“已经被消费”
-
Unacked状态下消费者断开连接则消息回到“Ready”
没有确认,但是客户端也没有断开连接则一直处于“Unacked”
-
Finished
调用basicAsk()方法后,表示消息已经被消费,从队列中移除
9. RabbitMQ的工作模式
-
Simple简单模式
一个生产者,一个消费者
RabbitMq简介及使用 - Work工作模式
一个生产者,多个消费者,每个消费者获取到的消息唯一
在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者会接收到不同
的消息,并且可以根据消息的处理速度来接受不同数量的消息,进而让消费者程序发挥最大
的性能
适合在集群的环境中做异步处理,最大发挥每一台服务器的性能
- publish/subscribe发布订阅模式:
一个生产者发送的消息会被多个消费者获取
发布订阅模式中生产者不在直接与队列进行绑定,而是将数据发送给交换机(Exchan{ge)
交换机Exchage负责将数据按照某种规则送入与之绑定的队列,进而供消费者使用
该模式下交换机的类型为扇形交换中心(Fanout exchange)
- Routing 路由模式
发布订阅模式是交换机无条件将所有消息分发给绑定的队列;而路由模式则是根据routing
key有条件的将数据筛选后分发给绑定的队列。
发送消息到交换机时要指定路由key ,消费者将队列绑定到交换机时也需要指定路由key
路由模式下交换机的类型是直接交换中心 (Direct exchange)
- Topic主题模式
主题模式是在Routing模式匹配基础上,提供了对RouteKey模糊匹配的功能,可以简化编
程,主题模式下交换机的类型为主题交换中心(Topic exchange)
主题模式下,模糊匹配表达式规则为
*匹配单个关键字
#匹配所有关键字
10.RabbitMQ消息确认机制
- RabbitMQ提供了监听器来接收消息投递的状态
- 消息确认涉及两种状态:
Confirm代表生产者将消息送达了Broker时产生的状态,后续会出现两种情况:
ack代表Broker已经将数据接收
nack代表Broker未接收到消息
Return代表消息被正常接收,但Broker没有对应的队列进行投递,消息被退回给生产者的状
态
上述状态表示生产者与Broker之间的消息投递情况,与消费者是否接收/确认消息无关
11.SpringBoot整合RabbitMQ
11.1生产者端
- 导入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置文件
spring:
rabbitmq:
username: root
password: root
host: 192.168.8.105
port: 5672
virtual-host: /myvhost
publisher-returns: true #开启消息返回机制
publisher-confirm-type: correlated #消息确认
template:
mandatory: true #将消息退回给生产者
- 生产者端,配置MQ
@Configuration
public class RabbitMQConfig {
//开启确认、返回
RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息的ID:"+correlationData.getId());
if(!b){
System.out.println("消息拒收的原因:"+s);
}
}
};
RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("==================");
System.out.println("返回的错误编码:"+i); System.out.println("返回的错误描述:"+s);
System.out.println("交换机:"+s1);
System.out.println("路由key:"+s2);
System.out.println("消息主体:"+new String(message.getBody()));
System.out.println("==================");
}
};
//配置交换机
@Bean
public TopicExchange orderExchange(){
return new TopicExchange("order_exchange");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory){
RabbitTemplate template = new RabbitTemplate(factory);
template.setMessageConverter(messageConverter());
template.setMandatory(true);//强制退回给生成者
template.setConfirmCallback(confirmCallback);
template.setReturnCallback(returnCallback);
return template;
}
}
- 注入RabbitTemplate
@Component
public class OrderProducer {
@Autowired
private RabbitTemplate template;
public void sendOrder(String exchange, String routingKey, OrderDTO orderDTO){
CorrelationData data = new CorrelationData(orderDTO.getOrderSn());
//参数data:是发送消息的附加数据,用来自定以消息的ID
template.convertAndSend(exchange,routingKey,orderDTO,data); }
11.2消费者端
- 添加依赖
- 配置文件
spring:
rabbitmq:
username: root
password: root
virtual-host: /myvhost
port: 5672
host: 192.168.8.105 l
istener:
simple:
acknowledge-mode: manual #手动签收
- 配置MQ
@Configuration
public class RabbitMQConfig implements RabbitListenerConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
}
@Bean
public MessageHandlerMethodFactory messageHandlerMethodFactory(){
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setMessageConverter(mappingJackson2MessageConverter());
return factory;
}
//消息反序列化转换器
@Bean
public MappingJackson2MessageConverter mappingJackson2MessageConverter(){
return new MappingJackson2MessageConverter();
}
}
- 绑定队列到交换机,监听队列处理消息
@Component
public class OrderConsumer {
@Bean
public Queue orderQueue(){
return new Queue("order_queue",true,false,false);
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("order_exchange");
}//绑定对列到交换机
@Bean
public Binding binding(){
return BindingBuilder.bind(orderQueue()).to(topicExchange()).with("order.#");
}
@RabbitListener(queues = "order_queue")
@RabbitHandler
public void processOrder(@Payload OrderDTO orderDTO, Channel channel,@Headers Map header){
System.out.println("订单消费者处理订单消息:"+orderDTO.getOrderSn()+"-- ->"+orderDTO.getCreateTime());
Long tag = (Long)header.get(AmqpHeaders.DELIVERY_TAG);
try {
channel.basicAck(tag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}