一. 前言
最近有很多小夥伴開始找工作,在面試時,面試官經常問到一個題目:RabbitMQ如何防止重複消費?
有很多小夥伴這個時候都在想,消息怎麼就會重複消費呢???.......
是以他們在面試後就跑來問健哥,針對這個比較高頻的題目,團長就在這裡為大家來講講MQ防止重複消費的實作方案吧。
二. 面試題考點
如果面試官是我的話,那麼我想考察的,其實就是候選人除了對技術的基本使用之外,再就是在各種實際應用場景中對可能發生問題的實際處理能力。
是以這道題的考點,最起碼有兩點:
第一是RabbitMQ中消息的重複消費是如何産生的,我們首先要發現問題,,知道問題産生原因;
第二是針對這個重複消費問題的處理方案及機制。
三. 解題分析
接下來健哥就根據上述考點,帶大家來一起分析這個問題的解題思路。
3.1RabbitMQ消息重複消費的産生原因
根據上圖,團長給大家梳理總結出了消息重複消費的産生過程,如下:
消費方的業務項目從MQ隊列中接收資料;
接着處理業務;
業務處理成功後,消費方項目給MQ傳回ack進行手動确認;
傳回回調執行結果的過程中,因為網絡抖動等原因,回調資料時,MQ沒有傳回成功,是以MQ隊列中的資料會再次發給業務項目,造成重複消費。
3.2. RabbitMQ消息重複消費的處理方案
針對消息的重複消費問題,團長根據上圖總結的解決思路如下:
監聽器接收MQ隊列中的資料;
利用redis的setnx指令,以消息唯一id為key,以消息内容為value,逾時時間設定為10秒,存入redis中;
如果能夠成功存入,說明沒有重複消費,則處理業務,處理完業務後傳回ack或者nack确認;
如果存不進去,則說明重複消費,直接傳回ack确認的回調資訊就可以了。
3.3解決重複消費的案例代碼
發送方測試代碼/**
* 測試發送
* @author 團長
*/
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws IOException {
//給消息封裝一個唯一id對象
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
//第四個參數: 設定消息唯一id
rabbitTemplate.convertAndSend("交換器名字","路由鍵","千鋒健哥測試MQ重複消費處理!!",messageId);
}
}
package com.qf.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 團長
*/
@Component
public class Consumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "隊列名字")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 擷取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//1. 設定key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消費消息
System.out.println("接收到消息:" + msg);
//3. 設定key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 擷取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
- 接收方測試代碼
四. 總結
經過上面的分析,最後團長給大家總結一下這個問題的完整答案。
問題産生原因:
因為消費方和MQ伺服器網絡閃斷等原因,造成了接收方消費後,傳回給MQ伺服器一個ack确認消息,結果MQ沒有接收到,造成了重複消費。
解決過程:
利用redis的setnx指令,将消費的消息id存入到redis,逾時時間設定為10秒,然後再給mq傳回ack。消費前要判斷redis中是否存在這個消息id,如果不存在說明沒有消費過,則正常消費;如果redis中存在這個消息id,則說明重複消費,直接傳回ack,不重複執行業務。