rabbitTemplate rabbitmq 釋出模式與消息确認
2、組成
- 生産者
- 消息隊列
- 消費者
- 交換機:
- 隔離生産者和消息隊列,充當二者中間體。
- 接受相應的消息并綁定到指定隊列
3、三種釋出模式
- 根據交換機類型不同,分為3種:
- Direct<直接>:1對1-----一個消息隻能被一個消費者消費
-
Topic<主題>:1對多-----一個消息可以被多個消費者消費**(糾正:一個消息可被多個隊列接收,若多個消費者監聽同一個隊列,會以輪詢方式被多個消費者接收,本質依然是1對1,類Direct。詳述見下方評論!)**
将路由和某個模式比對,# 比對一個或者多個,* 比對一個。例如 Good.insert Good.delete,則Good.#都能獲得
- Fanout<分列>:廣播
4、Direct釋出模式
-
核心依賴及application.properties(除端口外不變)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置檔案
spring.rabbitmq.host=101.132.43.162
spring.rabbitmq.port=5672
spring.rabbitmq.username=rebbitmq
spring.rabbitmq.password=3321
-
生産者工程
- 4.1、建立消息隊列
兩種方式:代碼建立
/**
* 模式為direct,直接發送到對應的消息隊列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("direct");
}
(2)RabbitMQ管理界面手動添加隊列

- 4.2、發送端
API:amqpTemplate.convertAndSend(“隊列名”,“消息内容”)
此處隊列名必須與建立的隊列一緻。
public void send(String str) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("direct",message);
}
- 4.3、測試類
-
消費者工程
- 4.1、建立消息隊列
同上:略
- 4.2、接收端
@RabbitListener(queues = “direct”):監聽器監聽指定隊列
@RabbitHandler
@RabbitListener(queues = "direct")
public void Handler(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息編号:"+messageId);
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息消費:"+s);
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
5、Topic釋出模式
-
生産者工程
- 5.1、建立消息隊列
API:BindingBuilder.bind(指定隊列).to(交換機).with(路由鍵);
路由鍵相當于隊列名
@Bean(name = "message")
public Queue queue(){
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queue1(){
return new Queue("topic.messages");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
}
@Bean
public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}

RabbitMQ将會根據第二個參數去尋找有沒有比對此規則的隊列,如果有,則把消息給它,如果有不止一個,則把消息分發給比對的隊列(每個隊列都有消息!)
@Override
public void send1(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","topic.message",message);
}
- 5.3、測試類
-
消費者工程
@RabbitHandler
@RabbitListener(queues = "topic.message")
public void Handler1(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.message消息編号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.message消息體" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
@RabbitHandler
@RabbitListener(queues = "topic.messages")
public void Handler2(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.messages消息編号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.messages消息體" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
注意:topic模式中隻有消費端監聽的路由鍵符合發送端的路由規則(路由鍵決定)的隊列才會收到消息。
6、Fanout釋出模式
- 廣播:發送到路由器的消息會使得綁定到該路由器的每一個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中convertAndSend方法的參數2),也會被忽略!
- 交換機類型:FanoutExchange
- API:amqpTemplate.convertAndSend(“交換機名”,“ ”,“消息内容”);//路由鍵被忽略
-
消費端:隻要是綁定到該交換機上的都能收到消息。
在配置類中增加
@Bean(name = "fanoutMessage")
public Queue queueC(){
return new Queue("fanoutMessage");
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
@Bean
public Binding binding2(@Qualifier("fanoutMessage") Queue queue,FanoutExchange fanoutExchange){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
生産者服務
@Override
public void send2(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("fanoutExchange","",message);
}
消費者服務
@RabbitHandler
@RabbitListener(queues = "fanoutMessage")
public void Handler3(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("fanoutMessage消息編号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("fanoutMessage消息體" + str);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
人工确認消息發送和消費成功
在application.properties中增加配置
#設定消息确認成功确認後進行回調
#NONE值是禁用釋出确認模式,是預設值
#CORRELATED值是釋出消息成功到交換器後會觸發回調方法,如1示例
#SIMPLE值經測試有兩種效果,其一效果和CORRELATED值一樣會觸發回調方法,其二在釋出消息成功後使用rabbitTemplate調用waitForConfirms或waitForConfirmsOrDie方法等待broker節點傳回發送結果,根據傳回結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果傳回false則會關閉channel,則接下來無法發送消息到broker;
#調用confirmCallback接口
spring.rabbitmq.publisher-confirm-type=correlated
#設定成功到達路由器後不會調用returnsCallback 方法,否則則會調用returnsCallback
spring.rabbitmq.publisher-returns=true
消息隊列配置
package cn.atiaozao.springbootmqprovider.config;
import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MqConfig {
/**
* 模式為direct,直接發送到對應的消息隊列
* @return
*/
@Bean
public Queue directQueue(){
return new Queue("direct");
}
@Bean(name = "message")
public Queue queue(){
return new Queue("topic.message");
}
@Bean(name="messages")
public Queue queue1(){
return new Queue("topic.messages");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("exchange");
}
//消息綁定
@Bean
public Binding binding(@Qualifier("message") Queue message, TopicExchange topicExchange){
return BindingBuilder.bind(message).to(topicExchange).with("topic.message");
}
//表達式綁定
@Bean
public Binding binding1(@Qualifier("messages") Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("topic.#");
}
}
消息服務方法
package cn.atiaozao.springbootmqprovider.service.impl;
import cn.atiaozao.springbootmqprovider.service.RabbitMqService;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.UUID;
@Component
public class RabiitmqServiceImpl implements RabbitMqService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmMessageImpl confirmMessageImpl;
/**
* 使用@PostConstruct 在注入完成之後進行設定消息push 确認的回調方法
*
*/
@PostConstruct
public void init(){
//消息成功到達指定隊列的回調方法
rabbitTemplate.setConfirmCallback(confirmMessageImpl);
//消息如果未成功到達隊列的
rabbitTemplate.setReturnCallback(confirmMessageImpl);
}
@Override
public void send(String str) {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("direct",message);
}
@Override
public void send1(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","topic.message",message);
}
@Override
public void send2(String str){
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(UUID.randomUUID().toString());
Message message = new Message(str.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("exchange","",message);
}
}
消息成功到達路由器後的确認回調方法
package cn.atiaozao.springbootmqprovider.service.impl;
import lombok.extern.java.Log;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Log
@Component
public class ConfirmMessageImpl implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("消息确認成功"+b+"|傳回錯誤消息"+s+"|附帶資料"+correlationData);
}
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("消息發送失敗|"+message.getBody());
}
}
消息消費服務
#設定消息隊列使用手工确認的方式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.direct.acknowledge-mode=manual
消息消費監聽
package cn.atiaozao.springbootmqconsumer.consumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Handler;
@Component
public class DirectConsumerHander {
@RabbitHandler
@RabbitListener(queues = "direct")
public void Handler(Message message, Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息編号:"+messageId);
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("消息消費:"+s);
try {
//消息人工确認
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
@RabbitHandler
@RabbitListener(queues = "topic.message")
public void Handler1(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.message消息編号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.message消息體" + str);
//消息人工确認
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
@RabbitHandler
@RabbitListener(queues = "topic.messages")
public void Handler2(Message message,Channel channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
System.out.println("topic.messages消息編号" + messageId);
String str = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("topic.messages消息體" + str);
//消息人工确認
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}