天天看點

阿裡三面 | RabbitMQ如何防止重複消費?

作者:愛馬士團團長

一. 前言

最近有很多小夥伴開始找工作,在面試時,面試官經常問到一個題目:RabbitMQ如何防止重複消費?

有很多小夥伴這個時候都在想,消息怎麼就會重複消費呢???.......

是以他們在面試後就跑來問健哥,針對這個比較高頻的題目,團長就在這裡為大家來講講MQ防止重複消費的實作方案吧。

二. 面試題考點

如果面試官是我的話,那麼我想考察的,其實就是候選人除了對技術的基本使用之外,再就是在各種實際應用場景中對可能發生問題的實際處理能力。

是以這道題的考點,最起碼有兩點:

第一是RabbitMQ中消息的重複消費是如何産生的,我們首先要發現問題,,知道問題産生原因;

第二是針對這個重複消費問題的處理方案及機制。

三. 解題分析

接下來健哥就根據上述考點,帶大家來一起分析這個問題的解題思路。

3.1RabbitMQ消息重複消費的産生原因

阿裡三面 | RabbitMQ如何防止重複消費?

根據上圖,團長給大家梳理總結出了消息重複消費的産生過程,如下:

消費方的業務項目從MQ隊列中接收資料;

接着處理業務;

業務處理成功後,消費方項目給MQ傳回ack進行手動确認;

傳回回調執行結果的過程中,因為網絡抖動等原因,回調資料時,MQ沒有傳回成功,是以MQ隊列中的資料會再次發給業務項目,造成重複消費。

3.2. RabbitMQ消息重複消費的處理方案

阿裡三面 | RabbitMQ如何防止重複消費?

針對消息的重複消費問題,團長根據上圖總結的解決思路如下:

監聽器接收MQ隊列中的資料;

利用redis的setnx指令,以消息唯一id為key,以消息内容為value,逾時時間設定為10秒,存入redis中;

如果能夠成功存入,說明沒有重複消費,則處理業務,處理完業務後傳回ack或者nack确認;

如果存不進去,則說明重複消費,直接傳回ack确認的回調資訊就可以了。

3.3解決重複消費的案例代碼

發送方測試代碼/**
* 測試發送
* @author 團長
*/
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void contextLoads() throws IOException {
//給消息封裝一個唯一id對象
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
//第四個參數: 設定消息唯一id
rabbitTemplate.convertAndSend("交換器名字","路由鍵","千鋒健哥測試MQ重複消費處理!!",messageId);
}
}
package com.qf.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* @author 團長
*/
@Component
public class Consumer {

@Autowired
private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "隊列名字")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 擷取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//1. 設定key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {

//2. 消費消息
System.out.println("接收到消息:" + msg);

//3. 設定key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);

//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}else {
//5. 擷取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}

}
}           
    • 接收方測試代碼

四. 總結

經過上面的分析,最後團長給大家總結一下這個問題的完整答案。

問題産生原因:

因為消費方和MQ伺服器網絡閃斷等原因,造成了接收方消費後,傳回給MQ伺服器一個ack确認消息,結果MQ沒有接收到,造成了重複消費。

解決過程:

利用redis的setnx指令,将消費的消息id存入到redis,逾時時間設定為10秒,然後再給mq傳回ack。消費前要判斷redis中是否存在這個消息id,如果不存在說明沒有消費過,則正常消費;如果redis中存在這個消息id,則說明重複消費,直接傳回ack,不重複執行業務。