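Preface
The previous posts on RabbitMQ's messaging patterns covered its basic architecture and its message acknowledgement mechanism fairly thoroughly. This post integrates RabbitMQ into Spring Boot, mainly as a reference for future work.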
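1. Project setup
Create a Spring Boot project in IDEA and add the web and RabbitMQ modules (the spring-boot-starter-web and spring-boot-starter-amqp starters).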

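2. Configuration
Add the following to application.properties. The first part is the connection configuration; the second part holds the names of the exchange, the queue, and the routing key.
# RabbitMQ connection settings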
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Names of the exchange, queue, and routing key used in this post
spring.rabbitmq.learn.exchange.name=learn:info:mq:exchange
spring.rabbitmq.learn.queue.name=learn:info:mq:queue
spring.rabbitmq.learn.routing.key.name=learn:info:mq:routing:key
Create a RabbitConfig class and register it with Spring as a @Configuration, as shown below:
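import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.core.env.Environment;

/**
 * author: liman
 * createtime: 2019/10/27
 * comment:
 */
@Configuration
public class RabbitConfig {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private Environment env;

    @Bean
    public ConnectionFactory connectionFactory() {
        String host = env.getProperty("spring.rabbitmq.host");
        int port = Integer.parseInt(env.getProperty("spring.rabbitmq.port"));
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
        connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
        connectionFactory.setVirtualHost("/");
        // Enable publisher confirms. Newer Spring AMQP versions deprecate this
        // setter in favor of setPublisherConfirmType(...).
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    // Prototype scope: every injection point gets its own RabbitTemplate,
    // which matters once per-template callbacks (e.g. a ConfirmCallback) are set.
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /**
     * Consumer-side configuration:
     * 1. choose the exchange type
     * 2. bind the queue to the exchange
     *
     * FanoutExchange: delivers each message to every bound queue; the routing key is ignored
     * DirectExchange: delivers to the queues whose binding key equals the routing key
     * TopicExchange: delivers by pattern matching on a multi-word routing key
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
    }

    /**
     * Declares the queue used in this post.
     * @return a durable queue
     */
    @Bean
    public Queue learnQueue() {
        return new Queue(env.getProperty("spring.rabbitmq.learn.queue.name"), true); // durable queue
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(learnQueue()).to(defaultExchange())
                .with(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
    }
}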
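In other words, this configuration class is where the RabbitTemplate is instantiated, along with the exchange, the queue, and the binding between them.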
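The comment in RabbitConfig distinguishes three exchange types by how they compare a binding key with a message's routing key. The sketch below models those three rules in plain Java to make the difference concrete. It is only an illustration: the class and method names are made up for this post and are not part of Spring AMQP or the RabbitMQ client, and the real matching happens inside the broker.

```java
/** Illustrative model of broker-side routing rules; not a Spring AMQP or RabbitMQ API. */
final class ExchangeRoutingSketch {

    /** Direct exchange: the binding key must equal the routing key exactly. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Fanout exchange: every bound queue receives the message; keys are ignored. */
    static boolean fanoutMatches(String bindingKey, String routingKey) {
        return true;
    }

    /** Topic exchange: dot-separated words; '*' is exactly one word, '#' is zero or more words. */
    static boolean topicMatches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern exhausted: succeed only if the key is exhausted too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing (advance the pattern) or one more word (advance the key)
            return match(p, i + 1, w, j) || (j < w.length && match(p, i, w, j + 1));
        }
        if (j == w.length) {
            return false; // pattern still expects a word but the key has none left
        }
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        String key = "learn:info:mq:routing:key";
        System.out.println(directMatches(key, key));                      // true
        System.out.println(topicMatches("learn.*.mq", "learn.info.mq"));  // true
        System.out.println(topicMatches("learn.#", "learn"));             // true
        System.out.println(topicMatches("learn.*", "learn.info.mq"));     // false
    }
}
```

Note that the routing key in this post uses colons rather than dots, so a topic exchange would treat it as a single word. That is fine here because the configuration uses a DirectExchange, which only checks for equality.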
3. Sending messages
Send a message from a test controller:
/**
 * author: liman
 * createtime: 2019/10/27
 * comment:
 */
@RestController
public class RabbitController {
    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);
    private static final String prefix = "/rabbit";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Environment env;

    @RequestMapping(value = prefix + "/learn", method = RequestMethod.GET)
    public String sendMessage(String message) {
        log.info("Message to send: {}", message);
        try {
            // Set the default exchange and routing key on the template, then send
            rabbitTemplate.setExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
            rabbitTemplate.setRoutingKey(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
            // Serialize the payload as JSON bytes and mark the message persistent
            Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                    .build();
            rabbitTemplate.convertAndSend(msg);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged
            log.error("Failed to send message", e);
        }
        return "success";
    }
}
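4. Two ways to consume messages
As I understand it, there are two ways to consume messages: annotating a method with @RabbitListener, or implementing the ChannelAwareMessageListener interface.
4.1 Consuming messages with the @RabbitListener annotation
This approach is the simpler of the two: put @RabbitListener on a method of any Spring-managed bean. As the consumer, it only needs to name the queue it is interested in.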
@Component
public class AnnotationListener {
    private static final Logger log = LoggerFactory.getLogger(AnnotationListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(queues = "${spring.rabbitmq.learn.queue.name}")
    public void consumerMessage(@Payload byte[] message) {
        try {
            // The producer wrote the body as a JSON string, so read it back the same way
            String result = objectMapper.readValue(message, String.class);
            log.info("Consumed via @RabbitListener: {}", result);
        } catch (IOException e) {
            log.error("Failed to deserialize message", e);
        }
    }
}
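4.2 Consuming messages by implementing ChannelAwareMessageListener
This approach is a little more involved and takes two steps. Start by adding a class that implements the ChannelAwareMessageListener interface: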
/**
 * author: liman
 * createtime: 2019/10/27
 * comment:
 */
@Component
public class ImplementListener implements ChannelAwareMessageListener {
    private static final Logger log = LoggerFactory.getLogger(ImplementListener.class);

    @Autowired
    private ObjectMapper objectMapper;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            String result = objectMapper.readValue(body, String.class);
            log.info("Consumed via ChannelAwareMessageListener: {}", result);
        } catch (Exception e) {
            log.error("Failed to consume message", e);
            // Reject without requeueing so a malformed message is not redelivered forever
            channel.basicReject(tag, false);
            return;
        }
        // The container uses AcknowledgeMode.MANUAL, so the message must be acked explicitly
        channel.basicAck(tag, false);
    }
}
Then register the consumer in RabbitConfig, as shown below:
    @Autowired
    private ImplementListener implementListener;

    @Bean
    public SimpleMessageListenerContainer getListenerContainer(@Qualifier("learnQueue") Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueues(queue);
        container.setMessageListener(implementListener);
        return container;
    }
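With AcknowledgeMode.MANUAL, the broker keeps every delivery marked as unacknowledged until the listener calls channel.basicAck(deliveryTag, multiple) for it. Delivery tags increase per channel, and the multiple flag acknowledges every outstanding delivery up to and including the given tag. The toy model below only imitates that bookkeeping in plain Java; the class and its methods are hypothetical and are not the RabbitMQ client API.

```java
import java.util.TreeSet;

/** Toy model of one channel's unacknowledged deliveries; not the RabbitMQ client API. */
final class ManualAckSketch {
    private final TreeSet<Long> unacked = new TreeSet<>();
    private long nextTag = 1;

    /** Simulates the broker delivering a message: the channel hands out the next tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Imitates the semantics of basicAck(deliveryTag, multiple). */
    void ack(long tag, boolean multiple) {
        if (multiple) {
            // Acknowledge every outstanding delivery up to and including this tag
            unacked.headSet(tag, true).clear();
        } else {
            unacked.remove(tag);
        }
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        ManualAckSketch channel = new ManualAckSketch();
        channel.deliver();
        channel.deliver();
        long third = channel.deliver();
        channel.ack(2, true);       // acks tags 1 and 2 in one call
        System.out.println(channel.unackedCount());
        channel.ack(third, false);  // acks only tag 3
        System.out.println(channel.unackedCount());
    }
}
```

A listener that never acknowledges in manual mode leaves its deliveries in the unacknowledged state, and the broker redelivers them once the channel or connection closes.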
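Testing
Each consumer works correctly on its own. Rather than pasting those simple results, this section looks at what happens when both consumers are enabled at the same time.
The message-sending controller was modified slightly: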
package com.learn.springbootrabbitmqblog.controller;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * author: liman
 * createtime: 2019/10/27
 * comment:
 */
@RestController
public class RabbitController {
    private static final Logger log = LoggerFactory.getLogger(RabbitController.class);
    private static final String prefix = "/rabbit";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private Environment env;

    @RequestMapping(value = prefix + "/learn", method = RequestMethod.GET)
    public String sendMessage(String message) {
        log.info("Message to send: {}", message);
        try {
            // Send 100 messages, each from its own thread
            for (int i = 0; i < 100; i++) {
                new Thread(new sendThread(rabbitTemplate, objectMapper, message + i)).start();
            }
        } catch (Exception e) {
            log.error("Failed to send message", e);
        }
        return "success";
    }

    private class sendThread implements Runnable {
        private RabbitTemplate rabbitTemplate;
        private ObjectMapper objectMapper;
        private String message;

        public sendThread(RabbitTemplate rabbitTemplate, ObjectMapper objectMapper, String message) {
            this.rabbitTemplate = rabbitTemplate;
            this.objectMapper = objectMapper;
            this.message = message;
        }

        @Override
        public void run() {
            try {
                rabbitTemplate.setExchange(env.getProperty("spring.rabbitmq.learn.exchange.name"));
                rabbitTemplate.setRoutingKey(env.getProperty("spring.rabbitmq.learn.routing.key.name"));
                Message msg = MessageBuilder.withBody(objectMapper.writeValueAsBytes(message))
                        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                        .build();
                rabbitTemplate.convertAndSend(msg);
            } catch (JsonProcessingException e) {
                log.error("Failed to serialize message", e);
            }
        }
    }
}
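The controller above starts one raw thread per message. A common alternative is to submit the sends to a bounded thread pool, which caps the number of concurrent threads and makes it easy to wait for completion. The following is a minimal sketch using only the JDK; the sender callback is a hypothetical stand-in for the rabbitTemplate.convertAndSend call, and the pool size and timeout are arbitrary choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Sketch of sending many messages through a fixed-size pool instead of one thread each. */
final class PooledSendSketch {

    /**
     * Submits `count` payloads (base + index) to a pool of `poolSize` threads.
     * Returns true if every task finished within the timeout.
     */
    static boolean sendAll(String base, int count, int poolSize, Consumer<String> sender) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < count; i++) {
            final String payload = base + i;
            pool.submit(() -> sender.accept(payload));
        }
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            return pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        // The lambda stands in for the real send; here it only counts invocations
        boolean finished = sendAll("hello", 100, 8, msg -> sent.incrementAndGet());
        System.out.println(finished + " " + sent.get());
    }
}
```

In a Spring application the pool would normally be created once and reused across requests rather than built per call as in this sketch.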
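With both consumers listening on the same queue, the messages are distributed between them in a load-balanced fashion, so each message is consumed by only one of the two listeners.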
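To make the load-balancing behaviour concrete, here is a toy simulation of round-robin dispatch between two consumers on one queue. It is an idealized model in plain Java, with hypothetical class and consumer names. On a real broker the split also depends on each consumer's prefetch setting and on how quickly it acknowledges, so the observed distribution may be less even.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Idealized round-robin dispatch of queued messages to competing consumers. */
final class RoundRobinSketch {

    /** Hands message i to consumer (i mod n) and returns what each consumer received. */
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("msg0", "msg1", "msg2", "msg3", "msg4");
        List<String> consumers = Arrays.asList("AnnotationListener", "ImplementListener");
        // Each message appears under exactly one consumer
        dispatch(messages, consumers).forEach((c, got) -> System.out.println(c + " -> " + got));
    }
}
```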
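Summary
This post only records a simple integration of RabbitMQ with Spring Boot and does not yet apply it to a realistic business scenario. A follow-up post will cover the message acknowledgement mechanism in Spring Boot.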