源码
- 引入开发包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-integrate</artifactId>
<groupId>springboot-integrate</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>springboot-rabbitmq</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.integrate.rabbitmq.RabbitmqApplication</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
- 将 rabbitmq 配置信息写入 application.yml 文件,springBoot会创建一个 RabbitTemplate 实例
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host: 52.83.239.30
port: 5672
username: root
password: yunduan2019
# 开启发送确认
publisher-confirms: true
# 开启发送失败退回
publisher-returns: true
# 开启ACK
listener:
direct:
acknowledge-mode: manual
simple:
acknowledge-mode: manual
template:
mandatory: true
server:
port: 10002
- 创建几个队列和交换机,并将队列绑定至交换机
package com.integrate.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @author 刘志强
* @date 2020/8/18 14:07
*/
@Configuration
public class RabbitmqConfig {
/**
* 创建一个队列
* @return
*/
@Bean(name="queue")
public Queue queue() {
return new Queue("queue");
}
@Bean(name="memberQueue")
public Queue memberQueue() {
return new Queue("memberQueue");
}
/**
* 创建路由交换机 根据路由匹配转发消息给队列
* @return
*/
@Bean(name = "exchange")
public TopicExchange exchange() {
return new TopicExchange("exchange");
}
/**
* 配置交换机(广播) 转发消息给旗下所有队列
* @return
*/
@Bean(name = "fanoutExchange")
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
/**
* 将队列进绑定至路由交换机并设置路由键
* 交换机会将消息传递给 满足路由键的队列
* @param queue
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeQueue(@Qualifier("queue") Queue queue, @Qualifier("exchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("exchange.queue");
}
@Bean
Binding bindingExchangeMemberQueue(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("exchange") TopicExchange exchange) {
// *表示一个词,#表示零个或多个词
return BindingBuilder.bind(memberQueue).to(exchange).with("exchange.*");
}
/**
* 将队列绑定至广播交换机
* 因为不绑定路由键 所以交换机会把消息传递给被绑定的所由队列 广播交换机无法设置路由键。因为消息会发给旗下的所有队列
* @param queue
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingFanoutExchangeQueueTwo(@Qualifier("queue") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
Binding bindingFanoutExchangeMemberQueueTwo(@Qualifier("memberQueue") Queue memberQueue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(memberQueue).to(fanoutExchange);
}
}
- 创建订阅类,用于订阅队列
package com.integrate.rabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
* channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
* ack返回false,并重新回到队列,api里面解释得很清楚
* channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
* 拒绝消息
* channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
* 丢弃这条消息
* channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
* @author 刘志强
* @date 2020/11/13 13:45
*/
@Component
@Slf4j
public class RabbitConsumer {
/**
* 订阅queue队列 消费消息
* @param object
*/
@RabbitListener(queues="queue")
public void consumerQueue(Channel channel, Object object, Message message) throws IOException {
log.info("consumerQueue 消费来自queue队列消息,消息内容为" + object);
// 告诉Rabbit已消费,从队列中删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
/**
* 订阅queue队列 不消费消息,并将消息返回队列
* @param object
*/
@RabbitListener(queues="queue")
public void noConsumerQueue(Channel channel, Object object, Message message) throws IOException {
log.info("noConsumerQueue 消费来自queue队列消息,消息内容为" + object);
// 不消费并返回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
/**
* 订阅memberQueue队列
* @param object
*/
@RabbitListener(queues="memberQueue")
public void consumerMemberQueue(Channel channel, Object object, Message message) throws IOException {
log.info("consumerMemberQueue 消费memberQueue队列消息,消息内容为" + object);
// 告诉Rabbit已消费,从队列中删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
- 创建生产类 用于发送消息,并设置发送成功回调
package com.integrate.rabbitmq.porducer;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author 刘志强
* @date 2020/8/18 14:47
*/
@Component
@Slf4j
public class AckSender implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 指定队列发送
* @param queue
* @param content
* @return
*/
public CorrelationData convertAndSend(String queue, Object content) {
//设置回调对象
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(queue,content,correlationData);
return correlationData;
}
/**
* 指定交换机 和 路由建 发送
* @param exchange
* @param routingKey
* @param content
* @return
*/
public CorrelationData convertAndSend(String exchange, String routingKey, Object content) {
//设置回调对象
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchange,routingKey, content,correlationData);
return correlationData;
}
/**
* 消息回调确认方法
* 如果消息没有到exchange,则confirm回调,ack=false
* 如果消息到达exchange,则confirm回调,ack=true
* @param
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String s) {
log.info("confirm--message:回调消息ID为: " + correlationData.getId());
if (isSendSuccess) {
log.info("消息进入交换机成功");
} else {
log.info("消息进入交换机失败====" + s);
}
}
/**
* exchange到queue成功,则不回调return
* exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
StringBuilder str = new StringBuilder();
str.append("return--message:").append(message.getBody())
.append(",replyCode:").append(replyCode).append(",replyText:").append(replyText).append(",exchange:").append(exchange)
.append(",routingKey:").append(routingKey);
log.info(String.valueOf(str));
}
}
- 测试
@Autowired
private AckSender ackSender;
ackSender.convertAndSend("queue", msg);
ackSender.convertAndSend("exchange", "exchange.queue", msg);
延迟消息(实现定时需求)
- 基于TTL(消息存活时间) 和 x-dead-letter-exchange(死信)
- 意思就是 消息在死亡后,会将此消息发送至死信交换机。死信交换机在发送至(x-dead-letter-routing-key)死信路由的下的死信队列(死信交换机和死信队列也是普通的交换机和队列)
- 开始操作
- 创建 延迟队列 并绑定死信交换机 及 死信路由
/**
* 创建一个队列作为延迟队列
* @return
*/
@Bean(name="delayQueue")
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 将队列绑定至交换机
args.put("x-dead-letter-exchange", "deathExchange");
// x-dead-letter-routing-key 当消息死亡时,转发给delayExchange交换机的路由
args.put("x-dead-letter-routing-key", "death-route");
return new Queue("delayQueue",true, false, false,args);
}
- 创建一个交换机作为 死信交换机,延迟队列里死亡的消息将像此交换机发送
/**
* 创建一个交换机 用于绑定死信队列
* @return
*/
@Bean(name = "deathExchange")
public TopicExchange deathExchange() {
return new TopicExchange("deathExchange");
}
- 创建一个队列 作为死信队列。延迟队列里死亡的消息将通过 交换机发送至此队列
/**
* 创建一个队列作为死信队列
* @return
*/
@Bean(name="deathQueue")
public Queue deathQueue() {
return new Queue("deathQueue");
}
- 将死信队列绑定至死信交换机
/**
* 将死信列绑定至交换机 并设置路由健
* @param queue
* @param exchange
* @return
*/
@Bean
Binding deathExchangeDeathQueue(@Qualifier("deathQueue") Queue queue, @Qualifier("deathExchange") TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("death-route");
}
- 创建 发送消息 配置失效时间
- 自定义消息处理器追加 消息信息 MessagePostProcessor
package com.integrate.rabbitmq.porducer;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
/**
* @author 刘志强
* @date 2020/11/24 16:40
*/
public class MyMessagePostProcessor implements MessagePostProcessor {
private final Long ttl;
public MyMessagePostProcessor(final Long ttl) {
this.ttl = ttl;
}
@Override
public Message postProcessMessage(final Message message) throws AmqpException {
// 设置消息失效时间
message.getMessageProperties().setExpiration(ttl.toString());
return message;
}
}
- 创建发送定时过期消息工具方法
public CorrelationData convertAndSendDelay(String queue, Object content, Long time) {
//设置回调对象
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
//构建回调返回的数据
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(time);
rabbitTemplate.convertAndSend(queue, content,messagePostProcessor,correlationData);
return correlationData;
}
- 订阅死信队列
/**
* 订阅死信队列,当延迟队列的消息死亡时。消息会进入死信队列
* @param channel
* @param object
* @param message
* @throws IOException
*/
@RabbitListener(queues="deathQueue")
public void deathQueue(Channel channel, Object object, Message message) throws IOException {
log.info("deathQueue 消费deathQueue队列消息,消息内容为" + object);
// 告诉Rabbit已消费,从队列中删除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
- 测试
/**
* 像队列发送消息并设置 TTL(过期事件)
* @param msg
* @return
*/
@GetMapping("convertAndSendDelay")
public String convertAndSendDelay(String msg,Long ttl) {
ackSender.convertAndSendDelay("delayQueue",msg,ttl);
return "已发送";
}