生産者可靠性投遞消息後,消費者也可能會産生一些問題,比如:沒有接受到消息,接收消息後在代碼執行過程中出現了異常等。在這種情況下我們需要進行額外的處理,那麼就需要手動進行消息的确認簽收,rabbitmq給我們提供了一個機制:ACK機制。額外的知識:RabbitMQ的死信隊列詳解 - 簡書 (jianshu.com)
ACK機制有三種方式:
- 自動确認 acknowledge="none"
- 手動确認 acknowledge="manual"
- 根據異常情況來确認(暫時不怎麼用) acknowledge="auto"
這篇部落格是在這個基礎上寫的(17條消息) rabbitMQ 可靠性消息投遞_angen2018的部落格-CSDN部落格,具體代碼我會打包好了,放個連結下載下傳。
代碼具體實作:
手動确認,在可靠性投遞時使用
開啟手動确認
spring:
rabbitmq:
port: 5672
host: localhost
username: guest
password: guest
virtual-host: /
#開啟confirms這個模式
#springboot2.2.0.RELEASE支援這個
#publisher-confirm-type: correlated
publisher-confirms: true
#開啟return模式
publisher-returns: true
listener:
direct:
#開啟手動簽收
acknowledge-mode: manual
測試
package com.example.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
@RabbitListener(queues = "queue_test6")
public class MyRabbitListener {
/**
*
* @param message 消息封裝的對象,(包括了消息的序号,消息本身,消費者名稱等)
* @param channel 連結的通道
* @param msg 消息本身
*/
@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(Message message, Channel channel,String msg){
//接收消息
System.out.println(msg);
MessageProperties messageProperties = message.getMessageProperties();
try {
//模拟業務-100
System.out.println("消費後減100元");
//模拟出問題
int i = 10/0;
//如果正常就簽收消息
//參數1,消息的序号
//參數二,是否批量簽收 true是批量簽收
channel.basicAck(messageProperties.getDeliveryTag(),true);
} catch (Exception e) {
e.printStackTrace();
//不正常就拒收消息(丢棄了)
try {
//參數1 消息序号
//參數2 是否批量拒絕消息
//參數3 是否把消息重新回到隊列中
channel.basicNack(messageProperties.getDeliveryTag(),true,false);
//不能批量處理拒絕消息。第二個參數,true會重新放回隊列,是以需要自己根據業務邏輯判斷什麼時候使用拒絕
//channel.basicReject();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
//消費者監聽隊列
/*@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(String msg){
//接收消息
System.out.println(msg);
//模拟業務-100
}*/
//消費者監聽隊列
@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(Map msg){
//接收消息
System.out.println(msg);
//模拟業務-100
}
}
package com.example.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
@RabbitListener(queues = "queue_test6")
public class MyRabbitListener {
/**
*
* @param message 消息封裝的對象,(包括了消息的序号,消息本身,消費者名稱等)
* @param channel 連結的通道
* @param msg 消息本身
*/
@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(Message message, Channel channel,String msg){
//接收消息
System.out.println(msg);
MessageProperties messageProperties = message.getMessageProperties();
try {
//模拟業務-100
System.out.println("消費後減100元");
//模拟出問題
int i = 10/0;
//如果正常就簽收消息
//參數1,消息的序号
//參數二,是否批量簽收 true是批量簽收
channel.basicAck(messageProperties.getDeliveryTag(),true);
} catch (Exception e) {
e.printStackTrace();
//不正常就拒收消息(丢棄了)
try {
//如果該消息重回過隊列就不投遞了,避免死循環
if(messageProperties.getRedelivered()){
System.out.println("已經重新投遞過一次了");
}else{
//參數1 消息序号
//參數2 是否批量拒絕消息
//參數3 是否把消息重新回到隊列中
channel.basicNack(messageProperties.getDeliveryTag(),true,true);
}
//不能批量處理拒絕消息。第二個參數,true會重新放回隊列,是以需要自己根據業務邏輯判斷什麼時候使用拒絕
//channel.basicReject();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
//消費者監聽隊列
/*@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(String msg){
//接收消息
System.out.println(msg);
//模拟業務-100
}*/
//消費者監聽隊列
@RabbitHandler //用于處理具體類型的消息,會自動把消息轉換成對應的對象
public void receiveMessage(Map msg){
//接收消息
System.out.println(msg);
//模拟業務-100
}
}
小結:
如何保證消息的高可靠性傳輸(資料一緻)?
- 持久化:交換機持久化,隊列持久化,消息持久化
- 生産方确認Confirm , Return
- 消費方确認 ACK