八、RabbitMQ整合篇
1、RabbitMQ整合Spring AMQP詳解
1)、RabbitAdmin
- RabbitAdmin類可以很好的操作RabbitMQ,在Spring中直接進行注入即可
- autoStartup必須要設定為true,否則Spring容器不會加載RabbitAdmin類
- RabbitAdmin底層實作就是從Spring容器中擷取Exchange、Binding、Routingkey以及Queue的@Bean聲明
- 然後使用RabbitTemplate的execut方法執行對應的聲明、修改、删除等一系列RabbitMQ基礎功能操作
配置類:
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan("com.hand.rabbitmq")
public class RabbitMqConfig {
//<bean id="connectionFactory"/>
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.126.151:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
//ConnectionFactory形參名字和connectionFactory()方法名保持一緻
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
測試類:
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootRabbitmqApplication.class)
public class RabbitmqApplicationTests {
@Autowired
private RabbitAdmin rabbitAdmin;
@Test
public void testAdmin() {
//建立交換機
rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));
rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));
rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));
//建立隊列
rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
//建立綁定關系 Binding構造函數的參數 隊列名稱、綁定類型、交換機名稱、綁定鍵
rabbitAdmin.declareBinding(new Binding("test.direct.queue",
Binding.DestinationType.QUEUE,
"test.direct",
"direct",
new HashMap<>()));
rabbitAdmin.declareBinding(
BindingBuilder.
bind(new Queue("test.topic.queue", false))
.to(new TopicExchange("test.topic", false, false))
.with("user.#"));
rabbitAdmin.declareBinding(
BindingBuilder.
bind(new Queue("test.fanout.queue", false))
.to(new FanoutExchange("test.fanout", false, false)));
//清空隊列資料
rabbitAdmin.purgeQueue("test.topic.queue", false);
}
}
RabbitAdmin源碼分析:
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
BeanNameAware, InitializingBean {
RabbitAdmin實作了InitializingBean接口,在Bean加載之後做一些設定
public interface InitializingBean {
void afterPropertiesSet() throws Exception;
}
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
BeanNameAware, InitializingBean {
@Override
public void afterPropertiesSet() {
synchronized (this.lifecycleMonitor) {
//如果是running或者autoStartup為false的話就直接return
if (this.running || !this.autoStartup) {
return;
}
if (this.retryTemplate == null && !this.retryDisabled) {
this.retryTemplate = new RetryTemplate();
this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
this.retryTemplate.setBackOffPolicy(backOffPolicy);
}
if (this.connectionFactory instanceof CachingConnectionFactory &&
((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
return;
}
// Prevent stack overflow...
final AtomicBoolean initializing = new AtomicBoolean(false);
this.connectionFactory.addConnectionListener(connection -> {
if (!initializing.compareAndSet(false, true)) {
// If we are already initializing, we don't need to do it again...
return;
}
try {
if (this.retryTemplate != null) {
this.retryTemplate.execute(c -> {
//初始化
initialize();
return null;
});
}
else {
initialize();
}
}
finally {
initializing.compareAndSet(true, false);
}
});
this.running = true;
}
}
@Override // NOSONAR complexity
public void initialize() {
if (this.applicationContext == null) {
this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
return;
}
this.logger.debug("Initializing declarations");
//聲明了Exchange、Queue、Binding三個集合
Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
this.applicationContext.getBeansOfType(Exchange.class).values());
Collection<Queue> contextQueues = new LinkedList<Queue>(
this.applicationContext.getBeansOfType(Queue.class).values());
Collection<Binding> contextBindings = new LinkedList<Binding>(
this.applicationContext.getBeansOfType(Binding.class).values());
processLegacyCollections(contextExchanges, contextQueues, contextBindings);
//将Bean類型是Exchange、Queue、Binding的添加到集合中
processDeclarables(contextExchanges, contextQueues, contextBindings);
final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
final Collection<Queue> queues = filterDeclarables(contextQueues);
final Collection<Binding> bindings = filterDeclarables(contextBindings);
//将Exchange、Queue集合循環拼接成RabbitMQ能夠識别的方式
for (Exchange exchange : exchanges) {
if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
+ exchange.getName()
+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
+ "reopening the connection.");
}
}
for (Queue queue : queues) {
if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
+ queue.getName()
+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
+ queue.isExclusive() + ". "
+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
+ "alive, but all messages will be lost.");
}
}
if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
this.logger.debug("Nothing to declare");
return;
}
//調用rabbitTemplate.execute在RabbitMQ上建立Exchange、Queue、Binding
this.rabbitTemplate.execute(channel -> {
declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
declareQueues(channel, queues.toArray(new Queue[queues.size()]));
declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
return null;
});
this.logger.debug("Declarations finished");
}
private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
Collection<Binding> contextBindings) {
Collection<Declarables> declarables = this.applicationContext.getBeansOfType(Declarables.class, false, true)
.values();
declarables.forEach(d -> {
d.getDeclarables().forEach(declarable -> {
if (declarable instanceof Exchange) {
contextExchanges.add((Exchange) declarable);
}
else if (declarable instanceof Queue) {
contextQueues.add((Queue) declarable);
}
else if (declarable instanceof Binding) {
contextBindings.add((Binding) declarable);
}
});
});
}
2)、RabbitTemplate(消息模闆)
- 在與Spring AMQP整合的時候進行發送消息的關鍵類
- 提供了豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口ConfirmCallback、傳回值确認接口ReturnCallback等等。同樣需要注入到Spring容器中,然後直接使用
配置類:
//ConnectionFactory形參名字和connectionFactory()方法名保持一緻
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
測試方法:
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
//建立消息
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("desc", "資訊描述");
messageProperties.getHeaders().put("type", "自定義消息類型");
Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
//發送消息
rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
//在執行消息轉換後添加/修改标題或屬性然後在進行發送
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().getHeaders().put("desc", "額外修改的資訊描述");
message.getMessageProperties().getHeaders().put("attr", "額外新添加的屬性");
return message;
}
});
}

