hello你好,我是辰兮,很高興你能來閱讀,本篇繼續分享消息隊列的實踐案例,分享給初學者,大家一起進步!
文章目錄
-
-
- 一、文章序言
- 二、代碼詳解
-
一、文章序言
整理幾篇消息隊列文章的初衷就是初學時網上很多案例很零碎,代碼也是,為了更好的便于初學者學習就将一個完整的案例整理下來,代碼完全真實可用,請自行實踐!
消息隊列
關于初次學習消息隊列我們一定要了解它的應用場景,為什麼使用?以及如何去使用?整理本篇文章為了分享給初學者給他提供一個詳細案例,如下代碼可以直接實踐!
消息隊列最核心的三個點:解耦、異步、削峰。
參考文章:消息隊列作用(解耦、異步、削峰)圖詳解
消息隊列也設計到生産者,消費者原理可以簡單的了解一下
參考文章:生産者消費者問題-代碼詳解(Java多線程)
二、代碼詳解
生産者的相關代碼
/**
* 編寫消息的生産者
*/
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//由于rabbitTemplate的scope屬性設定為ConfigurableBeanFactory.SCOPE_PROTOTYPE,是以不能自動注入
private RabbitTemplate rabbitTemplate;
/**
* 構造方法注入rabbitTemplate
*/
@Autowired
public MsgProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果為單例的話,那回調就是最後設定的内容
}
/**
* 發送消息方法一個交換機配一個路由配一個隊列
* @param content
*/
public void sendMsg(String content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A對應的隊列當中去,對應的是隊列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_A, RabbitConfig.ROUTINGKEY_A, content, correlationId);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_B, RabbitConfig.ROUTINGKEY_B, content, correlationId);
}
/**
* 廣播模式
* @param content
*/
public void sendAll(String content) {
rabbitTemplate.convertAndSend("fanoutExchange","", content);
}
/**
* 回調
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
//logger.info(" 回調id:" + correlationData);
if (ack) {
logger.info("消息成功消費");
} else {
logger.info("消息消費失敗:" + cause);
}
}
}
關于Controller層的相關代碼
@RestController
public class SendController {
@Autowired
private MsgProducer msgProducer;
@RequestMapping(value = "/send",method = RequestMethod.GET)
public void send(int length){
for (int i=1;i<=length;i++){
msgProducer.sendMsg("這是我發送的第"+i+"個資訊");
}
}
@RequestMapping(value = "/sendAll",method = RequestMethod.GET)
public void sendAll(int length){
for (int i=1;i<=length;i++){
msgProducer.sendAll("這是我發送的第"+i+"個資訊");
}
}
}
消費者的相關代碼
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver_one {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("消費者one接收處理隊列A當中的消息: " + content);
}
}
這兩個分别監聽隊列A和隊列B
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_B)
public class MsgReceiver_two {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String content) {
logger.info("消費者two接收處理隊列A當中的消息: " + content);
}
}
測試:在浏覽器輸入通路第一個路徑輸入相關參數
這裡第一個路徑是生産者分别發送給隊列A和B
看到控制台列印相關資訊
測試:在浏覽器輸入通路第二個路徑和輸入相關參數
這裡第二個路徑是以廣播的形式釋出都可以消費
看到控制台列印相關資訊
http://localhost:15672/#/queues
項目配置檔案的相關資訊
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
public static final String FANOUT_EXCHANGE="fanoutExchange";
//交換機
public static final String EXCHANGE_A = "my-mq-exchange_A";
public static final String EXCHANGE_B = "my-mq-exchange_B";
public static final String EXCHANGE_C = "my-mq-exchange_C";
//隊列
public static final String QUEUE_A = "QUEUE_A";
public static final String QUEUE_B = "QUEUE_B";
public static final String QUEUE_C = "QUEUE_C";
//路由關鍵字 key
public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
//connectionFactory.setVirtualHost("/test");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必須是prototype類型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/**
* 針對消費者配置
* 1. 設定交換機類型
* 2. 将隊列綁定到交換機
FanoutExchange: 将消息分發到所有的綁定隊列,無routingkey的概念
HeadersExchange :通過添加屬性key-value比對
DirectExchange:按照routingkey分發到指定隊列
TopicExchange:多關鍵字比對
*/
@Bean
public DirectExchange defaultExchangeA() {
return new DirectExchange(EXCHANGE_A);
}
@Bean
public DirectExchange defaultExchangeB() {
return new DirectExchange(EXCHANGE_B);
}
@Bean
public DirectExchange directExchangeC(){
return new DirectExchange(EXCHANGE_C);
}
/**
* 擷取隊列A
* @return
*/
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //隊列持久
}
@Bean
public Queue queueB() {
return new Queue(QUEUE_B, true); //隊列持久
}
@Bean
public Queue queueC() {
return new Queue(QUEUE_C, true); //隊列持久
}
/**
* 隊列綁定交換機
* @return
*/
@Bean
public Binding bindingA() {
return BindingBuilder.bind(queueA()).to(defaultExchangeA()).with(RabbitConfig.ROUTINGKEY_A);
}
@Bean
public Binding bindingB(){
return BindingBuilder.bind(queueB()).to(defaultExchangeB()).with(RabbitConfig.ROUTINGKEY_B);
}
@Bean
public Binding bindingC(){
return BindingBuilder.bind(queueC()).to(directExchangeC()).with(RabbitConfig.ROUTINGKEY_C);
}
//配置fanout_exchange
//fanout隻能支援統一廣播
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(RabbitConfig.FANOUT_EXCHANGE);
}
//把所有的隊列都綁定到這個交換機上去
@Bean
Binding bindingExchangeA(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB()).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueC()).to(fanoutExchange);
}
}
小結:
1.首先建立一個配置類 然後 建立隊列
2.建立生産者 生産者要用消息隊列的對象,然後傳遞你想傳遞的消息
3.建立控制層 填好路徑 ,建立生産者對象,然後發送資訊
4.消費者,填寫好消費隊列的名稱,監聽準備開始消費
The best investment is to invest in yourself.
2020.10.07 希望你們奔赴在自己的熱愛裡!