下一章: 發送問題排查 | 《Rocket MQ 使用排查指南》第二章>>> 點選免費下載下傳 《Rocket MQ 使用排查指南》>>>

什麼是消息隊列Rocket MQ
核心概念
消息隊列 RocketMQ 版是阿裡雲基于 Apache RocketMQ 建構的低延遲、高 并發、高可用、高可靠的分布式消息中間件。消息隊列 RocketMQ 版既可為分布式 應用系統提供異步解耦和削峰填谷的能力,同時也具備網際網路應用所需的海量消息堆 積、高吞吐、可靠重試等特性。
産品功能與特性
消息隊列 RocketMQ 版在阿裡雲多個地域(Region)提供了高可用消息雲服務。 單個地域内采用多機房部署,可用性極高,即使整個機房都不可用,仍然可以為應用 提供消息釋出服務。
消息隊列 RocketMQ 版提供 TCP 和 HTTP 協定的多語言接入方式,友善不 同程式設計語言開發的應用快速接入消息隊列 RocketMQ 版消息雲服務。您可以将應 用部署在阿裡雲 ECS、企業自建雲,或者嵌入到移動端、物聯網裝置中與消息隊列 RocketMQ 版建立連接配接進行消息收發;同時,本地開發者也可以通過公網接入消息隊 列 RocketMQ 版服務進行消息收發。
系統部署架構
系統部署架構如下圖所示。
圖中所涉及到的概念如下所述:
● Name Server:是一個幾乎無狀态節點,可叢集部署,在消息隊列 RocketMQ 版中提供命名服務,更新和發現 Broker 服務。
● Broker:消息中轉角色,負責存儲消息,轉發消息。分為 Master Broker 和 Slave Broker,一個 Master Broker 可以對應多個 Slave Broker,但是一個 Slave Broker 隻能對應一個 Master Broker。Broker 啟動後需要完成一次将 自己注冊至 Name Server 的操作;随後每隔 30s 定期向 Name Server 上報 Topic 路由資訊。
● 生産者:與 Name Server 叢集中的其中一個節點(随機)建立長連結(Keepalive),定期從 Name Server 讀取 Topic 路由資訊,并向提供 Topic 服務的 Master Broker 建立長連結,且定時向 Master Broker 發送心跳。
● 消費者:與 Name Server 叢集中的其中一個節點(随機)建立長連接配接,定期從 Name Server 拉取 Topic 路由資訊,并向提供 Topic 服務的 Master Broker、 Slave Broker 建立長連接配接,且定時向 Master Broker、Slave Broker 發送心 跳。Consumer 既可以從 Master Broker 訂閱消息,也可以從 Slave Broker 訂閱消息,訂閱規則由 Broker 配置決定。 應用場景
應用場景
削峰填谷
流量削峰也是消息隊列 RocketMQ 版的常用場景,一般在秒殺或團隊搶購活動 中使用廣泛。
在秒殺或團隊搶購活動中,由于使用者請求量較大,導緻流量暴增,秒殺的應用在 處理如此大量的通路流量後,下遊的通知系統無法承載海量的調用量,甚至會導緻系 統崩潰等問題而發生漏通知的情況。為解決這些問題,可在應用和下遊通知系統之間 加入消息隊列 RocketMQ 版。
秒殺處理流程如下所述:
- 使用者發起海量秒殺請求到秒殺業務處理系統。
- 秒殺處理系統按照秒殺處理邏輯将滿足秒殺條件的請求發送至消息隊列 RocketMQ 版。
- 下遊的通知系統訂閱消息隊列 RocketMQ 版的秒殺相關消息,再将秒殺成 功的消息發送到相應使用者。
- 使用者收到秒殺成功的通知。 異步解耦 傳統處理
異步解耦
傳統處理
最常見的一個場景是使用者注冊後,需要發送注冊郵件和短信通知,以告知使用者注 冊成功。傳統的做法有以下兩種:
(1)串行方式
串行方式下的注冊流程如下圖所示。
資料流動如下所述:
● 使用者在注冊頁面填寫賬号和密碼并送出注冊資訊,這些注冊資訊首先會被寫入 注冊系統成功。
● 注冊資訊寫入注冊系統成功後,再發送請求至郵件通知系統。郵件通知系統收 到請求後向使用者發送郵件通知。
● 郵件通知系統接收注冊系統請求後再向下遊的短信通知系統發送請求。短信通 知系統收到請求後向使用者發送短信通知。
以上三個任務全部完成後,才傳回注冊結果到用戶端,使用者才能使用賬号登入。
假設每個任務耗時分别為 50 ms,則使用者需要在注冊頁面等待總共需要 150 ms 才能登入。
(2)并行方式
并行方式下的注冊流程如下圖所示。
● 注冊資訊寫入注冊系統成功後,再同時發送請求至郵件和短信通知系統。郵件 和短信通知系統收到請求後分别向使用者發送郵件和短信通知。
以上兩個任務全部完成後,才傳回注冊結果到用戶端,使用者才能使用賬号登入。
假設每個任務耗時分别為 50 ms,其中,郵件和短信通知并行完成,則使用者需 要在注冊頁面等待總共需要 100 ms 才能登入。
以下就注冊場景中使用了消息隊列 RocketMQ 版的效果進行說明。
對于使用者來說,注冊功能實際隻需要注冊系統存儲使用者的賬戶資訊後,該使用者便 可以登入,後續的注冊短信和郵件不是即時需要關注的步驟。
對于注冊系統而言,發送注冊成功的短信和郵件通知并不一定要綁定在一起同步 完成,是以實際當資料寫入注冊系統後,注冊系統就可以把其他的操作放入對應的消 息隊列 RocketMQ 版中然後馬上傳回使用者結果,由消息隊列 RocketMQ 版異步地 進行這些操作。
● 注冊資訊寫入注冊系統成功後,再發送消息至消息隊列 RocketMQ 版。消息 隊列 RocketMQ 版會馬上傳回響應給注冊系統,注冊完成。使用者可立即登入。
● 下遊的郵件和短信通知系統訂閱消息隊列 RocketMQ 版的此類注冊請求消息, 即可向使用者發送郵件和短信通知,完成所有的注冊流程。
使用者隻需在注冊頁面等待注冊資料寫入注冊系統和消息隊列 RocketMQ 版的時 間,即等待 55 ms 即可登入。
異步解耦是消息隊列 RocketMQ 版的主要特點,主要目的是減少請求響應時間和 解耦。主要的适用場景就是将比較耗時而且不需要即時(同步)傳回結果的操作作為消 息放入消息隊列。同時,由于使用了消息隊列 RocketMQ 版,隻要保證消息格式不 變,消息的發送方和接收方并不需要彼此聯系,也不需要受對方的影響,即解耦和。
順序收發
消息隊列 RocketMQ 版順序消息分為兩種情況:
全局順序:對于指定的一個 Topic,所有消息将按照嚴格的先入先出(FIFO)的 順序,進行順序釋出和順序消費;
分區順序:對于指定的一個 Topic,所有消息根據 Sharding Key 進行區塊分 區,同一個分區内的消息将按照嚴格的 FIFO 的順序,進行順釋出和順序消費,可以 保證一個消息被一個程序消費。
在注冊場景中,可使用使用者 ID 作為 Sharding Key 來進行分區,同一個分區下 的建立、更新或删除注冊資訊的消息必須按照 FIFO 的順序釋出和消費。
分布式事務一緻性
注冊系統注冊的流程中,使用者入口在網頁注冊系統,通知系統在郵件系統,兩個系統之間的資料需要保持最終一緻。
普通消息處理
如上所述,注冊系統和郵件通知系統之間通過消息隊列進行異步處理。注冊系統 将注冊資訊寫入注冊系統之後,發送一條注冊成功的消息到消息隊列 RocketMQ 版, 郵件通知系統訂閱消息隊列 RocketMQ 版的注冊消息,做相應的業務處理,發送注 冊成功或者失敗的郵件。
流程說明如下:
- 注冊系統發起注冊。
- 注冊系統向消息隊列 RocketMQ 版發送注冊消息成功與否的消息。
2.1 消息發送成功,進入 3。
2.2 消息發送失敗,導緻郵件通知系統未收到消息隊列 RocketMQ 版發送 的注冊成功與否的消息,而無法發送郵件,最終郵件通知系統和注冊 系統之間的狀态資料不一緻。
- 郵件通知系統收到消息隊列 RocketMQ 版的注冊成功消息。
- 郵件通知系統發送注冊成功郵件給使用者。
在這樣的情況下,雖然實作了系統間的解藕,上遊系統不需要關心下遊系統的業 務處理結果;但是資料一緻性不好處理,如何保證郵件通知系統狀态與注冊系統狀态 的最終一緻。
- 注冊系統向消息隊列 RocketMQ 版發送半事務消息。 1.1 半事務消息發送成功,進入 2。
1.2 半事務消息發送失敗,注冊系統不進行注冊,流程結束。(最終注冊系 統與郵件通知系統資料一緻)
- 注冊系統開始注冊。
2.1 注冊成功,進入 3.1。
2.2 注冊失敗,進行 3.2。
- 注冊系統向消息隊列 RocketMQ 版發送半消息狀态。
3.1 送出半事務消息,産生注冊成功消息,進入 4。
3.2 復原半事務消息,未産生注冊成功消息,流程結束。(最終注冊系統與 郵件通知系統資料一緻)
- 郵件通知系統接收消息隊列 RocketMQ 版的注冊成功消息。
- 郵件通知系統發送注冊成功郵件。(最終注冊系統與郵件通知系統資料一緻)
大規模機器的緩存同步
雙十一大促時,各個分會場會有玲琅滿目的商品,每件商品的價格都會實時變 化。使用緩存技術也無法滿足對商品價格的通路需求,緩存伺服器網卡滿載。通路較 多次商品價格查詢影響會場頁面的打開速度。
此時需要提供一種廣播機制,一條消息本來隻可以被叢集的一台機器消費,如果 使用消息隊列 RocketMQ 版的廣播消費模式,那麼這條消息會被所有節點消費一次, 相當于把價格資訊同步到需要的每台機器上,取代緩存的作用。
消息類型
普通消息
普通消息是指消息隊列 RocketMQ 版中無特性的消息,即發送到服務端會立馬 被消費的消息,且消息是無序消費,不會按照發送的順序一次順序消費。
定時消息
Producer 将消息發送到消息隊列 RocketMQ 版服務端,但并不期望立馬投遞 這條消息,而是推遲到在目前時間點之後的某一個時間投遞到 Consumer 進行消費, 該消息即定時消息。
延時消息
Producer 将消息發送到消息隊列 RocketMQ 版服務端,但并不期望立馬投遞 這條消息,而是延遲一定時間後才投遞到 Consumer 進行消費,該消息即延時消息。 定時消息與延時消息在代碼配置上存在一些差異,但是最終達到的效果相同:消 息在發送到消息隊列 RocketMQ 版服務端後并不會立馬投遞,而是根據消息中的屬 性延遲固定時間後才投遞給消費者。
全局順序消息
對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行釋出 和消費。
分區順序消息
對于指定的一個 Topic,所有消息根據 Sharding Key 進行區塊分區。同一個分 區内的消息按照嚴格的 FIFO 順序進行釋出和消費。Sharding Key 是順序消息中用 來區分不同分區的關鍵字段,和普通消息的 Message Key 是完全不同的概念。
事務消息
消息隊列 RocketMQ 版提供類似 X/Open XA 的分布事務功能,通過消息隊列
RocketMQ 版的事務消息能達到分布式事務的最終一緻。
SDK 支援語言及協定
RocketMQ 支援 tcp 協定以及 http 協定的接入。
其中推薦使用阿裡推出的 tcp 協定下的三大 sdk:java,C/C++,.NET。
除了阿裡推出的 sdk,我們還支援開源的多語言 sdk 接入阿裡雲 RocketMQ: java,go,python,C++。
如 果 您 想 使 用 多 語 言 的 sdk, 推 薦 使 用 http 協 議 接 入:java,PHP,go, Python,Nodejs,C#,C++。
具體協定以及 sdk 的擷取參考連結:
https://help.aliyun.com/document_detail/124693.html?spm=a2c4g.11186623.6.582.104e425cW5Tbm2
RocketMQ 快速入門教程
如果您使用的是阿裡雲主賬号,則可以通過本文來體驗從開通服務、建立資源、 到使用 SDK 收發消息的完整流程,快速上手消息隊列 RocketMQ 版。 無論您使用的是消息隊列 RocketMQ 版支援的何種協定、何種語言,前三個步 驟都一緻,隻是在控制台上具體填寫的資訊會略有不同,請以控制台說明為準。但在 調用 SDK 時,不同協定和語言的示例代碼有所不同,本文以 TCP 協定下的 Java SDK 為例進行說明。
步驟一:開通服務
- 在消息隊列 RocketMQ 版産品頁,單擊立即開通。
- 在确認訂單頁面,選擇我已閱讀并同意《消息隊列 MQ 服務協定》,再單擊 立即開通即可完成開通。
步驟二:建立資源
在使用消息隊列 RocketMQ 版時,請注意以下網絡通路限制:
● Topic 和 Group ID 需建立在同一個地域(Region)下的同一個執行個體中才能互通。 例如,當某 Topic 建立在華東 1(杭州)下的執行個體 A 中,那麼該 Topic 隻能被在 華東 1(杭州)下的執行個體 A 中建立的 Group ID 對應的生産端和消費端通路。
● 如果隻是測試,或者需要在本地(非阿裡雲 ECS 伺服器)使用消息隊列 RocketMQ 版的服務,請将 Topic 和 Group ID 都建立在公網地域下的執行個體 中。生産端和消費端可以部署在本地或者部署在任意地域的 ECS 上,前提是 本地伺服器或者相應的 ECS 需要能夠通路公網。
建立執行個體
執行個體是用于消息隊列 RocketMQ 版服務的虛拟機資源,會存儲消息主題 (Topic)和用戶端 ID(Group ID)資訊。
- 登入消息隊列 RocketMQ 版控制台。在頁面頂部導航欄,選擇地域,如公 網地域。
- 在左側導航欄,單擊執行個體詳情。
- 在執行個體詳情頁面右上角,單擊建立執行個體按鈕。
- 在建立執行個體對話框,選擇執行個體類型,并輸入執行個體名和描述,然後單擊确認。
建立 Topic
Topic 是消息隊列 RocketMQ 版裡對消息的一級歸類,例如可以建立 Topic_ Trade 這一 Topic 來識别交易類消息,消息生産者将消息發送到 Topic_Trade,而 消息消費者則通過訂閱該 Topic 來擷取和消費消息。
● Topic 不能跨執行個體使用,例如在執行個體 A 中建立的 Topic A 不能在執行個體 B 中 使用。
● Topic 名稱必須在同一執行個體中是唯一的。
● 您可建立不同的 Topic 來發送不同類型的消息,例如用 Topic A 發送普通消 息,Topic B 發送事務消息,Topic C 發送定時 / 延時消息。
- 在控制台左側導航欄,單擊 Topic 管理。
- 在 Topic 管理頁面上方選擇剛建立的執行個體,單擊建立 Topic 按鈕。
- 在建立 Topic 對話框中的 Topic 一欄,輸入 Topic 名稱,選擇該 Topic 對 應的消息類型,輸入該 Topic 的備注内容,然後單擊确定。
您建立的 Topic 将出現在 Topic 清單中。
建立 Group ID
建立完執行個體和 Topic 後,您需要為消息的消費者(或生産者)建立用戶端 ID , 即 Group ID 作為辨別。
● Group ID 必須在同一執行個體中是唯一的。
● Group ID 和 Topic 的關系是 N:N,即一個消費者可以訂閱多個 Topic,同 一個 Topic 也可以被多個消費者訂閱;一個生産者可以向多個 Topic 發送消 息,同一個 Topic 也可以接收來自多個生産者的消息。 說明 :消費者必須有對應的 Group ID,生産者不做強制要求。
- 在控制台左側導航欄,單擊 Group 管理。
- 在 Group 管理頁面上方選擇剛建立的執行個體,然後選擇 TCP 協定 > 建立 Group ID。本文以 TCP 協定為例。 說明 :TCP 和 HTTP 協定下的 Group ID 不可以共用,是以需分别建立。
- 在建立 Group ID 對話框中,輸入 Group ID 和描述,然後單擊确認。
建立阿裡雲 AccessKey
阿裡雲 AccessKey 用于收發消息時進行賬戶鑒權。
在調用 SDK 發送和訂閱消息的時候,除了需要指定建立的 Topic 和 Group ID 以外,還需輸入您在 RAM 控制台建立的身份驗證資訊,即 AccessKey。AccessKey 的資訊包含 AccessKeyId 和 AcessKeySecret。
步驟三:擷取接入點
在控制台建立好資源後,您需通過控制台擷取執行個體的接入點。在收發消息時,您需要為生産端和消費端配置該接入點,以此接入某個具體執行個體或地域的服務。
- 在控制台左側導航欄,單擊執行個體詳情。
- 在執行個體詳情頁面上方選擇剛建立的執行個體。
- 在預設顯示的執行個體資訊頁簽的擷取接入點資訊區域,您可以分别看到新創 建執行個體的 TCP 和 HTTP 協定接入點。接入點性質因協定而異,具體說明 如下:
● TCP 協定:您在控制台看到的 TCP 協定接入點是地域下某個具體執行個體的接入 點。同一地域下的不同執行個體的接入點各不相同。
● HTTP 協定:您在控制台看到的 HTTP 協定接入點是某個地域的接入點,跟 具體執行個體無關。您在收發消息時還需另外設定執行個體 ID。
- 在 TCP 協定的接入點區域,單擊複制。 對于 TCP 協定的接入點,您還可以單擊示例代碼,檢視在各種開發語言的程式 中如何設定接入點。
完成以上準備工作後,您就可以運作示例代碼,用消息隊列 RocketMQ 版進行 消息發送和訂閱了。
步驟四:發送消息
您可以通過以下方式發送消息: 控制台發送消息:用于快速驗證 Topic 資源的可用性,主要用作測試。
- 在 Topic 管理頁面,找到您剛剛建立的 Topic,單擊右側操作列的發送。
- 在發送消息對話框中的 Message Body 一欄,輸入消息的具體内容,單擊 确定。
控制台會傳回消息發送成功通知以及相應的 Message ID。
調用 SDK 發送消息:用于生産環境下使用消息隊列 RocketMQ 版。
下文以調用 TCP Java SDK 為例進行說明。
調用 TCP Java SDK 發送消息
- 通過以下任一方式引入依賴:
● Maven 方式引入依賴:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>"XXX"</version>
// 設定為 Java SDK 的最新版本号
</dependency>
● 下載下傳依賴 JAR 包:
- 根據以下說明設定相關參數,運作示例代碼:
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台建立的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鑒權用 AccessKeyId,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey,"XXX");
// 鑒權用 AccessKeySecret,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設定 TCP 接入域名,進入控制台的執行個體詳情頁面,在頁面上方選擇執行個體後,在執行個體資訊中的“擷取
接入點資訊”區域檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在發送消息前,必須調用 start 方法來啟動 Producer,隻需調用一次即可
producer.start();
// 循環發送消息
while(true){
Message msg = new Message( //
// 在控制台建立的 Topic,即該消息所屬的 Topic 名稱
"TopicTestMQ",
// Message Tag,
// 可了解為 Gmail 中的标簽,對消息進行再歸類,友善 Consumer 指定過濾條件在消息隊列
RocketMQ 版伺服器過濾
"TagA",
// Message Body
// 任何二進制形式的資料,消息隊列 RocketMQ 版不做任何幹預,
// 需要 Producer 與 Consumer 協商好一緻的序列化和反序列化方式
"Hello MQ".getBytes());
// 設定代表消息的業務關鍵屬性,請盡可能全局唯一,以友善您在無法正常收到消息情況下,可通過控
制台查詢消息并補發
// 注意:不設定也不會影響消息正常收發
msg.setKey("ORDERID_100");
// 發送消息,隻要不抛異常就是成功
// 列印 Message ID,以便用于消息發送狀态查詢
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.
getMessageId());
}
// 在應用退出前,可以銷毀 Producer 對象
// 注意:如果不銷毀也沒有問題
producer.shutdown();
}
}
檢視消息是否發送成功 消息發送後,您可以在控制台檢視消息發送狀态,步驟如下:
- 在控制台左側導航欄,選擇消息查詢 > 按 Message ID 查詢。
- 在搜尋框中輸入發送消息後傳回的 Message ID,單擊搜尋查詢消息發送 狀态。 儲存時間表示消息隊列 RocketMQ 版服務端存儲這條消息的時間。如果查詢到此消息,表示消息已經成功發送到服務端。
步驟五:調用 SDK 訂閱消息
消息發送成功後,需要啟動消費者來訂閱消息。下文以調用 TCP Java SDK 為 例說明如何訂閱消息。
- 調用 TCP Java SDK 訂閱消息。
您可以運作以下示例代碼來啟動消費者,并測試訂閱消息的功能。請按照說明正 确設定相關參數。
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台建立的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// 鑒權用 AccessKeyId,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.AccessKey, "XXX");
// 鑒權用 AccessKeySecret,在阿裡雲伺服器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 設定 TCP 接入域名,進入控制台的執行個體詳情頁面,在頁面上方選擇執行個體後,在執行個體資訊中的“擷取
接入點資訊”區域檢視
properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
- 檢視消息訂閱是否成功。
完成上述步驟後,您可以在控制台檢視消費者是否啟動成功,即消息訂閱是否 成功。
- 找到要檢視的 Group ID,單擊該 Group ID 所在行操作列的訂閱關系。
如果是否線上顯示為是,且訂閱關系一緻,則說明訂閱成功。否則說明訂閱 失敗。