rabbitTemplate其他的方法:
@Test
public void testSendMessage2() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plain");
Message message = new Message("mq消息1".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.amqp", message);
rabbitTemplate.convertAndSend("topic001", "spring.amqp", "mq消息2");
}
3)、SimpleMessageListenerContainer(簡單消息監聽容器)
- 對這個類進行設定,對于消費者的配置項,這個類都可以滿足
- 監聽隊列(多個隊列)、自動啟動、自動聲明功能
- 設定事務特性、事務管理器、事務屬性、事務容量(并發)、是否開始事務、復原消息等
- 設定消費者數量、最大最小數量、批量消費
- 設定消息确認和自動确認模式、是否重回隊列、異常捕獲handler函數
- 設定消費者标簽生成政策、是否獨占模式、消費者屬性等
- 設定具體的監聽器、消息轉換器等等
注意:SimpleMessageListenerContainer可以進行動态設定,比如在運作中的應用可以動态的修改其消費者數量的大小、接收消息的模式等
配置類:
//ConnectionFactory形參名字和connectionFactory()方法名保持一緻
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
/**
* 設定消息接收确認模式
* - AcknowledgeMode.NONE:不确認
* - AcknowledgeMode.MANUAL:自動确認
* - AcknowledgeMode.AUTO:手動确認
*/
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//設定消息監聽
container.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("消費者:" + msg);
}
});
return container;
}
4)、MessageListenerAdapter(消息監聽擴充卡)
1)擴充卡方式一:自定義方法名稱和參數類型
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
/**
* 設定消息接收确認模式
* - AcknowledgeMode.NONE:不确認
* - AcknowledgeMode.MANUAL:自動确認
* - AcknowledgeMode.AUTO:手動确認
*/
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//擴充卡方式
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
//MessageListenerAdapter自定義方法名
adapter.setDefaultListenerMethod("consumeMessage");
//添加轉換器:從位元組數組轉換為String,到MessageDelegate的時候調用參數為String類型的方法
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
public class MessageDelegate {
public void handleMessage(byte[] messageBody) {
System.out.println("預設方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.out.println("位元組數組方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.out.println("字元串方法, 消息内容:" + messageBody);
}
}
public class TextMessageConverter implements MessageConverter {
//Java對象轉換成Message對象的方式
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(), messageProperties);
}
//Message對象轉換成Java對象的方式
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (contentType != null && contentType.contains("text")) {
return new String(message.getBody());
}
return null;
}
}
2)擴充卡方式二:隊列的名稱和方法名稱也可以進行一一的比對
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//擴充卡方式:隊列的名稱和方法名稱也可以進行一一的比對
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
Map<String, String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001","method1");
queueOrTagToMethodName.put("queue002","method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
adapter.setMessageConverter(new TextMessageConverter());
container.setMessageListener(adapter);
return container;
}
MessageListenerAdapter源碼分析:
public class MessageListenerAdapter extends AbstractAdaptableMessageListener {
//将隊列名和方法名進行比對,将指定隊列适配到指定方法裡
private final Map<String, String> queueOrTagToMethodName = new HashMap<String, String>();
public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
//new MessageListenerAdapter(new MessageDelegate())時傳入的委托對象,用于處理消息
private Object delegate;
//預設的監聽方法的名字為handleMessage,如果要自定義MessageListenerAdapter方法名預設為handleMessage
private String defaultListenerMethod = ORIGINAL_DEFAULT_LISTENER_METHOD;
5)、MessageConverter(消息轉換器)
在進行發送消息的時候,正常情況下消息體為二進制的資料方式進行傳輸,如果希望内部幫我們進行轉換,或者指定自定義的轉換器,就需要用到MessageConverter
自定義消息轉換器,需要實作MessageConverter接口,重寫toMessage(Java對象轉換成Message對象的方式)和fromMessage(Message對象轉換成Java對象的方式)方法
- Json轉換器:Jackson2JsonMessageConverter可以進行Java對象的轉換功能
- DefaultJackson2JavaTypeMapper映射器:可以進行Java對象的映射關系
- 自定義二進制轉換器:比如圖檔類型、PDF、PPT、流媒體
1)Jackson2JsonMessageConverter
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//支援json格式的轉換器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
對應MessageDelegate中的方法:
public void consumeMessage(Map messageBody) {
System.out.println("map方法, 消息内容:" + messageBody);
}
測試方法:
@Test
public void testSendJsonMessage() throws Exception {
Order order = new Order("001","消息訂單","描述資訊");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
MessageProperties messageProperties = new MessageProperties();
//這裡一定要修改contentType為application/json
messageProperties.setContentType("application/json");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
2)Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper支援Java對象的轉換
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//Jackson2JsonMessageConverter & DefaultJackson2JavaTypeMapper支援Java對象的轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
//否則會抛出異常The class '...' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
//預設隻支援java.util和java.lang包下的類
javaTypeMapper.setTrustedPackages("*");
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
對應MessageDelegate中的方法:
public void consumeMessage(Order order) {
System.out.println("order對象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: " + order.getContent());
}
測試方法:
@Test
public void testSendJavaMessage() throws Exception {
Order order = new Order("001","消息訂單","描述資訊");
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(order);
MessageProperties messageProperties = new MessageProperties();
//這裡一定要修改contentType為application/json
messageProperties.setContentType("application/json");
//__TypeId__指定類的全路徑
messageProperties.getHeaders().put("__TypeId__", "com.hand.rabbitmq.domain.Order");
Message message = new Message(json.getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.order", message);
}
3)DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter支援java對象多映射轉換
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
//DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter支援java對象多映射轉換
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order", Order.class);
idClassMapping.put("packaged", Packaged.class);
javaTypeMapper.setIdClassMapping(idClassMapping);
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);
return container;
}
對應MessageDelegate中的方法:
public void consumeMessage(Order order) {
System.out.println("order對象, 消息内容, id: " + order.getId() +
", name: " + order.getName() +
", content: " + order.getContent());
}
public void consumeMessage(Packaged pack) {
System.out.println("package對象, 消息内容, id: " + pack.getId() +
", name: " + pack.getName() +
", content: " + pack.getDescription());
}
測試方法:
@Test
public void testSendMappingMessage() throws Exception {
ObjectMapper mapper = new ObjectMapper();
Order order = new Order("001","消息訂單","描述資訊");
String json1 = mapper.writeValueAsString(order);
MessageProperties messageProperties1 = new MessageProperties();
//這裡注意一定要修改contentType為 application/json
messageProperties1.setContentType("application/json");
messageProperties1.getHeaders().put("__TypeId__", "order");
Message message1 = new Message(json1.getBytes(), messageProperties1);
rabbitTemplate.send("topic001", "spring.order", message1);
Packaged pack = new Packaged("002","包裹消息","包裹描述資訊");
String json2 = mapper.writeValueAsString(pack);
MessageProperties messageProperties2 = new MessageProperties();
//這裡一定要修改contentType為application/json
messageProperties2.setContentType("application/json");
messageProperties2.getHeaders().put("__TypeId__", "packaged");
Message message2 = new Message(json2.getBytes(), messageProperties2);
rabbitTemplate.send("topic001", "spring.pack", message2);
}
4)ContentTypeDelegatingMessageConverter全局轉換器
配置類:
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//目前消費者數量
container.setConcurrentConsumers(1);
//最大消費者數量
container.setMaxConcurrentConsumers(5);
//設定拒絕消息時的預設行為 true:則重新發送消息 false:則不會重新發送消息
container.setDefaultRequeueRejected(false);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消費端的标簽政策
container.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue + "_" + UUID.randomUUID();
}
});
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
//全局的轉換器
ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
TextMessageConverter textConvert = new TextMessageConverter();
convert.addDelegate("text", textConvert);
convert.addDelegate("html/text", textConvert);
convert.addDelegate("xml/text", textConvert);
convert.addDelegate("text/plain", textConvert);
Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
convert.addDelegate("json", jsonConvert);
convert.addDelegate("application/json", jsonConvert);
ImageMessageConverter imageConverter = new ImageMessageConverter();
convert.addDelegate("image/png", imageConverter);
convert.addDelegate("image", imageConverter);
PDFMessageConverter pdfConverter = new PDFMessageConverter();
convert.addDelegate("application/pdf", pdfConverter);
adapter.setMessageConverter(convert);
container.setMessageListener(adapter);
return container;
}
2、RabbitMQ整合SpringBoot詳解
1)生産端整合
核心配置:
spring.rabbitmq.addresses=192.168.126.151:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#開啟Publisher Confirm機制
spring.rabbitmq.publisher-confirms=true
#開啟Publisher Return機制
spring.rabbitmq.publisher-returns=true
#啟用強制消息,設定為false收不到Publisher Return機制傳回的消息
spring.rabbitmq.template.mandatory=false
- publisher-confirms,實作一個監聽器用于監聽Broker端給我們傳回的确認消息:RabbitTemplate.ConfirmCallback
- publisher-returns,保證消息對Broker端是可達的,如果出現路由鍵不可達的情況,則使用監聽器對不可達的消息進行後續的處理,保證消息的路由成功:RabbitTemplate.ReturnCallback
- 在發送消息的時候對template進行配置mandatory=true保證監聽有效
- 生産端還可以配置其他屬性,比如發送重試、逾時時間、次數、間隔等
import com.hand.rabbitmq.domain.Order;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
@Component
public class RabbitSender {
@Autowired
private RabbitTemplate rabbitTemplate;
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("correlationData:" + correlationData);
System.out.println("ack:" + ack);
if (!ack) {
System.out.println("異常處理");
}
}
};
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return exchange:" + exchange + ",routingKey:" + routingKey + ",replyCode:" + replyCode + ",replyText:" + replyText);
}
};
public void send(Object message, Map<String, Object> properties) {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//最好是id +時間戳 全局唯一
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange-1", "springboot.hello", msg, cd);
}
//Order需要實作序列化接口
public void sendOrder(Order order) {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//最好是id +時間戳 全局唯一
CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("exchange-2", "springboot.hello", order, cd);
}
}
2)消費端整合
核心配置:
#确認模式為手動确認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
- 首先配置手工确認模式,用于ACK的手工确認,這樣可以保證消息的可靠性送達,或者在消費端消費失敗的時候可以做到重回隊列、根據業務記錄日志等處理
- 可以設定消費端的監聽個數和最大個數,用于控制消費端的并發情況
- 消費端監聽使用@RabbitListener注解,@RabbitListener是一個組合注解,裡面可以注解配置@Queue、@QueueBinding、@Exchange直接通過這個組合注解一次性解決消費端交換機、隊列、綁定、路由并且配置監聽功能等
配置資訊:
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*
import com.hand.rabbitmq.domain.Order;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class RabbitReceiver {
//如果沒有可以自動建立交換機、隊列、綁定、路由
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "queue-1", durable = "true"),
exchange = @Exchange(value = "exchange-1", durable = "true", type = "topic", ignoreDeclarationExceptions = "true"),
key = "springboot.*"
))
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("消費端:" + message.getPayload());
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable = "${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable = "${spring.rabbitmq.listener.order.exchange.durable}",
type = "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
))
@RabbitHandler
//Order需要實作序列化接口
public void onMessage(@Payload Order order, @Headers Map<String, Object> headers, Channel channel) throws Exception {
System.out.println("消費端:" + order);
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}
}
3、RabbitMQ整合Spring Cloud Stream詳解
Spring Cloud Stream核心架構圖:
Spring Cloud Stream通過定義綁定器Binder作為中間層,完美地實作了應用程式與消息中間件之間的隔離。通過向應用程式暴露統一的Channel通道,使得應用程式不需要再考慮各種不同的消息中間件的實作。當需要更新消息中間件,或是更換其他消息中間件産品時,隻需要更換對應的Binder綁定器而不需要修改任何的應用邏輯
上圖中黃色的為RabbitMQ的部分,綠色的部分為Spring Cloud Stream在生産者和消費者添加了一層中間件
- @EnableBinding:value參數指定用于定義綁定消息通道的接口,在應用啟動時實作對定義消息通道的綁定
- @Output:輸出注解,用于定義發送消息接口
- @Input:輸入注解,用于定義消息的消費者接口
- @StreamListener:用于定義監聽方法的注解
使用Spring Cloud Stream不能實作可靠性的投遞,會存在少量消息丢失的問題(為了兼顧Kafka)
添加依賴:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
Spring Cloud Stream提供了Sink、Source和Processor三個預設實作消息通道的接口
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
public interface Processor extends Source, Sink {
}
1)生産端整合
server.port=8001
spring.application.name=producer
spring.cloud.stream.bindings.output_channel.destination=exchange-3
spring.cloud.stream.bindings.output_channel.group=queue-3
spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.126.151:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
public interface MqMessageSource {
String OUTPUT_CHANNEL = "output_channel";
@Output(MqMessageSource.OUTPUT_CHANNEL)
MessageChannel output();
}
@EnableBinding(MqMessageSource.class)
@Service
public class RabbitmqSender {
@Autowired
private MqMessageSource mqMessageSource;
/**
* 發送消息
*/
public String sendMessage(Object message, Map<String, Object> properties) {
try {
MessageHeaders mhs = new MessageHeaders(properties);
Message msg = MessageBuilder.createMessage(message, mhs);
boolean sendStatus = mqMessageSource.output().send(msg);
System.out.println("發送資料:" + message + ",sendStatus: " + sendStatus);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return null;
}
}
2)消費端整合
server.port=8002
spring.application.name=consumer
spring.cloud.stream.bindings.input_channel.destination=exchange-3
spring.cloud.stream.bindings.input_channel.group=queue-3
spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
spring.cloud.stream.binders.rabbit_cluster.type=rabbit
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.126.151:5672
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
public interface MqMessageSink {
String INPUT_CHANNEL = "input_channel";
@Input(MqMessageSink.INPUT_CHANNEL)
SubscribableChannel input();
}
@EnableBinding(MqMessageSink.class)
@Service
public class RabbitmqReceiver {
@StreamListener(MqMessageSink.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
System.out.println("Input Stream 1 接受資料:" + message);
channel.basicAck(deliveryTag, false);
}
}