個人部落格網:https://wushaopei.github.io/ (你想要這裡多有)
1、可用性保證
引入消息隊列之後該如何保證其高可用性?
持久化、事務、簽收、 以及帶複制的 Leavel DB + zookeeper 主從叢集搭建
2、異步投遞Async Sends
2.1 異步投遞的定義
對于一個Slow Consumer,使用同步發送消息可能出現Producer堵塞的情況,慢消費者适合使用異步發送
如圖介紹:

2.2 什麼是異步投遞
ActiveMQ支援同步,異步兩種發送的模式将消息發送到broker,模式的選擇對發送延時有巨大的影響。producer能達到怎麼樣的産出率(産出率=發送資料總量/時間)主要受發送延時的影響,使用異步發送可以顯著提高發送的性能。
ActiveMQ預設使用異步發送的模式:除非明确指定使用同步發送的方式或者在未使用事務的前提下發送持久化的消息,這兩種情況都是同步發送的。
如果你沒有使用事務且發送的是持久化的消息,每一次發送都是同步發送的且會阻塞producer知道broker傳回一個确認,表示消息已經被安全的持久化到磁盤。确認機制提供了消息安全的保障,但同時會阻塞用戶端帶來了很大的延時。
很多高性能的應用,允許在失敗的情況下有少量的資料丢失。如果你的應用滿足這個特點,你可以使用異步發送來提高生産率,即使發送的是持久化的消息。
異步發送
- 它可以最大化producer端的發送效率。我們通常在發送消息量比較密集的情況下使用異步發送,它可以很大的提升Producer性能;不過這也帶來了額外的問題,
- 就是需要消耗更多的Client端記憶體同時也會導緻broker端性能消耗增加;
- 此外它不能有效的確定消息的發送成功。在userAsyncSend=true的情況下用戶端需要容忍消息丢失的可能。
2.3 官網配置參考(投遞開啟)
在高性能要求下,可以使用異步提高producer 的性能。但會消耗較多的client 端記憶體,也不能完全保證消息發送成功。在 useAsyncSend = true 情況下容忍消息丢失。
// 開啟異步投遞
activeMQConnectionFactory.setUseAsyncSend(true);
2.4 面試題: 異步消息如何确定發送成功?
異步發送丢失消息的場景是:生産者設定userAsyncSend=true,使用producer.send(msg)持續發送消息。
- 如果消息不阻塞,生産者會認為所有send的消息均被成功發送至MQ。
- 如果MQ突然當機,此時生産者端記憶體中尚未被發送至MQ的消息都會丢失。
- 是以,正确的異步發送方法是需要接收回調的。
同步發送和異步發送的差別就在此,
- 同步大宋等send不阻塞了就表示一定發送成功了,
- 異步發送需要用戶端回執并由用戶端再判斷一次是否發送成功
具體實作的代碼案例:
JmsProduce_AsyncSend 在代碼中接收回調的函數 :
package com.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class Producer {
private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
private static final String ACTIVEMQ_QUEUE_NAME = "Queue-異步投遞回調";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(ACTIVEMQ_URL);
//開啟異步投遞
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
//向上轉型到ActiveMQMessageProducer
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("message-" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString() + "----orderAtguigu");
String textMessageId = textMessage.getJMSMessageID();
//使用ActiveMQMessageProducer的發送消息,可以建立回調
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(textMessageId + "發送成功");
}
@Override
public void onException(JMSException exception) {
System.out.println(textMessageId + "發送失敗");
}
});
}
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
3、延時投遞和定時投遞
3.1 在配置檔案中設定定時器開關 為 true
3.2 代碼編寫
- Java 代碼中封裝的輔助消息類型 ScheduleMessage
- 可以設定的 常用參數 如下:
long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("delay msg--" + i);
// 消息每過 3 秒投遞,每 4 秒重複投遞一次 ,一共重複投遞 7 次
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}
4、 Activemq的消息重試機制
(1)由消息丢失或延時而引發的問題——常見面試題
具體哪些情況會引發消息重發
- Client用了transactions且再session中調用了rollback
- Client用了transactions且再調用commit之前關閉或者沒有commit
- Client再CLIENT_ACKNOWLEDGE的傳遞模式下,session中調用了recover
請說說消息重發時間間隔和重發次數
-
間隔:1
次數:6
每秒發6次
有毒消息Poison ACK
- 一個消息被redelivedred超過預設的最大重發次數(預設6次)時,消費的回個MQ發一個“poison ack”表示這個消息有毒,告訴broker不要再發了。這個時候broker會把這個消息放到DLQ(私信隊列)。
重試機制的注意事項:
- 最多六次還沒發出就會
- 加入DLQ (死信隊列)
官網連結位址: http://activemq.apache.org/redelivery-policy
(2)簡單案例說明:
(3)重發整合Spring 的配置
(4)重試機制的屬性說明:
5、死信隊列
(1)概念: Activemq中引入了“死信隊列”(Dead Letter Queue) 的概念。即一條消息再被重發了多次後(預設為重發6次redeliveryCounter==6),将會被Activemq移入“死信隊列”,開發人員可以在這個Queue中檢視處理出錯的消息,進行人工幹預。
(2)死信隊列的使用:處理失敗的消息
(3)死信隊列的配置介紹
- SharedDeadLetterStrategy
- IndividualDeadLetterStrategy
- 配置案例
a. 自動删除過期消息
b. 存放非持久消息到死隊列中
在業務邏輯中,如果一個訂單系統沒有問題,則使用正常的業務隊列,當出現問題,則加入死信隊列 ,此時可以選擇人工幹預還是機器處理 。
6、 如何保證消息不被重複消費,幂等性的問題
如果消息是做資料庫的插入操作,給這個消息一個唯一的主鍵,那麼就算出現重複消費的情況,就會導緻主鍵沖突,避免資料庫出現髒資料 。
如果不是,可以用redis 等的第三方服務,給消息一個全局 id ,隻要消費過的消息,将 id ,message 以 K-V 形式寫入 redis ,那消費者開始消費前,先去 redis 中查詢有沒消費的記錄即可。