1. 為什麼你們公司選擇RabbitMQ作為消息中間件
在消息隊列選型時,我們調研了市場上比較常用ActiveMQ,RabbitMQ,RocketMQ,Kafka。
- RabbitMQ相對成熟穩定,這是我們選擇它最主要的原因。
- 社群比較活躍,有完善的資料可以參考。
- Rabbitmq的吞吐量可以達到萬級,完全滿足我們系統的要求。
- RabbitMQ是Erlang語言開發的,性能比較好。
- 有完善的可視化界面,友善檢視。
2. 消息隊列的優點和缺點有哪些
優點有:
- 異步處理 - 相比于傳統的串行、并行方式,提高了系統吞吐量。
- 應用解耦 - 系統間通過消息通信,不用關心其他系統的處理。
- 流量削鋒 - 可以通過消息隊列長度控制請求量;可以緩解短時間内的高并發請求。
缺點有:
- 系統可用性降低
- 系統複雜度提高
3. RabbitMQ常用的工作模式有哪些
2.1 簡單模型
- p:生成者
- C:消費者
- 紅色部分:quene,消息隊列
2.2 工作模型
這種模式下一條消息隻能由一個消費者進行消費,預設情況下,每個消費者是輪詢消費的。
- p:生成者
- C1、C2:消費者
- 紅色部分:quene,消息隊列
2.3 釋出訂閱模型(fanout)
這種模型中生産者發送的消息所有消費者都可以消費。
- p:生成者
- X:交換機
- C1、C2:消費者
- 紅色部分:quene,消息隊列
2.4 路由模型(routing)
這種模型消費者發送的消息,不同類型的消息可以由不同的消費者去消費。
- p:生成者
- X:交換機,接收到生産者的消息後将消息投遞給與routing key完全比對的隊列
- C1、C2:消費者
- 紅色部分:quene,消息隊列
2.5 主題模型(topic)
這種模型和direct模型一樣,都是可以根據routing key将消息路由到不同的隊列,隻不過這種模型可以讓隊列綁定routing key 的時候使用通配符。這種類型的routing key都是由一個或多個單詞組成,多個單詞之間用
.
分割。
通配符介紹:
*
:隻比對一個單詞
#
:比對一個或多個單詞
4. 如何保證消息不丢失(如何保證消息的可靠性)
一條消息從生産到消費經曆了三個階段,分别是生産者,MQ和消費者,對于RabbitMQ來說,消息的傳遞還涉及到交換機。是以RabbitMQ出現消息丢失的情況有四個
分别是
- 消息生産者沒有成功将消息發送到MQ導緻消息丢失
- 交換機未路由到消息隊列導緻消息丢失
- 消息在MQ中時,MQ發生當機導緻消息丢失
- 消費者消費消息時出現異常導緻消息丢失
針對上面提到的四種情況,分别進行處理
- amqp協定提供了事務機制,在投遞消息時開啟事務,如果消息投遞失敗,則復原事務,很少有人去使用事務。除了事務之外,RabbitMQ還提供了生産者确認機制(publisher confirm)。生産者将信道設定成confirm(确認)模式,一旦信道進入confirm模式,所有在該信道上面釋出的消息都會被指派一個唯一的ID(從1開始),一旦消息被投遞到所有比對的隊列之後,RabbitMQ就會發送一個确認(Basic.Ack)給生産者(包含消息的唯一ID),這就使得生産者知曉消息已經正确到達了目的地了。
# 開啟生産者确認機制,
# 注意這裡确認的是是否到達交換機
spring.rabbitmq.publisher-confirm-type=correlated
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(){
/**
* 生産者确認消息
*/
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(correlationData);
System.out.println(ack);
System.out.println(cause);
}
});
rabbitTemplate.convertAndSend("s","error","這是一條錯誤日志!!!");
}
}
- 消息從交換機未能比對到隊列時将此條消息傳回給生産者
spring.rabbitmq.publisher-returns=true
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("send")
public void sendMessage(){
/**
* 消息未達隊列時傳回該條消息
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println(returnedMessage);
}
});
rabbitTemplate.convertAndSend("s","error","這是一條錯誤日志!!!");
}
}
- 消息在交換機或隊列中發生丢失,我們隻需要将交換機和隊列進行持久化。
/**
* 定義一個持久化的topic交換機
* durable 持久化
* @return
*/
@Bean
public Exchange exchangeJavatrip(){
return ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
}
/**
* 定義一個持久化的隊列
* durable 持久化
* @return
*/
@Bean
public Queue queueJavatrip(){
return QueueBuilder.durable(QUEUE).build();
}
- 消費者開啟手動簽收模式,消費完成後進行ack确認。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = MqConfig.QUEUE)
public void receive(String body, Message message, Channel channel) throws Exception{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println(deliveryTag);
// 系統業務邏輯判斷是否簽收
if(deliveryTag % 2 == 0){
channel.basicAck(deliveryTag,false);
}else{
// 第二個參數是否批量确認,第三個參數是否重新回隊列
channel.basicNack(deliveryTag,false,true);
}
}
5. 如何保證消息不重複消費(如何保證消息的幂等性)
消息重複的原因有兩個:
-
生産時消息重複
由于生産者發送消息給MQ,在MQ确認的時候出現了網絡波動,生産者沒有收到确認,實際上MQ已經接收到了消息。這時候生産者就會重新發送一遍這條消息。
-
消費時消息重複。
消費者消費成功後,在給MQ确認的時候出現了網絡波動,MQ沒有接收到确認,為了保證消息被消費,MQ就會繼續給消費者投遞之前的消息。這時候消費者就接收到了兩條一樣的消息。
由于消息重複是網絡波動等原因造成的,無法避免,我們能做的的就是保證消息的幂等性,以防業務重複處理。具體處理方案為:
讓每個消息攜帶一個全局的唯一ID,即可保證消息的幂等性,具體消費過程為:
- 消費者擷取到消息後先根據id去查詢redis/db是否存在該消息。
- 如果不存在,則正常消費,消費完畢後寫入redis/db。
- 如果存在,則證明消息被消費過,直接丢棄。
@RabbitListener(queues = MqConfig.QUEUE)
public void receive(Message message, Channel channel){
String messageId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody());
String redisId = redisTemplate.opsForValue().get(messageId)+"";
// 如果redis中存有目前消息的消息id
// 則證明消費過
if(messageId.equals(redisId)){
return;
}
redisTemplate.opsForValue().set(messageId, UUID.randomUUID());
}
6. 消息大量堆積應該怎麼處理
消息堆積的原因有兩個
- 網絡故障,消費者無法正常消費
- 消費方消費後未進行ack确認
解決方案如下:
- 檢查并修複消費者故障,使其正常消費
- 編寫臨時程式将堆積的消息發送到容量更大的MQ叢集,增加消費者快速消費
- 堆積消息消費完畢後,停止臨時程式,恢複正常消費
7. 死信是什麼?死信如何處理
當一條消息在隊列中出現以下三種情況的時候,該消息就會變成一條死信。
- 消息被拒絕(basic.reject / basic.nack),并且requeue = false
- 消息TTL過期
- 隊列達到最大長度
當消息在一個隊列中變成一個死信之後,如果配置了死信隊列,它将被重新publish到死信交換機,死信交換機将死信投遞到一個隊列上,這個隊列就是死信隊列。
一條消息成為死信後,一般會通過死信隊列進行存庫,然後定時将庫中的死信進行重新投遞到消息隊列上。
8. 如果我有一筆訂單,30分鐘未支付則關閉訂單,使用RabbitMQ如何來實作
RabbitMQ可以使用死信隊列來實作延時消費,使用者下單之後,将訂單資訊投遞到消息隊列中,并且設定消息過期時常為30分鐘。如果使用者支付則正常關閉訂單,如果使用者未支付,消息達到過期時間,消息會進入死信交換,由消費者進行消費死信隊列來關閉訂單。
9. RabbitMQ如何保證高可用
RabbitMQ有兩種叢集模式,分别是普通叢集和鏡像叢集,普通模式無法保證RabbitMQ的高可用。
普通叢集
假如有三個節點,rabbitmq1、rabbitmq2、rabbitmq3,消息實際上隻存在于其中一個節點,三個節點僅有相同的中繼資料,即隊列的結構,當消息進入rabbitmq2節點的queue後,consumer從rabbitmq1的節點進行消費,rabbitmq1和rabbitmq2會進行臨時通信,從rabbitmq2中擷取消息然後傳回給consumer。
這種模式存在以下兩個問題:
- 當rabbitmq2當機後,消息無法正常消費,沒有做到真正的高可用
- 實際資料還是在單個執行個體上,存在瓶頸問題
鏡像叢集
假如有三個節點,rabbitmq1、rabbitmq2、rabbitmq3,每個執行個體之間都可以互相通信,每次生産者寫消息到queue的時候,每個rabbitmq節點上都有queue的消息資料和中繼資料。這種模式使用于可靠性要求較高的場景。
點關注、不迷路
如果覺得文章不錯,歡迎關注、點贊、收藏,你們的支援是我創作的動力,感謝大家。
如果文章寫的有問題,請不要吝惜文筆,歡迎留言指出,我會及時核查修改。
如果你還想看到更多别的東西,可以微信搜尋「Java旅途」進行關注。回複“手冊”領取Java面試手冊!