RocketMQ高階使用
- RocketMQ高階使用
-
- 1. 車輛排程
-
- 1.1 業務分析
- 1.2 司機自動接單
- 1.3 使用者下車
- 1.4 使用者打車
- 1.2 技術分析
-
- 1.2.1 RocketMQ順序消息
- 1.2.2 Redis 輪詢隊列
- 2 順序消息
-
- 2.1 順序類型
- 2.2 Rocket順序消息
-
- 2.2.1 實作原理
- 2.2.2 如何保證叢集有序
- 2.3 代碼示例
-
- 2.31 隊列選擇器
- 2.3.2 消息發送者
- 2.3.3 消息消費者
- 3 消息投遞政策
-
- 3.1 生産者投遞政策
-
- 3.1.1 輪詢算法投遞
- 3.1.2 順序投遞政策
-
- 3.1.3 自帶實作類
- 3.2 消費者配置設定隊列
-
- 3.2.1 平均配置設定算法
- 3.2.2 一緻性hash配置設定算法
- 3.2.3 使用方式
- 4 RocketMQ消息保障
-
- 4.1 生産端保障
-
- 4.1.1 消息發送保障
-
- 同步發送
- 異步發送
- 4.1.2 消息發送總結
- 4.1.3 發送狀态
- 4.1.4 MQ發送端重試保障
-
- 重試問題
- 4.1.5 禁止自動建立topic
- 4.1.6 發端端規避
- 4.2 消費端保障
-
- 4.2.1 注意幂等性
- 4.2.2 消息消費模式
-
- 4.2.2.1 叢集消費
- 4.2.2.2 廣播消費
-
- 4.2.2.3 叢集模式模拟廣播
- 4.2.3 消息消費模式
-
- 4.2.3.1 推模式(PUSH)
- 4.2.3.2 拉模式(PULL)
- 4.2.3.3 注意事項
- 4.2.4 消息确認機制
-
- 4.2.4.1 确認消費
- 4.2.4.2 消費異常
- 4.2.4 消息重試機制
-
- 4.2.5 和生産端重試差別
- 4.2.6 重試配置方式
- 4.3 死信隊列
-
- 4.3.1 死信特性
- 5 Redis 輪詢隊列
-
- 5.1 相關代碼
-
- 5.1.1 redis擷取車輛
- 5.1.2 redis壓入車輛
RocketMQ高階使用
1. 車輛排程
1.1 業務分析
使用者打車從派單服務到排程服務,首先将消息以順序方式扔到RocketMQ中,然後消費的事務就會嚴格按照放入的順序進行消費,使用者首先拿到從RocketMQ推送的順序消息,然後保持住,開始輪詢檢查Redis中的List中是否存在車輛,存在兩種情況:
如果沒有拉取到車輛,然後會延時一段時間,繼續進行拉取,一直拉取不到的話一直進行自旋,一直等到拿到車輛才退出自旋。
如果拉取到車輛就會将使用者和拿到的車輛綁定到一起,開始後續操作,比如下訂單等。
1.2 司機自動接單
當司機上線後,開啟自動接單後,主題流程圖下
- 會先将車輛狀态設定為在Ready狀态,
- 當車輛接到使用者後會将車輛設定為Running狀态,
- 使用者下車後,會将車輛繼續設定為Ready狀态,并将車輛push進list
1.3 使用者下車
如果使用者點選下車,主體流程如下
- 會先将使用者狀态設定為Stop狀态
- 然後會解除車輛和使用者的綁定,
- 之後車輛會将會push到list的尾端,讓其他的使用者可以拉取到車輛資訊。
1.4 使用者打車
使用者上車後流程如下
- 校驗使用者狀态,然後将發送順序消息到RabbitMQ
- 消費者擷取到使用者消息,開始輪詢來拉取車輛資訊,如果拉取不到休眠一會繼續拉取,一直到拉取到
- 拉取到後校驗是否逾時,如果逾時直接結束打車,否則删除RabbitMQ的逾時檢測Key,失效逾時通知
- 設定使用者狀态為Running,後續就到了司機自動接單的流程了
1.2 技術分析
1.2.1 RocketMQ順序消息
打車需要排隊,我們需要讓前面的人能夠被消費到,不能讓這個順序亂掉,這就需要用到
RocketMQ的順序消息
1.2.2 Redis 輪詢隊列
我們要讓車輛在隊列中,從MQ拿到一個車輛後,需要再從隊列中拿取一個車輛如果拿不到則需要不斷的輪詢,一直到拿到車輛為止,如果打車玩完成還是需要将車輛歸還隊列,讓其他的使用者來打車,将一輛車重複利用起來
2 順序消息
2.1 順序類型
無序消息
無序消息也指普通的消息,Producer 隻管發送消息,Consumer 隻管接收消息,至于消息和消息之間的順序并沒有保證。
Producer 依次發送 orderId 為 1、2、3 的消息
Consumer 接到的消息順序有可能是 1、2、3,也有可能是 2、1、3 等情況,這就是普通消息。
全局順序
對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行釋出和消費
比如 Producer 發送orderId 1,3,2 的消息, 那麼 Consumer 也必須要按照 1,3,2 的順序進行消費。
局部順序
在實際開發有些場景中,我并不需要消息完全按照完全按的先進先出,而是某些消息保證先進先出就可以了。
就好比一個打車涉及到不同地區 北京 , 上海 、 廣州 、 深圳 。我不用管其它的訂單,隻保證 同一個 地區的訂單ID能保證這個順序 就可以了。
2.2 Rocket順序消息
RocketMQ可以嚴格的保證消息有序,但這個順序,不是全局順序,隻是分區(queue)順序,要全局順序隻能一個分區。
之是以出現你這個場景看起來不是順序的,是因為發送消息的時候,消息發送預設是會采用輪詢的方式發送到不同的queue(分區)
2.2.1 實作原理
我們知道 生産的message最終會存放在Queue中,如果一個Topic關聯了4個Queue,如果我們不指定消息往哪個隊列裡放,那麼預設是平均配置設定消息到4個queue,
好比有10條消息,那麼這10條消息會平均配置設定在這4個Queue上,那麼每個Queue大概放2個左右。這裡有一點很重的是**:同一個queue,存儲在裡面的message 是按照先進先出的原則**
這個時候思路就來了,我們讓不同的地區用不同的queue。隻要保證同一個地區的訂單把他們放到同一個Queue那就保證消費者先進先出了。
這就保證局部順序了,即同一訂單按照先後順序放到同一Queue,那麼取消息的時候就可以保證先進先取出。
2.2.2 如何保證叢集有序
這裡還有很關鍵的一點,在一個消費者叢集的情況下,消費者1先去Queue拿消息,它拿到了 北京訂單1,它拿完後,消費者2去queue拿到的是 北京訂單2。
拿的順序是沒毛病了,但關鍵是先拿到不代表先消費完它。會存在雖然你消費者1先拿到北京訂單1,但由于網絡等原因,消費者2比你真正的先消費消息。這是不是很尴尬了。
** 分布式鎖**
Rocker采用的是分段鎖,它不是鎖整個Broker而是鎖裡面的單個Queue,因為隻要鎖單個Queue就可以保證局部順序消費了。
是以最終的消費者這邊的邏輯就是
消費者1去Queue拿 北京訂單1,它就鎖住了整個Queue,隻有它消費完成并傳回成功後,這個鎖才會釋放。
然後下一個消費者去拿到 北京訂單2 同樣鎖住目前Queue,這樣的一個過程來真正保證對同一個Queue能夠真正意義上的順序消費,而不僅僅是順序取出。
消息類型對比
全局順序與分區順序對比
發送方式對比
注意事項
- 順序消息暫不支援廣播模式。
- 順序消息不支援異步發送方式,否則将無法嚴格保證順序。
- 建議同一個 Group ID 隻對應一種類型的 Topic,即不同時用于順序消息和無序消息的收發。
- 對于全局順序消息,建議建立broker個數 >=2。
2.3 代碼示例
2.31 隊列選擇器
public class SelectorFactory {
/**
* 工廠模式擷取MessageQueueSelector
*
* @param value
* @return
*/
public static MessageQueueSelector getMessageQueueSelector(String value) {
//如果value不為空使用hash選擇器
if (StringUtils.isNotEmpty(value)) {
return new SelectMessageQueueByHash();
}
//如果value為空使用随機選擇器
return new SelectMessageQueueByRandom();
}
}
2.3.2 消息發送者
@Component
public class MQProducer {
@Autowired
DefaultMQProducer defaultMQProducer;
/**
* 同步發送消息
*
* @param orderPO
*/
public SendResult send(OrderPO orderPO) {
if (null == orderPO) {
return null;
}
SendResult sendResult = null;
//擷取消息對象
Message message = RocketMQHelper.buildMessage(DispatchConstant.SEQ_TOPIC, orderPO);
//根據區域編碼擷取隊列選擇器
MessageQueueSelector selector = SelectorFactory.getMessageQueueSelector(orderPO.getAreaCode());
//異步同步消息
try {
sendResult = defaultMQProducer.send(message, selector, orderPO.getAreaCode(), 10000);
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
}
return sendResult;
}
}
2.3.3 消息消費者
消費者真正要達到消費順序,需要分布式鎖,是以這裡需要将 MessageListenerOrderly 替換之前的MessageListenerConcurrently,因為它裡面實作了分布式鎖。
/**
* 消費消息
*/
public abstract class MQConsumeMessageListenerProcessor implements MessageListenerOrderly {
public static final Logger logger = LoggerFactory.getLogger(MQConsumeMessageListenerProcessor.class);
/**
* 消費有序消息
*
* @param list
* @param consumeOrderlyContext
* @return
*/
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
if (CollectionUtils.isEmpty(list)) {
logger.info("MQ接收消息為空,直接傳回成功");
return ConsumeOrderlyStatus.SUCCESS;
}
//消費消息
for (MessageExt messageExt : list) {
try {
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String body = new String(messageExt.getBody(), "utf-8");
//調用具體消費流程
processMessage(topic, tags, body);
logger.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
} catch (Exception e) {
logger.error("擷取MQ消息内容異常{}", e);
//暫停目前隊列
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
// TODO 處理業務邏輯
return ConsumeOrderlyStatus.SUCCESS;
}
/**
* 處理消息
*
* @param body
*/
public abstract void processMessage(String topic, String tags, String body);
}
上面我們介紹了順序消息,它主要将相同的消息投遞到一個隊列中的,具體如何投遞呢
3 消息投遞政策
上面我們介紹了順序消息,但是RocketMQ還支援哪些投遞政策呢、
RocketMQ 的消息模型整體并不複雜,如下圖所示:
一個 Topic(消息主題) 可能對應多個實際的消息隊列(MessgeQueue)
在底層實作上,為了提高MQ的可用性和靈活性,一個Topic在實際存儲的過程中,采用了多隊列的方式,具體形式如上圖所示。每個消息隊列在使用中應當保證先入先出(FIFO,First In First Out)的方式進行消費。
那麼,基于這種模型,就會引申出兩個問題:
生産者 在發送相同Topic的消息時,消息體應當被放置到哪一個消息隊列(MessageQueue)中?
消費者 在消費消息時,應當從哪些消息隊列中拉取消息?
3.1 生産者投遞政策
生産者投遞政策就是講如何将一個消息投遞到不同的queue中
3.1.1 輪詢算法投遞
預設投遞方式:基于 Queue隊列 輪詢算法投遞
預設情況下,采用了最簡單的輪詢算法,這種算法有個很好的特性就是,保證每一個 Queue隊列 的消息投遞數量盡可能均勻
3.1.2 順序投遞政策
在有些場景下,需要保證同類型消息投遞和消費的順序性。
例如,假設現在有TOPIC topicTest ,該 Topic下有4個 Queue隊列 ,該Topic用于傳遞訂單的狀态變遷,假設訂單有狀态: 未支付 、 已支付 、 發貨中(進行中) 、 發貨成功 、 發貨失敗 。
在時序上,生産者從時序上可以生成如下幾個消息:
訂單T0000001:未支付 --> 訂單T0000001:已支付 --> 訂單T0000001:發貨中(進行中) --> 訂單 T0000001:發貨失敗
消息發送到MQ中之後,可能由于輪詢投遞的原因,消息在MQ的存儲可能如下:
這種情況下,我們希望 消費者 消費消息的順序和我們發送是一緻的,然而,有上述MQ的投遞和消費機制,我們無法保證順序是正确的,對于順序異常的消息, 消費者 即使有一定的狀态容錯,也不能完全處理好這麼多種随機出現組合情況。
基于上述的情況, RockeMQ 采用了這種實作方案:對于相同訂單号的消息,通過一定的政策,将其放置在一個 queue隊列中 ,然後 消費者 再采用一定的政策(一個線程獨立處理一個 queue ,保證處理消息的順序性),能夠保證消費的順序性
生産者在消息投遞的過程中,使用了 MessageQueueSelector 作為隊列選擇的政策接口,其定義如下:
public interface MessageQueueSelector {
/**
* 根據消息體和參數,從一批消息隊列中挑選出一個合适的消息隊列
* @param mqs 待選擇的MQ隊列選擇清單
* @param msg 待發送的消息體
* @param arg 附加參數
* @return 選擇後的隊列
*/
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
3.1.3 自帶實作類
3.2 消費者配置設定隊列
RocketMQ對于消費者消費消息有兩種形式:
- BROADCASTING :廣播式消費,這種模式下,一個消息會被通知到每一個 消費者
- CLUSTERING : 叢集式消費,這種模式下,一個消息最多隻會被投遞到一個 消費者 上進行消費
對于使用了消費模式為 MessageModel.CLUSTERING 進行消費時,需要保證一個消息在整個叢集中隻需要被消費一次。實際上,在RoketMQ底層,消息指定配置設定給消費者的實作,是通過queue隊列配置設定給消費者的方式完成的:也就是說,消息配置設定的機關是消息所在的queue隊列
将 queue隊列 指定給特定的 消費者 後, queue隊列 内的所有消息将會被指定到 消費者 進行消費。
RocketMQ定義了政策接口 AllocateMessageQueueStrategy ,對于給定的 消費者分組 ,和 消息隊列 清單 、 消費者清單 , 目前消費者 應當被配置設定到哪些 queue隊列 ,定義如下:
public interface AllocateMessageQueueStrategy {
/**
* Allocating by consumer id
*
* @param consumerGroup current consumer group 目前 consumer群組
* @param currentCID current consumer id 目前consumer id
* @param mqAll message queue set in current topic 目前topic的所有queue執行個體引用
* @param cidAll consumer set in current consumer group 目前 consumer群組下所有的consumer id set集合
* @return The allocate result of given strategy 根據政策給目前consumer配置設定的queue清單
*/
List<MessageQueue> allocate(
final String consumerGroup,
final String currentCID,
final List<MessageQueue> mqAll,
final List<String> cidAll
);
/**
* Algorithm name
*
* @return The strategy name
*/
String getName();
}
相應地,RocketMQ提供了如下幾種實作
為了講述清楚上述算法的基本原理,我們先假設一個例子,下面所有的算法将基于這個例子講解。
假設目前同一個topic下有queue隊列 10 個,消費者共有 4 個,如下圖所示:
3.2.1 平均配置設定算法
這裡所謂的平均配置設定算法,并不是指的嚴格意義上的完全平均,如上面的例子中,10個queue,而消費者隻有4個,無法是整除關系,除了整除之外的多出來的queue,将依次根據消費者的順序均攤。按照上述例子來看, 10/4=2 ,即表示每個 消費者 平均均攤2個queue;而 10%4=2 ,即除了均攤之外,多出來2個 queue 還沒有配置設定,那麼,根據消費者的順序 consumer-1 、 consumer-2 、 consumer- 3 、 consumer-4 ,則多出來的2個 queue 将分别給 consumer-1 和 consumer-2 。
最終,分攤關系如下:
consumer-1 :3個
consumer-2 :3個
consumer-3 :2個
consumer-4 :2個
3.2.2 一緻性hash配置設定算法
使用這種算法,會将 consumer消費者 作為Node節點構造成一個hash環,然後 queue隊列 通過這個hash環來決定被配置設定給哪個 consumer消費者 。
其基本模式如下:
一緻性hash算法用于在分布式系統中,保證資料的一緻性而提出的一種基于hash環實作的算法
3.2.3 使用方式
預設消費者使用使用了 AllocateMessageQueueAveragely 平均配置設定政策
如果需要使用其他配置設定政策,使用方式如下
//建立一個消息消費者,并設定一個消息消費者組,并指定使用一緻性hash算法的配置設定政策
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer(null,"rocket_test_consumer_group",null,new
AllocateMessageQueueConsistentHash());
.....
4 RocketMQ消息保障
下面我們詳細說下如何保障消息不丢失以及消息幂等性問題
4.1 生産端保障
生産端保障需要從一下幾個方面來保障
- 使用可靠的消息發送方式
- 注意生産端重試
- 生産禁止自動建立topic
4.1.1 消息發送保障
同步發送
發送者向MQ執行發送消息API時,同步等待,直到消息伺服器傳回發送結果,會在收到接收方發回響應之後才發下一個資料包的通訊方式,這種方式隻有在消息完全發送完成之後才傳回結果,此方式存在需要同步等待發送結果的時間代價。
簡單來說,同步發送就是指 producer 發送消息後,會在接收到 broker 響應後才繼續發下一條消息的通信方式。
使用場景
由于這種同步發送的方式確定了消息的可靠性,同時也能及時得到消息發送的結果,故而适合一些發送比較重要的消息場景,比如說重要的通知郵件、營銷短信等等。在實際應用中,這種同步發送的方式還是用得比較多的。
注意事項
這種方式具有内部重試機制,即在主動聲明本次消息發送失敗之前,内部實作将重試一定次數,預設為2次( DefaultMQProducer#getRetryTimesWhenSendFailed )。 發送的結果存在同一個消息可能被多次發送給broker,這裡需要應用的開發者自己在消費端處理幂等性問題。
異步發送
異步發送是指發送方發出資料後,不等接收方發回響應,接着發送下個資料包的通訊方式。 MQ的異步發送,需要使用者實作異步發送回調接口( SendCallback )
異步發送是指 producer 發出一條消息後,不需要等待 broker 響應,就接着發送下一條消息的通信方式。需要注意的是,不等待 broker 響應,并不意味着 broker 不響應,而是通過回調接口來接收broker 的響應。是以要記住一點,異步發送同樣可以對消息的響應結果進行處理。
使用場景
由于異步發送不需要等待 broker 的響應,故在一些比較注重 RT(響應時間)的場景就會比較适用。比如,在一些視訊上傳的場景,我們知道視訊上傳之後需要進行轉碼,如果使用同步發送的方式來通知啟動轉碼服務,那麼就需要等待轉碼完成才能發回轉碼結果的響應,由于轉碼時間往往較長,很容易造成響應逾時。此時,如果使用的是異步發送通知轉碼服務,那麼就可以等轉碼完成後,再通過回調接口來接收轉碼結果的響應了。
注意事項
注意:RocketMQ内部隻對同步模式做了重試,異步發送模式是沒有自動重試的,需要自己手動實作
4.1.2 消息發送總結
發送方式對比
使用場景對比
在實際使用場景中,利用何種發送方式,可以總結如下:
當發送的消息不重要時,采用 one-way 方式,以提高吞吐量;
當發送的消息很重要時,且對響應時間不敏感的時候采用 sync 方式;
當發送的消息很重要,且對響應時間非常敏感的時候采用 async 方式;
4.1.3 發送狀态
發送消息時,将獲得包含SendStatus的SendResult。首先,我們假設Message的
isWaitStoreMsgOK = true(預設為true),如果沒有抛出異常,我們将始終獲得SEND_OK,以下是每個狀态的說明清單:
FLUSH_DISK_TIMEOUT
如果設定了 FlushDiskType=SYNC_FLUSH (預設是 ASYNC_FLUSH),并且 Broker 沒有在syncFlushTimeout (預設是 5 秒)設定的時間内完成刷盤,就會收到此狀态碼。
FLUSH_SLAVE_TIMEOUT
如果設定為 SYNC_MASTER ,并且 slave Broker 沒有在 syncFlushTimeout 設定時間内完成同步,就會收到此狀态碼。
SLAVE_NOT_AVAILABLE
如果設定為 SYNC_MASTER ,并沒有配置 slave Broker,就會收到此狀态碼。
SEND_OK
這個狀态可以簡單了解為,沒有發生上面列出的三個問題狀态就是SEND_OK。需要注意的是,SEND_OK 并不意味着可靠,如果想嚴格確定沒有消息丢失,需要開啟 SYNC_MASTER orSYNC_FLUSH。
注意事項
如果收到了 FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT ,意味着消息會丢失,有2個選擇,一是無所謂,适用于消息不關緊要的場景,二是重發,但可能産生消息重複,這就需要consumer進行去重控制。如果收到了 SLAVE_NOT_AVAILABLE 就要趕緊通知管理者了。
4.1.4 MQ發送端重試保障
如果由于網絡抖動等原因,Producer程式向Broker發送消息時沒有成功,即發送端沒有收到Broker的ACK,導緻最終Consumer無法消費消息,此時RocketMQ會自動進行重試。
DefaultMQProducer可以設定消息發送失敗的最大重試次數,并可以結合發送的逾時時間來進行重試的處理,具體API如下:
//設定消息發送失敗時的最大重試次數
public void setRetryTimesWhenSendFailed(Integer retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}
//同步發送消息,并指定逾時時間
@Override
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
msg.setTopic(withNamespace(msg.getTopic()));
return this.defaultMQProducerImpl.send(msg, timeout);
}
重試問題
逾時重試針對網上說的逾時異常會重試的說法都是錯誤的
是因為下面測試代碼的逾時時間設定為5毫秒 ,按照正常肯定會報逾時異常,但設定1次重試和3000次的重試,雖然最終都會報下面異常,但輸出錯誤時間報顯然不應該是一個級别。但測試發現無論設定的多少次的重試次數,報異常的時間都差不多。
測試代碼
public class RetryProducer {
public static void main(String[] args) throws UnsupportedEncodingException,
InterruptedException, RemotingException, MQClientException, MQBrokerException {
//建立一個消息生産者,并設定一個消息生産者組
DefaultMQProducer producer = new
DefaultMQProducer("rocket_test_consumer_group");
//指定 NameServer 位址
producer.setNamesrvAddr("127.0.0.1:9876");
//設定重試次數(預設2次)
producer.setRetryTimesWhenSendFailed(300000);
//初始化 Producer,整個應用生命周期内隻需要初始化一次
producer.start();
Message msg = new Message(
/* 消息主題名 */
"topicTest",
/* 消息标簽 */
"TagA",
/* 消息内容 */
("Hello Java demo RocketMQ
").getBytes(RemotingHelper.DEFAULT_CHARSET));
//發送消息并傳回結果,設定逾時時間 5ms 是以每次都會發送失敗
SendResult sendResult = producer.send(msg, 5);
System.out.printf("%s%n", sendResult);
// 一旦生産者執行個體不再被使用則将其關閉,包括清理資源,關閉網絡連接配接等
producer.shutdown();
}
}
揭曉答案
針對這個疑惑,需要檢視源碼,發現隻有同步發送才會重試,并且逾時是不重試的
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
//1、擷取目前時間
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
//2、去伺服器看下有沒有主題消息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
//3、通過這裡可以很明顯看出 如果不是同步發送消息 那麼消息重試隻有1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
//4、根據設定的重試次數,循環再去擷取伺服器主題消息
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
try {
beginTimestampPrev = System.currentTimeMillis();
if (times > 0) {
//Reset topic with namespace during resend.
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
long costTime = beginTimestampPrev - beginTimestampFirst;
//5、前後時間對比 如果前後時間差 大于 設定的等待時間 那麼直接跳出for循環了 這就 說明連接配接逾時是不進行多次連接配接重試的
if (timeout < costTime) {
callTimeout = true;
break;
}
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
//6、如果逾時直接報錯
if (sendResult != null) {
return sendResult;
}
.......
重試總結
通過這段源碼很明顯可以看出以下幾點
- 如果是異步發送那麼重試次數隻有1次
- 對于同步而言,逾時異常也是不會再去重試。
- 如果發生重試是在一個for 循環裡去重試,是以它是立即重試而不是隔一段時間去重試。
4.1.5 禁止自動建立topic
自動建立TOPIC流程autoCreateTopicEnable 設定為true 辨別開啟自動建立topic
- 消息發送時如果根據topic沒有擷取到 路由資訊,則會根據預設的topic去擷取,擷取到路由資訊後選擇一個隊列進行發送,發送時封包會帶上預設的topic以及預設的隊列數量。
- 消息到達broker後,broker檢測沒有topic的路由資訊,則查找預設topic的路由資訊,查到表示開啟了自動建立topic,則會根據消息内容中的預設的隊列數量在本broker上建立topic,然後進行消息存儲。
- broker建立topic後并不會馬上同步給namesrv,而是每30進行彙報一次,更新namesrv上的topic路由資訊,producer會每30s進行拉取一次topic的路由資訊,更新完成後就可以正常發送消息。更新之前一直都是按照預設的topic查找路由資訊。
為什麼不能開啟自動建立
上述 broker 中流程會有一個問題,就是在producer更新路由資訊之前的這段時間,如果消息隻發送到了broker-a,則broker-b上不會建立這個topic的路由資訊,broker互相之間不通信。當producer更新之後,擷取到的broker清單隻有broker-a,就永遠不會輪詢到broker-b的隊列(因為沒有路由資訊),是以我們生産通常關閉自動建立broker,而是采用手動建立的方式。
4.1.6 發端端規避
注意了,這裡我們發現,有可能在實際的生産過程中,我們的 RocketMQ 有幾台伺服器構成的叢集
其中有可能是一個主題 TopicA 中的 4 個隊列分散在 Broker1、Broker2、Broker3 伺服器上
如果這個時候 Broker2 挂了,我們知道,但是生産者不知道(因為生産者用戶端每隔 30S 更新一次路由,但是 NamServer 與 Broker 之間的心跳檢測間隔是 10S,是以生産者最快也需要 30S 才能感覺Broker2 挂了),是以發送到 queue2 的消息會失敗,RocketMQ 發現這次消息發送失敗後,就會将Broker2排除在消息的選擇範圍,下次再次發送消息時就不會發送到 Broker2,這樣做的目的就是為了提高發送消息的成功率。
4.2 消費端保障
4.2.1 注意幂等性
應用程式在使用RocketMQ進行消息消費時必須支援幂等消費,即同一個消息被消費多次和消費一次的結果一樣。這一點在使用RoketMQ或者分析RocketMQ源代碼之前再怎麼強調也不為過。
**“至少一次送達”**的消息傳遞政策,和消息重複消費是一對共生的因果關系。要做到不丢消息就無法避免消息重複消費。原因很簡單,試想一下這樣的場景:用戶端接收到消息并完成了消費,在消費确認過程中發生了通訊錯誤。從Broker的角度是無法得知用戶端是在接收消息過程中出錯還是在消費确認過程中出錯。為了確定不丢消息,重發消息是唯一的選擇。
有了消息幂等消費約定的基礎,RocketMQ就能夠有針對性地采取一些性能優化措施,例如:并行消費、消費進度同步機制等,這也是RocketMQ性能優異的原因之一。
4.2.2 消息消費模式
從不同的次元劃分,Consumer支援以下消費模式:
廣播消費模式下,消息消費失敗不會進行重試,消費進度儲存在Consumer端;
叢集消費模式下,消息消費失敗有機會進行重試,消費進度集中儲存在Broker端。
4.2.2.1 叢集消費
使用相同 Group ID 的訂閱者屬于同一個叢集,同一個叢集下的訂閱者消費邏輯必須完全一緻(包括 Tag 的使用),這些訂閱者在邏輯上可以認為是一個消費節點
注意事項
- 消費端叢集化部署, 每條消息隻需要被處理一次。
- 由于消費進度在服務端維護, 可靠性更高。
- 叢集消費模式下,每一條消息都隻會被分發到一台機器上處理。如果需要被叢集下的每一台機器都處理,請使用廣播模式。
- 叢集消費模式下,不保證每一次失敗重投的消息路由到同一台機器上,是以處理消息時不應該做任何确定性假設。
4.2.2.2 廣播消費
廣播消費指的是:一條消息被多個consumer消費,即使這些consumer屬于同一個
ConsumerGroup,消息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在消息劃分方面無意義。
注意事項
- 廣播消費模式下不支援順序消息。
- 廣播消費模式下不支援重置消費位點。
- 每條消息都需要被相同邏輯的多台機器處理。
- 消費進度在用戶端維護,出現重複的機率稍大于叢集模式。
-
廣播模式下,消息隊列 RocketMQ
保證每條消息至少被每台用戶端消費一次,但是并不會對消費失敗的消息進行失敗重投,是以業務方需要關注消費失敗的情況。
- 廣播模式下,用戶端每一次重新開機都會從最新消息消費。用戶端在被停止期間發送至服務端的消息将會被自動跳過, 請謹慎選擇。
- 廣播模式下,每條消息都會被大量的用戶端重複處理,是以推薦盡可能使用叢集模式。 目前僅 Java 用戶端支援廣播模式。
- 廣播模式下服務端不維護消費進度,是以消息隊列 RocketMQ 控制台不支援消息堆積查詢、消息堆積報警和訂閱關系查詢功能。
4.2.2.3 叢集模式模拟廣播
如果業務需要使用廣播模式,也可以建立多個 Group ID,用于訂閱同一個 Topic。
注意事項
- 每條消息都需要被多台機器處理,每台機器的邏輯可以相同也可以不一樣。
- 消費進度在服務端維護,可靠性高于廣播模式。
- 對于一個 Group ID來說,可以部署一個消費端執行個體,也可以部署多個消費端執行個體。當部署多個消費端執行個體時,執行個體之間又組成了叢集模式(共同分擔消費消息)。假設Group ID 1 部署了三個消費者執行個體 C1、C2、C3,那麼這三個執行個體将共同分擔伺服器發送給 Group ID 1的消息。同時,執行個體之間訂閱關系必須保持一緻。
4.2.3 消息消費模式
RocketMQ消息消費本質上是基于的拉(pull)模式,consumer主動向消息伺服器broker拉取消息。
推消息模式下,消費進度的遞增是由RocketMQ内部自動維護的;
拉消息模式下,消費進度的變更需要上層應用自己負責維護,RocketMQ隻提供消費進度儲存和查詢功能。
4.2.3.1 推模式(PUSH)
我們上面使用的消費者都是PUSH模式,也是最常用的消費模式
由消息中間件(MQ消息伺服器代理)主動地将消息推送給消費者;采用Push方式,可以盡可能實時地将消息發送給消費者進行消費。但是,在消費者的處理消息的能力較弱的時候(比如,消費者端的業務系統處理一條消息的流程比較複雜,其中的調用鍊路比較多導緻消費時間比較久。概括起來地說就是(“慢消費問題”),而MQ不斷地向消費者Push消息,消費者端的緩沖區可能會溢出,導緻異常。
實作方式,代碼上使用 DefaultMQPushConsumer
consumer把輪詢過程封裝了,并注冊MessageListener監聽器,取到消息後,喚醒
MessageListener的consumeMessage()來消費,對使用者而言,感覺消息是被推送(push)過來的。主要用的也是這種方式。
4.2.3.2 拉模式(PULL)
RocketMQ的PUSH模式是由PULL模式來實作的
由消費者用戶端主動向消息中間件(MQ消息伺服器代理)拉取消息;采用Pull方式,如何設定Pull消息的頻率需要重點去考慮,舉個例子來說,可能1分鐘内連續來了1000條消息,然後2小時内沒有新消息産生(概括起來說就是“消息延遲與忙等待”)。如果每次Pull的時間間隔比較久,會增加消息的延遲,
即消息到達消費者的時間加長,MQ中消息的堆積量變大;若每次Pull的時間間隔較短,但是在一段時間内MQ中并沒有任何消息可以消費,那麼會産生很多無效的Pull請求的RPC開銷,影響MQ整體的網絡性能。
4.2.3.3 注意事項
注意:RocketMQ 4.6.0版本後将棄用DefaultMQPullConsumer
DefaultMQPullConsumer方式需要手動管理偏移量,官方已經被廢棄,将在2022年進行删除
DefaultLitePullConsumer
該類是官方推薦使用的手動拉取的實作類,偏移量送出由RocketMQ管理,不需要手動管理
4.2.4 消息确認機制
consumer的每個執行個體是靠隊列配置設定來決定如何消費消息的。那麼消費進度具體是如何管理的,又是如何保證消息成功消費的?(RocketMQ有保證消息肯定消費成功的特性,失敗則重試)
為了保證資料不被丢失,RocketMQ支援消息确認機制,即ack。發送者為了保證消息肯定消費成功,隻有使用方明确表示消費成功,RocketMQ才會認為消息消費成功。中途斷電,抛出異常等都不會認為成功——即都會重新投遞。
4.2.4.1 确認消費
業務實作消費回調的時候,當且僅當此回調函數傳回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS ,RocketMQ才會認為這批消息(預設是1條)是消費完成的。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//執行真正消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
4.2.4.2 消費異常
如果這時候消息消費失敗,例如資料庫異常,餘額不足扣款失敗等一切業務認為消息需要重試的場景,隻要傳回 ConsumeConcurrentlyStatus.RECONSUME_LATER ,RocketMQ就會認為這批消息消費失敗了。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
execute();//執行真正消費
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
為了保證消息是肯定被至少消費成功一次,RocketMQ會把這批消息重發回Broker(topic不是原topic而是這個消費組的RETRY topic),在延遲的某個時間點(預設是10秒,業務可設定)後,再次投遞到這個ConsumerGroup。而如果一直這樣重複消費都持續失敗到一定次數(預設16次),就會投遞到DLQ死信隊列。應用可以監控死信隊列來做人工幹預。
4.2.4 消息重試機制
順序消息的重試
對于順序消息,當消費者消費消息失敗後,消息隊列RocketMQ版會自動不斷地進行消息重試(每次間隔時間為1秒),這時,應用會出現消息消費被阻塞的情況。是以,建議您使用順序消息時,務必保證應用能夠及時監控并處理消費失敗的情況,避免阻塞現象的發生。
無序消息的重試
無序消息的重試隻針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗消息不再重試,繼續消費新的消息。
重試次數
消息隊列RocketMQ版預設允許每條消息最多重試16次,每次重試的間隔時間如下。
如果消息重試16次後仍然失敗,消息将不再投遞。如果嚴格按照上述重試時間間隔計算,某條消息在一直消費失敗的前提下,将會在接下來的4小時46分鐘之内進行16次重試,超過這個時間範圍消息将不再重試投遞。
4.2.5 和生産端重試差別
消費者和生産者的重試還是有差別的,主要有兩點
預設重試次數:Product預設是2次,而Consumer預設是16次。
重試時間間隔:Product是立刻重試,而Consumer是有一定時間間隔的。它照
1S,5S,10S,30S,1M,2M····2H 進行重試。
注意:Product在異步情況重試失效,而對于Consumer在廣播情況下重試失效。
4.2.6 重試配置方式
需要重試
消費失敗後,重試配置方式,叢集消費方式下,消息消費失敗後期望消息重試,需要在消息監聽器
接口的實作中明确進行配置(三種方式任選一種):
方式1:傳回RECONSUME_LATER(推薦)
方式2:傳回Null
方式3:抛出異常
無需重試
叢集消費方式下,消息失敗後期望消息不重試,需要捕獲消費邏輯中可能抛出的異常,最終傳回Action.CommitMessage,此後這條消息将不會再重試。
//注冊消息監聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
//消息處理邏輯抛出異常,消息将重試。
doConsumerMessage(list);
} catch (Exception e) {
//捕獲消費邏輯中的所有異常,并傳回Action.CommitMessage;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//業務方正常消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
4.3 死信隊列
在正常情況下無法被消費(超過最大重試次數)的消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列就稱為死信隊列(Dead-Letter Queue)
當一條消息初次消費失敗,消息隊列 RocketMQ 會自動進行消息重試;達到最大重試次 數後,若消費依然失敗,則表明消費者在正常情況下無法正确地消費該消息,此時,消息隊列 RocketMQ 不會立刻将消息丢棄,而是将其發送到該消費者對應的特殊隊列中。 在消息隊列 RocketMQ 中,這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)。
4.3.1 死信特性
死信消息特性
- 不會再被消費者正常消費
- 有效期與正常消息相同,均為 3 天,3 天後會被自動删除。故死信消息應在産生的 3 天内及時處理
** 死信隊列特性**
- 一個死信隊列對應一個消費者組,而不是對應單個消費者執行個體
- 一個死信隊列包含了對應的 Group ID 所産生的所有死信消息,不論該消息屬于哪個 Topic
- 若一個 Group ID 沒有産生過死信消息,則 RocketMQ 不會為其建立相應的死信隊列
5 Redis 輪詢隊列
redis隊列中存放車輛資訊,排程系統從隊列中擷取車輛資訊,打車完成後再将車輛資訊放回隊列中
5.1 相關代碼
5.1.1 redis擷取車輛
public String takeVehicle() {
//從Redis List清單中拿取一個車輛ID
return redisTemplate.opsForList().leftPop(DispatchConstant.VEHICLE_QUEUE, 1, TimeUnit.SECONDS);
}
5.1.2 redis壓入車輛
檢查車輛狀态,并從右側壓入車輛
public void readyDispatch(String vehicleId) {
//檢查車輛狀态
DispatchConstant.DispatchType vehicleDispatchType = taxiVehicleStatus(vehicleId);
//如果車輛時運作狀态
if (vehicleDispatchType.isRunning() || vehicleDispatchType.isReady()) {
redisTemplate.opsForValue().set(DispatchConstant.VEHICLE_STATUS_PREFIX + vehicleId, DispatchConstant.DispatchType.READY.toString());
//從右側壓入車輛
redisTemplate.opsForList().rightPush(DispatchConstant.VEHICLE_QUEUE, vehicleId);
}