RabbitMQ多種消息模型實戰
前面我們學習了RabbitMQ的核心基礎元件,了解了基本消息模型由隊列、交換機、路由構成。而在RabbitMQ的核心元件體系中,主要有4種消息模型:
基于HeadersExchange、DirectExchange、FanoutExchange、TopicExchange的消息模型;在實際生産環境中應用最廣泛的莫過于後3中消息模型。
本篇文章主要介紹DirectExchange消息模型。
DirectExchange消息模型實戰
DirectExchange,顧名思義也是一種交換機,具有直連傳輸消息的作用,即當消息進入交換機這個中轉站時,交換機會檢查哪個路由和自己綁定在一起,并根據生産者發送消息指定的路由進行比對,如果能找到對應的綁定模型,則将消息直接路由傳輸到指定的隊列,最終由隊列對應的消費者進行監聽消費。
此模型在RabbitMQ多種消費模型中可以說是比較正規的了,因為它需要嚴格意義上的綁定,即需要且必須指定特定的交換機和路由,并綁定到指定的隊列中。這種嚴格意義的要求使得該消息模型在生産環境中具有很廣泛的運用。如圖為該消息模型的結構圖:

下面結合業務場景說明該消息模型的使用。
業務場景:将實體對象資訊當做消息,并發送到基于DirectExchange構成的消息模型中,根據綁定的路由,将消息路由至對應綁定的隊列中,最終由對應的消費者進行監聽消費處理。
1.在自定義注入配置類RabbitmqConfig中建立交換機、多條隊列及其綁定
package com.debug.middleware.server.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
* @className:
* @PackageName: com.debug.middleware.server.config
* @author: youjp
* @create: 2020-04-06 16:39
* @description: TODO RabbitMQ自定義注入配置Bean相關元件
* @Version: 1.0
*/
@Configuration
public class RabbitmqConfig {
//定義日志
private static final Logger logger=LoggerFactory.getLogger(RabbitmqConfig.class);
//自動裝配RabbitMQ的連結工廠執行個體
@Autowired
private CachingConnectionFactory connectionFactory;
//自動裝配消息監聽器所在的容器工廠配置類執行個體
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 下面為單一消費者執行個體的配置
* @return
*/
@Bean("singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
//定義消息監聽器所在的容器工廠
SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory();
//設定容器工廠所用的執行個體
factory.setConnectionFactory(connectionFactory);
//設定消息在傳輸中的格式,在這裡采用json格式進行傳輸
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//設定并發消費者執行個體的初始數量。在這裡為1個
factory.setConcurrentConsumers(1);
//設定并發消費者執行個體的最大數量。在這裡為1個
factory.setMaxConcurrentConsumers(1);
//設定并發消費者執行個體中每個執行個體拉取到的消息數量-在這裡為1個
factory.setPrefetchCount(1);
return factory;
}
/**
* 多個消費者:主要針對高并發業務場景的配置
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(10);
factory.setMaxConcurrentConsumers(15);
factory.setPrefetchCount(10);
return factory;
}
/**
* RabbitMQ發送消息的操作元件執行個體
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//設定發現消息後進行确認
connectionFactory.setPublisherConfirms(true);
//設定發現消息後傳回确認資訊
connectionFactory.setPublisherReturns(true);
//構造發送消息元件的執行個體對象
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
//發送消息後,如果發送成功,則輸出"消息發送成功"的回報資訊
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息發送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
//發送消息後,如果發送失敗,則輸出"消息發送失敗-消息丢失"的回報資訊
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
//定義讀取配置檔案的環境變量執行個體
@Autowired
private Environment env;
/** 建立DirectExchange消息模型**/
//建立隊列1
@Bean("directQueueOne")
public Queue directQueueOne(){
return new Queue(env.getProperty("mq.direct.queue.one.name"),true);
}
//隊列2
@Bean("directQueueTwo")
public Queue directQueueTwo(){
return new Queue(env.getProperty("mq.direct.queue.two.name"),true);
}
//建立直連交換機
@Bean
public DirectExchange directExchange(){
return new DirectExchange(env.getProperty("mq.direct.exchange.name"));
}
//建立綁定1
@Bean
public Binding directBindingOne(){
return BindingBuilder.bind(directQueueOne()).to(directExchange()).with(env.getProperty("mq.direct.routing.key.one.name"));
}
//建立綁定2
@Bean
public Binding directBindingTwo(){
return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with(env.getProperty("mq.direct.routing.key.two.name"));
}
}
application.yml檔案配置交換機、隊列名稱
mq:
env: local
#定義直連式消息模型-directExchange: 建立隊列1,2, 路由1,2 交換機
direct:
queue:
one:
name: ${mq.env}.middleware.mq.direct.one.queue
two:
name: ${mq.env}.middleware.mq.direct.two.queue
exchange:
name: ${mq.env}.middleware.mq.direct.exchange
routing:
key:
one:
name: ${mq.env}.middleware.mq.direct.routing.key.one.name
two:
name: ${mq.env}.middleware.mq.direct.routing.key.two.name
2.建立對象實體資訊EventInfo
package com.debug.middleware.server.rabbitmq.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.entity
* @author: youjp
* @create: 2020-04-08 20:21
* @description: TDOO 實體對象資訊
* @Version: 1.0
*/
@Data
@ToString
public class EventInfo implements Serializable {
private Integer id; //id辨別
private String module; //子產品
private String name; //名稱
private String desc;// 描述
public EventInfo(){
}
public EventInfo(Integer id, String module, String name, String desc) {
this.id = id;
this.module = module;
this.name = name;
this.desc = desc;
}
}
3.建立對象實體生産者,這裡我們建立了兩個路由、隊列的綁定。
package com.debug.middleware.server.rabbitmq.publisher;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.publisher
* @author: youjp
* @create: 2020-04-08 20:26
* @description: TODO 消息生産者
* @Version: 1.0
*/
@Component
public class ModelPublisher {
private Logger logger = LoggerFactory.getLogger(ModelPublisher.class);
@Autowired
private RabbitTemplate rabbitTemplate;
//json序列化和反序列化元件
@Autowired
private ObjectMapper objectMapper;
//定義讀取配置檔案的環境變量執行個體
@Autowired
private Environment env;
/**
* 使用DirectExchange消息模型:發送消息-one
* @param eventInfo
*/
public void sendMsgOneByDirectExchange(EventInfo eventInfo) {
if (eventInfo != null) {
//定義消息傳輸格式為json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//定義交換機為直連式交換機
rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
//定義路由1
rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.one.name"));
try {
//建立消息
Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();
logger.info("消息模型DirectExchange-one-生産者-發送消息:{}", eventInfo);
//發送消息
rabbitTemplate.convertAndSend(message);
} catch (JsonProcessingException e) {
logger.error("消息模型DirectExchange-one-生産者-發送消息發送異常:{}", eventInfo, e.fillInStackTrace());
e.printStackTrace();
}
}
}
/**
* 使用DirectExchange消息模型:發送消息-two
* @param eventInfo
*/
public void sendMsgTwoByDirectExchange(EventInfo eventInfo) {
if (eventInfo != null) {
//定義消息傳輸格式為json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//定義交換機為直連式交換機
rabbitTemplate.setExchange(env.getProperty("mq.direct.exchange.name"));
//定義路由1
rabbitTemplate.setRoutingKey(env.getProperty("mq.direct.routing.key.two.name"));
try {
//建立消息
Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(eventInfo)).build();
logger.info("消息模型DirectExchange-two-生産者-發送消息:{}", eventInfo);
//發送消息
rabbitTemplate.convertAndSend(message);
} catch (JsonProcessingException e) {
logger.error("消息模型DirectExchange-two-生産者-發送消息發送異常:{}", eventInfo, e.fillInStackTrace());
e.printStackTrace();
}
}
}
}
4.開發用于監聽消費消息的消費者方法。由于我們建立了兩個路由、隊列及其綁定,因為需要開發兩個消費者方法,用于監聽不同隊列中的消息。
package com.debug.middleware.server.rabbitmq.consumer;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @ClassName:
* @PackageName: com.debug.middleware.server.rabbitmq.consumer
* @author: youjp
* @create: 2020-04-08 20:41
* @description: TODO 消息消費者
* @Version: 1.0
*/
@Component
public class ModelConsumer {
private Logger logger=LoggerFactory.getLogger(ModelPublisher.class);
//json序列化和反序列化元件
@Autowired
private ObjectMapper objectMapper;
/** 使用DirectExchange消息模型的消費案例**/
/**
* @Param [bytes]
* @return void 使用DirectExchange消息模型的消費方法
* @Author youjp
* @Description //TODO 監聽并消費隊列中消息:directExchange-one
* @throw
**/
@RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.direct.queue.one.name}")
public void consumerDirectMsgOne(@Payload byte[] bytes){
try {
EventInfo eventInfo= objectMapper.readValue(bytes,EventInfo.class);
logger.info("消息模型-directExchange-one-消費者-監聽到消息:{}",eventInfo);
} catch (IOException e) {
logger.error("消息模型-directExchange-one-消費者-監聽到消息異常:{}",e.fillInStackTrace());
e.printStackTrace();
}
}
/**
* @Param [bytes]
* @return void 使用DirectExchange消息模型的消費方法
* @Author youjp
* @Description //TODO 監聽并消費隊列中消息:directExchange-two
* @throw
**/
@RabbitListener(containerFactory = "singleListenerContainer",queues = "${mq.direct.queue.two.name}")
public void consumerDirectMsgTwo(@Payload byte[] bytes){
try {
EventInfo eventInfo= objectMapper.readValue(bytes,EventInfo.class);
logger.info("消息模型-directExchange-two-消費者-監聽到消息:{}",eventInfo);
} catch (IOException e) {
logger.error("消息模型-directExchange-two-消費者-監聽到消息異常:{}",e.fillInStackTrace());
e.printStackTrace();
}
}
}
5.編寫單元測試,觸發生産者生産消息
package com.debug.middleware.server;
import com.debug.middleware.server.entity.Student;
import com.debug.middleware.server.rabbitmq.entity.EventInfo;
import com.debug.middleware.server.rabbitmq.publisher.ModelPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @className:
* @PackageName: com.debug.middleware.server
* @author: youjp
* @create: 2020-04-06 20:58
* @description: TODO rabbitMQ 的java單元測試
* @Version: 1.0
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RabbitmqBasicTest {
//定義日志
private static final Logger logger = LoggerFactory.getLogger(RabbitmqBasicTest.class);
//定義Json序列化和反序列化執行個體
@Autowired
private ObjectMapper objectMapper;
//定義fanoutExchange消息模型中的發生消息的生産者
@Autowired
private ModelPublisher modelPublisher;
//測試直連式消息模型
@Test
public void testDirect(){
//構造第一個實體對象
EventInfo eventInfoOne =new EventInfo(1,"使用DirectExchange模型","測試Direct子產品1","基于DirectExchange消息模型-1");
//第一個生産者發送消息
modelPublisher.sendMsgOneByDirectExchange(eventInfoOne);
//構造第二個實體對象
EventInfo eventInfoTwo =new EventInfo(2,"使用DirectExchange模型","測試Direct子產品2","基于DirectExchange消息模型-2");
//第二個生産者發送消息
modelPublisher.sendMsgTwoByDirectExchange(eventInfoTwo);
}
}
點選運作測試,控制台檢視測試結果:
打開浏覽器,檢視RabbitMQ用戶端控制台。可檢視相應的隊列和交換機清單
選擇其中一條隊列點選,即可檢視其詳細資訊,包括隊列名稱、持久化政策及綁定資訊。
至此,基于DirectExchange消息模型的大概使用已經講解完畢。此種模型适用于業務資料需要直接傳輸并消費的場景,比如業務子產品之間的消息互動,一般業務服務直接的通信是直接的、實時的,因而可以借助基于DirectExchange的消息模型進行通信。 事實上,在實際應用系統中,有90%的業務場景都可以采用直連式消息模型實作。
源碼下載下傳:
https://gitee.com/yjp245/middleware_study.git