
今天我們又來學習一個Apache頂級項目Apache RocketMQ,RocketMQ由國人阿裡團隊采用Java語言開發和開源的,曾獲得2016、2018中國最受歡迎的開源軟體獎。RocketMQ憑借其強大的存儲能力和強大的消息索引能力,以及各種類型消息和消息的特性脫穎而出。Apache RocketMQ官網位址及其GitHub都提供非常詳細中文學習文檔如Apache RocketMQ開發者指南等,學習起來可謂是非常之流暢、酸爽、so easy!讓我們通過官網和及其GitHub來深入學習這個與時俱進非常優秀網際網路主流的消息中間件。阿裡早期是基于ActiveMQ 5的分布式消息傳遞中間件,随着隊列和虛拟主題的增加ActiveMQ IO子產品達到了瓶頸,當時也研讨過Kafka但當時的Kafka不能滿足阿裡的要求(特别是在低延遲和高可靠性方面),是以阿裡決定自行研發一個消息中間件,從傳統的釋出/訂閱場景到高容量的實時零損失容忍度事務系統,這就是RocketMQ誕生的原因。
概述
**本人部落格網站 **IT小神 www.itxiaoshen.com
定義
Apache RocketMQ官網位址 https://rocketmq.apache.org/ Latest release v4.9.2
Apache RocketMQ GitHub源碼位址 https://github.com/apache/rocketmq
Apache RocketMQ™是一個分布式消息傳遞和流媒體平台、統一的消息傳遞引擎,輕量級的資料處理平台;具有低延遲、高性能和可靠性、萬億級容量和靈活的可伸縮性。
今天我們又來學習一個Apache頂級項目Apache RocketMQ,RocketMQ由國人阿裡團隊采用Java語言開發和開源的,曾獲得2016、2018中國最受歡迎的開源軟體獎。RocketMQ憑借其強大的存儲能力和強大的消息索引能力,以及各種類型消息和消息的特性脫穎而出。Apache RocketMQ官網位址及其GitHub都提供非常詳細中文學習文檔如Apache RocketMQ開發者指南等,學習起來可謂是非常之流暢、酸爽、so easy!讓我們通過官網和及其GitHub來深入學習這個與時俱進非常優秀網際網路主流的消息中間件。
為何需要Apache RocketMQ?
阿裡早期是基于ActiveMQ 5的分布式消息傳遞中間件,随着隊列和虛拟主題的增加ActiveMQ IO子產品達到了瓶頸,當時也研讨過Kafka但當時的Kafka不能滿足阿裡的要求(特别是在低延遲和高可靠性方面),是以阿裡決定自行研發一個消息中間件,從傳統的釋出/訂閱場景到高容量的實時零損失容忍度事務系統,這就是RocketMQ誕生的原因。
RocketMQ vs. ActiveMQ vs. Kafka
下表展示了RocketMQ、ActiveMQ和Kafka(根據awesome-java的Apache最流行的消息傳遞解決方案)之間的比較。根據個人經驗,如果不是大資料場景下如大資料日志采集等場景外建議優先使用RocketMQ,性能和功能都有保障,當然需要用于雲原生領域還有Apache Pulsar雲原生分布式消息和流平台,這個在前面的文章也有較少。
安裝部署
安裝說明
the latest release is 4.9.2
二進制下載下傳位址 https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
源碼下載下傳位址 https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-source-release.zip
Apache RocketMQ部署方式有單Master模式、多Master模式、多Master多slave模式、Dledger的叢集部署模式等,官網也提供額外的CLI Admin Tool和運維工具mqadmin。在二進制包下conf目錄提供了兩主兩從異步方式、兩主兩從同步方式、兩主無從、Dledger叢集的配置模闆。
網絡部署特點
- NameServer是一個幾乎無狀态節點,可叢集部署,節點之間無任何資訊同步。
- Broker部署相對複雜,Broker分為Master與Slave,Master提供RW通路,而Slave隻接受讀通路;一個Master可以對應多個Slave,但是一個Slave隻能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer叢集中的所有節點建立長連接配接,定時注冊Topic資訊到所有NameServer。 注意:目前RocketMQ版本在部署架構上支援一Master多Slave,但隻有BrokerId=1的從伺服器才會參與消息的讀負載。
- Producer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer擷取Topic路由資訊,并向提供Topic 服務的Master建立長連接配接,且定時向Master發送心跳。Producer完全無狀态,可叢集部署。
- Consumer與NameServer叢集中的其中一個節點(随機選擇)建立長連接配接,定期從NameServer擷取Topic路由資訊,并向提供Topic服務的Master、Slave建立長連接配接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master伺服器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,産生讀I/O),以及從伺服器是否可讀等因素建議下一次是從Master還是Slave拉取。
配置推薦
在部署RocketMQ叢集時,推薦的配置如下所示:
部署方式說明
- 單Master模式
- 這種方式風險較大,一旦Broker重新開機或者當機時,會導緻整個服務不可用。不建議線上環境使用,可以用于本地測試。
- 多Master模式
- 一個叢集無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
- 優點:配置簡單,單個Master當機或重新開機維護對應用無影響,在磁盤配置為RAID10時,即使機器當機不可恢複情況下,由于RAID10磁盤非常可靠,消息也不會丢(異步刷盤丢失少量消息,同步刷盤一條不丢),性能最高;
- 缺點:單台機器當機期間,這台機器上未被消費的消息在機器恢複之前不可訂閱,消息實時性會受到影響。
- 一個叢集無Slave,全是Master,例如2個Master或者3個Master,這種模式的優缺點如下:
- 多Master多Slave模式-異步複制
- 每個Master配置一個Slave,有多對Master-Slave,HA采用異步複制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
- 優點:即使磁盤損壞,消息丢失的非常少,且消息實時性不會受影響,同時Master當機後,消費者仍然可以從Slave消費,而且此過程對應用透明,不需要人工幹預,性能同多Master模式幾乎一樣;
- 缺點:Master當機,磁盤損壞情況下會丢失少量消息。
- 每個Master配置一個Slave,有多對Master-Slave,HA采用異步複制方式,主備有短暫消息延遲(毫秒級),這種模式的優缺點如下:
- 多Master多Slave模式-同步雙寫
- 每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即隻有主備都寫成功,才向應用傳回成功,這種模式的優缺點如下:
- 優點:資料與服務都無單點故障,Master當機情況下,消息無延遲,服務可用性與資料可用性都非常高;
- 缺點:性能比異步複制模式略低(大約低10%左右),發送單個消息的RT會略高,且目前版本在主節點當機後,備機不能自動切換為主機。
- 每個Master配置一個Slave,有多對Master-Slave,HA采用同步雙寫方式,即隻有主備都寫成功,才向應用傳回成功,這種模式的優缺點如下:
單Master部署
單Master模式部署非常簡單,這種方式風險較大,一旦Broker重新開機或者當機時,會導緻整個服務不可用。不建議線上環境使用,可以用于本地測試。先啟動NameServer後啟動Broker。
#linux部署,解壓下載下傳zip進入二級制加壓的根目錄
unzip rocketmq-all-4.9.2-bin-release.zip
cd rocketmq-4.9.2
#啟動NameServer
nohup sh bin/mqnamesrv &
#檢視NameServer運作日志
tail -f ~/logs/rocketmqlogs/namesrv.log
#啟動Broker
nohup sh bin/mqbroker -n localhost:9876 &
#檢視Broker運作日志
tail -f ~/logs/rocketmqlogs/broker.log
#關閉Broker
sh bin/mqshutdown broker
#關閉NameServer
sh bin/mqshutdown namesrv
Dledger叢集部署
多主多從模式有模闆配置,根據不同配置拉起Broker即可,但是從上面我們知道在多主多從模式下是不支援自動容災切換功能,是以還不具備完全的高可用,我們這裡使用Dledger叢集部署實作自動容災切換;之前我們在ZooKeeper章節也了解到分布式一緻性算法,其實Dledger也是依賴Raft算法實作選舉的功能。Dledger一個基于java庫用于建構高可用性、高耐用性、強一緻性的送出,它可以作為分布式存儲系統的持久化層,如消息傳遞、流、kv、db等。Dledger是已被證明可以應用于生産級别的産品。
NameServer需要先于Broker啟動,且如果在生産環境使用,為了保證高可用,建議一般規模的叢集啟動3個NameServer。我們本次準備3台伺服器192.168.50.95(n0)、192.168.50.156(n1)、192.168.50.196(n2)。
cd rocketmq-4.9.2
#3台伺服器啟動Name Server
nohup sh bin/mqnamesrv &
#驗證Name Server 是否啟動成功
tail -f ~/logs/rocketmqlogs/namesrv.log
在conf\dledger參考broker-n0.conf資料建立檔案名為broker.conf資料内容如下,其他兩台和這個資料一樣,隻需要修改dLegerSelfId為n1和n2即可。
vi conf/dledger/broker.conf
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876
storePathRootDir=/home/commons/rocketmq-4.9.2/rmqstore/node00
storePathCommitLog=/home/commons/rocketmq-4.9.2/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.50.95:40911;n1-192.168.50.156:40911;n2-192.168.50.196:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16
#可以3台分别先建立配置檔案路徑,非必要
mkdir /home/commons/rocketmq-4.9.2/rmqstore/node00
mkdir /home/commons/rocketmq-4.9.2/rmqstore/node00/commitlog
#3台分别啟動broker
nohup sh bin/mqbroker -c conf/dledger/broker.conf &
#檢視Broker運作日志
tail -f ~/logs/rocketmqlogs/broker.log
通過 mqadmin 運維指令檢視叢集狀态,可指定任意一台Name Server
sh bin/mqadmin clusterList -n 192.168.50.95:9876
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-PcYuzQb9-1638544959716)(http://www.itxiaoshen.com:3001/assets/1638501706653H0QPMbWB.png)]
BID 為 0 的表示 Master,其餘都是 Follower,從目前看192.168.50.156為Master,我們進行容災切換測試,停掉192.168.50.156上的Broker程序,等待約 10s 左右,用 clusterList 指令再次檢視叢集,就會發現 Leader 切換到另一個節點192.168.50.196上
再次啟動192.168.50.156上的broker重新再加入叢集并作為叢集的Follower
簡單收發消息測試
#192.168.50.95上執行測試工具的生産者發送消息
export NAMESRV_ADDR="192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876"
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
[外鍊圖檔轉存失敗,源站可能有防盜鍊機制,建議将圖檔儲存下來直接上傳(img-HFLt4ZFo-1638544959726)(http://www.itxiaoshen.com:3001/assets/1638502366597EjsQW4e8.png)]
#192.168.50.95上執行測試工具的消費者接收消息
export NAMESRV_ADDR="192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876"
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
Java示例
常用消息樣例說明
- 簡單消息(三種方式發送消息)
- 可靠同步,使用的比較廣泛,比如:重要的消息通知,短信通知。
- 可靠異步,通常用在對響應時間敏感的業務場景,即發送端不能容忍長時間地等待Broker的響應。
- 單向傳輸,用在不特别關心發送結果的場景,例如日志發送。
- 順序消息
- RocketMQ使用FIFO順序提供有序消息,RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
- 比如用訂單場景,一個訂單的順序流程是:建立、付款、推送、完成。訂單号相同的消息會被先後發送到同一個隊列中,消費時,同一個OrderId擷取到的肯定是同一個隊列。
- 廣播消息
- 向一個主題的所有訂閱者發送消息。
- 延遲消息
- 延遲消息與普通消息的不同之處在于它們将在稍後提供的時間内被傳遞,比如電商裡送出了一個訂單就可以發送一個延時消息,1h後去檢查這個訂單的狀态,如果還是未付款就取消訂單釋放庫存。
- 批量消息
- 批量發送消息可以提高發送小消息的性能。
- 限制:同一批的消息應該有:相同的主題,相同的waitStoreMsgOK,不支援延遲。
- 過濾消息
- 在大多數情況下,TAG是一個簡單而有用的設計,其可以來選擇您想要的消息。
- 在RocketMQ定義的文法下可以使用SQL表達式篩選消息,SQL特性可以通過發送消息時的屬性來進行計算。
- 隻有使用push模式的消費者才能用使用SQL92标準的sql語句。
- Logappender日志
- RocketMQ日志提供log4j、log4j2和logback日志架構作為業務應用
- OpenMessaging
- 旨在建立消息和流處理規範,以為金融、電子商務、物聯網和大資料領域提供通用架構及工業級指導方案。在分布式異構環境中,設計原則是面向雲、簡單、靈活和獨立于語言。符合這些規範将幫助企業友善的開發跨平台和作業系統的異構消息傳遞應用程式。提供了openmessaging-api 0.3.0-alpha的部分實作。
- 事務消息
- 可以将其視為兩階段送出消息實作,以確定分布式系統中的最終一緻性。事務性消息確定本地事務的執行和消息的發送能夠被原子地執行。
- 限制限制
- 事務消息不支援延時消息和批量消息。
- 為了避免單個消息被檢查太多次而導緻半隊列消息累積,我們預設将單個消息的檢查次數限制為 15 次,但是使用者可以通過 Broker 配置檔案的
參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N =transactionCheckMax
) 則 Broker 将丢棄此消息,并在預設情況下同時列印錯誤日志。使用者可以通過重寫transactionCheckMax
類來修改這個行為。AbstractTransactionalMessageCheckListener
- 事務消息将在 Broker 配置檔案中的參數 transactionTimeout 這樣的特定時間長度之後被檢查。當發送事務消息時,使用者還可以通過設定使用者屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先于
參數。transactionTimeout
- 事務性消息可能不止一次被檢查或消費。
- 送出給使用者的目标主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確定事務消息不丢失、并且事務完整性得到保證,建議使用同步的雙重寫入機制。
- 事務消息的生産者 ID 不能與其他類型消息的生産者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ伺服器能通過它們的生産者 ID 查詢到消費者。
-
事務性消息有三種狀态:
(1) TransactionStatus。CommitTransaction:送出事務,它意味着允許使用者使用此消息。
(2) TransactionStatus。rollback transaction:復原事務,它意味着消息将被删除并且不允許使用。
(3) TransactionStatus。未知:中間狀态,這意味着MQ需要進行回查以确定狀态。
簡單消息示例代碼
pom加入maven依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
可靠同步生産者實作代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("DefaultTopic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
}
可靠異步生産者實作代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("DefaultTopic",
"TagA",
"OrderID888888",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
單向傳輸生産者實作代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("default_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 10; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("DefaultTopic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//Call send message to deliver message to one of brokers.
producer.sendOneway(msg);
}
//Wait for sending to complete
Thread.sleep(5000);
producer.shutdown();
}
}
消費者實作代碼
package com.itxs.rocketmq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("default_group");
// Specify name server addresses.
consumer.setNamesrvAddr("192.168.50.95:9876;192.168.50.156:9876;192.168.50.196:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("DefaultTopic", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
可靠同步生産者發送消息
消費者消費消息
其他消息示例可以參考官網的樣例使用即可
面試題
說說RocketMQ架構群組成?
從Apache RocketMQ官網架構圖看可知道其由四個大部分組成,分别為名稱伺服器叢集、Broker叢集、生産者叢集和消費者叢集;它們中的每一個都可以水準擴充而不存在單一的故障點。
- NameServer Cluster(命名伺服器叢集):名稱伺服器提供輕量級的服務發現和路由。每個Name Server記錄完整的路由資訊,提供相應的讀寫服務,支援快速的存儲擴充。我們知道Kafka是依賴ZooKeeper來實作服務發現和路由的。
- Broker管理,NameServer接受來自Broker叢集的注冊,并提供心跳機制來檢查代理是否處于活動狀态。
- 路由管理,每個NameServer将儲存關于代理叢集的全部路由資訊和用于用戶端查詢的隊列資訊。
- Producer和Conumser通過NameServer就可以知道整個Broker叢集的路由資訊,進而進行消息的投遞和消費。
- NameServer通常也是叢集的方式部署,各執行個體間互相不進行資訊通訊。Broker是向每一台NameServer注冊自己的路由資訊,是以每一個NameServer執行個體上面都儲存一份完整的路由資訊。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由資訊,Producer,Consumer仍然可以動态感覺Broker的路由的資訊。
- RocketMQ用戶端(生産者/消費者)将從NameServer查詢隊列路由資訊,用戶端可以通過多種方式找到NameServer的位址,下面列出幾種
- 程式設計方式,如producer.setNamesrvAddr("ip:port")。
- Java選項,使用rocketmq.namesrv.addr。
- 環境變量使用NAMESRV_ADDR。
- HTTP Endpoint。
- Broker Cluster(代理叢集):Broker是作為RocketMQ最核心消息Server,Broker通過提供輕量級的TOPIC和QUEUE機制來負責消息存儲。它們支援Push和Pull模型,包含容錯機制(2副本或3副本),并提供強大的填充峰值和按原始時間順序累積數千億條消息的能力。此外,Broker提供災難恢複、豐富的度量統計資訊和警報機制,這些都是傳統消息中間件系統所缺乏的;代理伺服器負責消息存儲和傳遞、消息查詢、HA保證等,Broker伺服器有幾個重要的子子產品:
- 遠端子產品,Broker的入口,處理來自客戶機的請求。
- 用戶端管理器,管理用戶端(生産者/消費者)并維護消費者的主題訂閱。
- 存儲服務,提供簡單的api在實體磁盤中存儲或查詢消息。
- HA服務,在主Broker和從Broker之間提供資料同步功能。
- 索引服務,根據指定的鍵為消息建構索引,并提供快速的消息查詢。
Apache RocketMQ分布式消息傳遞和流資料平台及大廠面試寶典v4.9.2 - Producer Cluster(生産者叢集):生産者支援分布式部署;分布式生産者通過多種負載均衡模式向Broker叢集發送消息;發送過程支援快速失敗和低延遲。
- Consumer Cluster(消費者叢集):消費者也支援Push和Pull模型中的分布式部署;它還支援叢集使用和消息廣播;它提供了實時消息訂閱機制,可以滿足大多數使用者的需求。
說說RocketMQ核心概念?
Broker 在實際部署過程中對應一台伺服器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的實體位址,每個Topic中的消息位址存儲于多個 Message Queue 中,ConsumerGroup 由多個Consumer 執行個體構成。
- 消息模型
- Clustering:叢集消費模式下,相同Consumer Group的每個Consumer執行個體平均分攤消息。
- Broadcasting:廣播消費模式下,相同Consumer Group的每個Consumer執行個體都接收全量的消息。
- 生産者組:同一類Producer的集合,這類Producer發送同一類消息且發送邏輯一緻。如果發送的是事務消息且原始生産者在發送之後崩潰,則Broker伺服器會聯系同一生産者組的其他生産者執行個體以送出或回溯消費。
- Producer(生産者):負責生産消息,一般由業務系統負責生産消息。一個消息生産者會把業務應用系統裡産生的消息發送到broker伺服器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker傳回确認資訊,單向發送不需要。
- 消費者組:同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一緻。消費者組使得在消息消費方面,實作負載均衡和容錯的目标變得非常容易。要注意的是,消費者組的消費者執行個體必須訂閱完全相同的Topic。RocketMQ 支援兩種消息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。
- Consumer (消費者):負責消費消息,一般是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息、并将其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
- Pull:主動調用Consumer的拉消息方法從Broker伺服器拉消息、主動權由應用控制。一旦擷取了批量消息,應用就會啟動消費過程。拉取型消費者主動從broker中拉取消息消費,隻要拉取到消息,就會啟動消費過程,稱為主動型消費。
- Push:Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。推送型消費者就是要注冊消息的監聽器,監聽器是要使用者自行實作的。當消息達到broker伺服器後,會觸發監聽器拉取消息,然後啟動消費過程。但是從實際上看還是從broker中拉取消息,稱為被動消費型。
- push:消費端慢的話導緻消費端緩沖區溢出。
- pull:考慮拉的頻率,可能導緻很多無效請求的RPC開銷影響整體網絡性能。
- Consumer (消費者):負責消費消息,一般是背景系統負責異步消費。一個消息消費者會從Broker伺服器拉取消息、并将其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。
- Broker Server :消息中轉角色,負責存儲消息、轉發消息。代理伺服器在RocketMQ系統中負責接收從生産者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理伺服器也存儲消息相關的中繼資料,包括消費者組、消費進度偏移和主題和隊列消息等。
- TOPIC:主題,表示一類消息的集合,每個主題包含若幹條消息,每條消息隻能屬于一個主題,是RocketMQ進行消息訂閱的基本機關。生産者在其中傳遞消息,消費者在其中提取消息。一個Topic可能有0個、一個或多個生産者向它發送消息;從消費者的角度來看一個主題可以由零個、一個或多個消費者群體訂閱。類似地,一個消費者組可以訂閱一個或多個主題,隻要該組的執行個體保持訂閱一緻。
- message queue:消息隊列,一個Topic可以劃分成多個消息隊列。Topic隻是個邏輯上的概念,消息隊列是消息的實體管理機關,當發送消息的時候,Broker會輪詢包含該Topic的所有消息隊列,然後将消息發出去。有了消息隊列,可以使得消息的存儲可以分布式叢集化,具有了水準的擴充能力。
- message:消息系統所傳輸資訊的實體載體,生産和消費資料的最小機關,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業務辨別的Key。系統提供了通過Message ID和Key查詢消息的功能。
- message order:當使用DefaultMQPushConsumer時,可以決定有序或并發地使用消息.
- Orderly:有序地使用消息意味着對于每個消息隊列,消息的使用順序與生産者發送消息的順序相同。如果您正在處理全局順序是強制性的場景,請確定您使用的Topic隻有一個消息隊列;消費者通過同一個消息隊列( Topic 分區,稱作 Message Queue) 收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。如果指定了有序消費,則消息消費的最大并發性是消費組訂閱的消息隊列的數量。
- Concurrently:當并發地使用消息時,消息使用的最大并發性僅受為每個用戶端指定的線程池的限制;在此模式下不再保證消息順序。
- 嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
- message order:當使用DefaultMQPushConsumer時,可以決定有序或并發地使用消息.
- tag:為消息設定的标志,用于同一主題下區分不同類型的消息。來自同一業務單元的消息,可以根據不同業務目的在同一主題下設定不同标簽。标簽能夠有效地保持代碼的清晰度和連貫性,并優化RocketMQ提供的查詢系統。消費者可以根據Tag實作對不同子主題的不同消費邏輯,實作更好的擴充性。
- offset:是指消息隊列中的offset,可以認為就是下标,消息隊列可看做數組。offset是java long型,64位,理論上100年不會溢出,是以可以認為消息隊列是一個長度無限的資料結構。
- RocketMQ支援按照下面兩種次元(“按照Message Id查詢消息”、“按照Message Key查詢消息”)進行消息查詢。
Apache RocketMQ分布式消息傳遞和流資料平台及大廠面試寶典v4.9.2
RocketMQ叢集的工作流程?
- 啟動NameServer,NameServer起來後監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
- Broker啟動,跟所有的NameServer保持長連接配接,定時發送心跳包。心跳包中包含目前Broker資訊(IP+端口等)以及存儲所有Topic資訊。注冊成功後,NameServer叢集中就有Topic跟Broker的映射關系。
- 收發消息前,先建立Topic,建立Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動建立Topic。
- Producer發送消息,啟動時先跟NameServer叢集中的其中一台建立長連接配接,并從NameServer中擷取目前發送的Topic存在哪些Broker上,輪詢從隊列清單中選擇一個隊列,然後與隊列所在的Broker建立長連接配接進而向Broker發消息。
- Consumer跟Producer類似,跟其中一台NameServer建立長連接配接,擷取目前訂閱Topic存在哪些Broker上,然後直接跟Broker建立連接配接通道,開始消費消息。
RocketMQ消息存儲設計?
RocketMQ的設計理念很大程度借鑒了kafka,RocketMQ消息存儲是整個系統的核心,直接決定着吞吐性能和高可用性;RocketMQ存儲消息是直接操作檔案,借助java NIO的力量,使得I/O性能十分高。當消息來的時候,順序寫入CommitLog。為了Consumer消費消息的時候,能夠友善的根據topic查詢消息,在CommitLog的基礎上衍生出了ConsumerQueue檔案,存放了某topic的消息在CommitLog中的偏移位置。此外為了支援根據消息key查詢消息,RocketMQ的強大的支援消息索引的特性靠的就是indexFile索引檔案。
- CommitLog:消息主體以及中繼資料的存儲主體,存儲Producer端寫入的消息主體内容,消息内容不是定長的。單個檔案大小預設1G, 檔案名長度為20位,左邊補零,剩餘為起始偏移量,比如00000000000000000000代表了第一個檔案,起始偏移量為0,檔案大小為1G=1073741824;當第一個檔案寫滿了,第二個檔案為00000000001073741824,起始偏移量為1073741824,以此類推。
- CommitLog檔案的最大的一個特點就是消息順序寫入日志檔案,當檔案滿了,寫入下一個檔案;随機讀寫,關于commitLog的檔案的落盤有兩種,一種是同步刷盤,一種是異步刷盤,可通過 flushDiskType 進行配置。
- CommitLog除了消息本身,它記錄了消息的方方面面的資訊,通過一條CommitLog可以還原出很多東西。例如消息是何時、由哪個producer發送的,被發送到了哪個消息隊列,屬于哪個topic,有哪些屬性等等。RokcetMQ存儲的消息其實存儲的就是這個CommitLog記錄;可以将CommitLog記錄等同于消息,而CommitLog指存儲消息的檔案。
- CommitLog類屬性很多,但是最重要的是mappedFileQueue屬性。消息最終存儲在CommitLog裡,實際上CommitLog是一個邏輯上的概念。真正的檔案是一個個MappedFile,然後組成了mappedFileQueue。一個MappedFile最多能存放1G的CommitLog,這個大小在MessageStoreConfi類裡面定義了的。
- MappedFile 中WriteBuffer使用的是堆外記憶體,MappedByteBuffer是直接将檔案映射到記憶體中,兩者的使用是互斥的。如果啟用了臨時緩沖池(預設不啟用),那麼就會使用WriteBuffer寫CommitLog,否則就是MappedByteBuffer寫CommitLog。
- ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的,如果要周遊commitlog檔案中根據topic檢索消息是非常低效的。Consumer即可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,儲存了指定Topic下的隊列消息在CommitLog中的起始實體偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue檔案可以看成是基于topic的commitlog索引檔案,故consumequeue檔案夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue檔案采取定長設計,每一個條目共20個位元組,分别為8位元組的commitlog實體偏移量、4位元組的消息長度、8位元組tag hashcode,單個檔案由30W個條目組成,可以像數組一樣随機通路每一個條目,每個ConsumeQueue檔案大小約5.72M;
- IndexFile:IndexFile(索引檔案)提供了一種可以通過key或時間區間來查詢消息的方法。Index檔案的存儲位置是:\(HOME \store\index\){fileName},檔案名fileName是以建立時的時間戳命名的,固定的單個IndexFile檔案大小約為400M,一個IndexFile可以儲存 2000W個索引,IndexFile的底層存儲設計為在檔案系統中實作HashMap結構,故RocketMQ的索引檔案其底層實作為hash索引。
在上面的RocketMQ的消息存儲整體架構圖中可以看出,RocketMQ采用的是混合型的存儲結構,即為Broker單個執行個體下所有的隊列共用一個日志資料檔案(即為CommitLog)來存儲。RocketMQ的混合型存儲結構(多個Topic的消息實體内容都存儲于一個CommitLog中)針對Producer和Consumer分别采用了資料和索引部分相分離的存儲結構,Producer發送消息至Broker端,然後Broker端使用同步或者異步的方式對消息刷盤持久化,儲存至CommitLog中。隻要消息被刷盤持久化至磁盤檔案CommitLog中,那麼Producer發送的消息就不會丢失。正因為如此,Consumer也就肯定有機會去消費這條消息。當無法拉取到消息後,可以等下一次消息拉取,同時服務端也支援長輪詢模式,如果一個消息拉取請求未拉取到消息,Broker允許等待30s的時間,隻要這段時間内有新消息到達,将直接傳回給消費端。這裡,RocketMQ的具體做法是,使用Broker端的背景服務線程—ReputMessageService不停地分發請求并異步建構ConsumeQueue(邏輯消費隊列)和IndexFile(索引檔案)資料。
說說RocketMQ存儲底層實作?
- MappedByteBuffer
- RocketMQ主要通過MappedByteBuffer對檔案進行讀寫操作。其中,利用了NIO中的FileChannel模型将磁盤上的實體檔案直接映射到使用者态的記憶體位址中(這種Mmap的方式減少了傳統IO将磁盤檔案資料在作業系統核心位址空間的緩沖區和使用者應用程式位址空間的緩沖區之間來回進行拷貝的性能開銷),将對檔案的操作轉化為直接對記憶體位址進行操作,進而極大地提高了檔案的讀寫效率(正因為需要使用記憶體映射機制,故RocketMQ的檔案存儲都使用定長結構來存儲,友善一次将整個檔案映射至記憶體)。
- PageCache
- 是OS對檔案的緩存,用于加速對檔案的讀寫。一般來說,程式對檔案進行順序讀寫的速度幾乎接近于記憶體的讀寫速度,主要原因就是由于OS使用PageCache機制對讀寫通路操作進行了性能優化,将一部分的記憶體用作PageCache。對于資料的寫入,OS會先寫入至Cache内,随後通過異步的方式由pdflush核心線程将Cache内的資料刷盤至實體磁盤上。對于資料的讀取,如果一次讀取檔案時出現未命中PageCache的情況,OS從實體磁盤上通路讀取檔案的同時,會順序對其他相鄰塊的資料檔案進行預讀取。
- 在RocketMQ中,ConsumeQueue邏輯消費隊列存儲的資料較少,并且是順序讀取,在page cache機制的預讀取作用下,Consume Queue檔案的讀性能幾乎接近讀記憶體,即使在有消息堆積情況下也不會影響性能。而對于CommitLog消息存儲的日志資料檔案來說,讀取消息内容時候會産生較多的随機通路讀取,嚴重影響性能。如果選擇合适的系統IO排程算法,比如設定排程算法為“Deadline”(此時塊存儲采用SSD的話),随機讀的性能也會有所提升。
說說RocketMQ檔案存儲模型層次結構?
- RocketMQ業務處理器層
- Broker端對消息進行讀取和寫入的業務邏輯入口,這一層主要包含了業務邏輯相關處理操作(根據解析RemotingCommand中的RequestCode來區分具體的業務操作類型,進而執行不同的業務處理流程),比如前置的檢查和校驗步驟、構造MessageExtBrokerInner對象、decode反序列化、構造Response傳回對象等。
- RocketMQ資料存儲元件層
- 該層主要是RocketMQ的存儲核心類—DefaultMessageStore,其為RocketMQ消息資料檔案的通路入口,通過該類的“putMessage()”和“getMessage()”方法完成對CommitLog消息存儲的日志資料檔案進行讀寫操作(具體的讀寫通路操作還是依賴下一層中CommitLog對象模型提供的方法);另外,在該元件初始化時候,還會啟動很多存儲相關的背景服務線程,包括AllocateMappedFileService(MappedFile預配置設定服務線程)、ReputMessageService(回放存儲消息服務線程)、HAService(Broker主從同步高可用服務線程)、StoreStatsService(消息存儲統計服務線程)、IndexService(索引檔案服務線程)等。
- RocketMQ存儲邏輯對象層
- 該層主要包含了RocketMQ資料檔案存儲直接相關的三個模型類IndexFile、ConsumerQueue和CommitLog。IndexFile為索引資料檔案提供通路服務,ConsumerQueue為邏輯消息隊列提供通路服務,CommitLog則為消息存儲的日志資料檔案提供通路服務。這三個模型類也是構成了RocketMQ存儲層的整體結構(對于這三個模型類的深入分析将放在後續篇幅中)。
- 封裝的檔案記憶體映射層
- RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel兩種方式完成資料檔案的讀寫。其中,采用MappedByteBuffer這種記憶體映射磁盤檔案的方式完成對大檔案的讀寫,在RocketMQ中将該類封裝成MappedFile類。這裡限制的問題在上面已經講過;對于每類大檔案(IndexFile/ConsumerQueue/CommitLog),在存儲時分隔成多個固定大小的檔案(單個IndexFile檔案大小約為400M、單個ConsumerQueue檔案大小約5.72M、單個CommitLog檔案大小為1G),其中每個分隔檔案的檔案名為前面所有檔案的位元組大小數+1,即為檔案的起始偏移量,進而實作了整個大檔案的串聯。這裡,每一種類的單個檔案均由MappedFile類提供讀寫操作服務(其中,MappedFile類提供了順序寫/随機讀、記憶體資料刷盤、記憶體清理等和檔案相關的服務)。
- 磁盤存儲層
- 主要指的是部署RocketMQ伺服器所用的磁盤。這裡,需要考慮不同磁盤類型(如SSD或者普通的HDD)特性以及磁盤的性能參數(如IOPS、吞吐量和通路時延等名額)對順序寫/随機讀操作帶來的影響。
如何保證 RocketMQ 不丢失消息?
一條消息從生産到被消費,将會經曆生産階段、存儲階段、消費階段三個階段。
- 生産階段,Producer 建立消息,然後通過網絡将消息投遞給 MQ Broker。
- 生産者(Producer) 通過網絡發送消息給 Broker,當 Broker 收到之後,将會傳回确認響應資訊給 Producer;是以生産者隻要接收到傳回的确認響應,就代表消息在生産階段未丢失。
- 傳回消息方式可以是同步也可以是異步,但不管是同步還是異步的方式,都會碰到網絡問題導緻發送失敗的情況。針對這種情況,我們可以設定合理的重試次數,當出現網絡問題,可以自動重試。
// 同步發送消息重試次數,預設為 2 mqProducer.setRetryTimesWhenSendFailed(3); // 異步發送消息重試次數,預設為 2 mqProducer.setRetryTimesWhenSendAsyncFailed(3);
- 存儲階段,消息将會存儲在 Broker 端磁盤中。
- 預設情況下,消息隻要到了 Broker 端,将會優先儲存到記憶體中,然後立刻傳回确認響應給生産者。随後 Broker 定期批量的将一組消息從記憶體異步刷入磁盤。這種方式減少 I/O 次數,可以取得更好的性能,但是如果發生機器掉電,異常當機等情況,消息還未及時刷入磁盤,就會出現丢失消息的情況。
- 若想保證 Broker 端不丢消息,保證消息的可靠性,我們需要将消息儲存機制修改為同步刷盤方式,即消息存儲磁盤成功,才會傳回響應。若 Broker 未在同步刷盤時間内(預設為 5s)完成刷盤,将會傳回
狀态給生産者。SendStatus.FLUSH_DISK_TIMEOUT
- 叢集部署:為了保證可用性,Broker 通常采用一主(master)多從(slave)部署方式。為了保證消息不丢失,消息還需要複制到 slave 節點。預設方式下,消息寫入 master 成功,就可以傳回确認響應給生産者,接着消息将會異步複制到 slave 節點。此時若 master 突然當機且不可恢複,那麼還未複制到 slave 的消息将會丢失。為了進一步提高消息的可靠性,我們可以采用同步的複制方式,master 節點将會同步等待 slave節點複制完成,才會傳回确認響應。提高消息的高可靠性,但是會降低性能,生産實踐中需要綜合選擇。
## master 節點配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER ## slave 節點配置 brokerRole=slave flushDiskType = SYNC_FLUSH
- 消費階段, Consumer 将會從 Broker 拉取消息。
- 消費者從 broker 拉取消息,然後執行相應的業務邏輯。一旦執行成功,将會傳回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
狀态給 Broker。
如果 Broker 未收到消費确認響應或收到其他狀态,消費者下次還會再次拉取到該條消息,進行重試。這樣的方式有效避免了消費者消費過程發生異常,或者消息在網絡傳輸中丢失的情況。
- 消費者從 broker 拉取消息,然後執行相應的業務邏輯。一旦執行成功,将會傳回
說說RocketMQ同步異步複制和刷盤?
- 複制
- 為了確定成功釋出的消息不會丢失,RocketMQ提供了同步和異步兩種複制模式獲得更強的持久性和更高的可用性。
- 同步Broker要等到送出日志被複制到從伺服器後才進行确認。
- 相反,異步Broker在主伺服器上處理消息後立即傳回。
- 刷盤
- 同步刷盤:在消息達到Broker的記憶體之後,必須刷到commitLog日志檔案中才算成功,然後傳回Producer資料已經發送成功。
-
異步刷盤:異步刷盤是指消息達到Broker記憶體後就傳回Producer資料已經發送成功,會喚醒一個線程去将資料持久化到CommitLog日志檔案中。
優缺點分析:同步刷盤保證了消息不丢失,但是響應時間相對異步刷盤要多出10%左右,适用于對消息可靠性要求比較高的場景。異步刷盤的吞吐量比較高,RT小,但是如果broker斷電了記憶體中的部分資料會丢失,适用于對吞吐量要求比較高的場景。
說說RocketMQ負載均衡?
RocketMQ中的負載均衡都在Client端完成,具體來說的話,主要可以分為Producer端發送消息時候的負載均衡和Consumer端訂閱消息的負載均衡。
nameServer儲存着Topic的路由資訊,路由記錄了broker叢集節點的通訊位址,broker的名稱以及讀寫隊列數量等資訊。寫隊列writeQueue表示生産者可以寫入的隊列數,如果不做配置預設為4,也就是queueId是0,1,2,3.broker收到消息後根據queueId生成消息隊列,生産者負載均衡的過程的實質就是選擇broker叢集和queueId的過程。讀隊列readQueue表示broker中可以供消費者讀取資訊的隊列個數,預設也是4個,也就是queueId也是0,1,2,3。消費者拿到路由資訊後會選擇queueId,從對應的broker中讀取資料消費
- Producer的負載均衡
- Producer端在發送消息的時候,會先根據Topic找到指定的TopicPublishInfo,在擷取了TopicPublishInfo路由資訊後,RocketMQ的用戶端在預設方式下selectOneMessageQueue()方法會從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息。具體的容錯政策均在MQFaultStrategy這個類中定義。這裡有一個sendLatencyFaultEnable開關變量,如果開啟,在随機遞增取模的基礎上,再過濾掉not available的Broker代理。所謂的"latencyFaultTolerance",是指對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550Lms,就退避3000Lms;超過1000L,就退避60000L;如果關閉,采用随機遞增取模的方式選擇一個隊列(MessageQueue)來發送消息,latencyFaultTolerance機制是實作消息發送高可用的核心關鍵所在。簡單的說選擇的标準:盡量不選剛剛選過的broker,盡量不選發送上條消息延遲過高或沒有響應的broker,也就是找到一個可用的
- Consumer的負載均衡
- 将MessageQueue中的消息隊列配置設定到消費者組裡的具體消費者;Consumer在啟動的時候會執行個體化RebalanceImpl,這個類負責消費端的負載均衡。在Consumer執行個體的啟動流程中的啟動MQClientInstance執行個體部分,會完成負載均衡服務線程—RebalanceService的啟動(每隔20s執行一次)。通過檢視源碼可以發現,RebalanceService線程的run()方法最終調用的是RebalanceImpl類的rebalanceByTopic()方法,該方法是實作Consumer端負載均衡的核心
- 負載均衡算法
- 平均配置設定算法
- 環形算法
- 指定機房算法
- 就近機房算法
- 一緻性雜湊演算法
- 手動配置算法
RocketMQ如何保證順序消息?
- 在預設的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息隻依次發送到同一個queue中,消費的時候隻從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue隻有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。順序消費不能是并發的。
- 怎麼保證消息發到同一個queue裡?RocketMQ給我們提供了MessageQueueSelector接口,可以重寫裡面的接口,實作自己的算法,比如判斷i%2==0,那就發送消息到queue1否則發送到queue2。
RocketMQ如何實作消息去重?
- 這個得依賴于消息的幂等性原則:就是使用者對于同一種操作發起的多次請求的結果是一樣的,不會因為操作了多次就産生不一樣的結果。隻要保持幂等性,不管來多少條消息,最後處理結果都一樣,需要Consumer端自行實作。
- 在RocketMQ去重的方案:因為每個消息都有一個MessageId, 保證每個消息都有一個唯一鍵,可以是資料庫的主鍵或者唯一限制,也可以是Redis緩存中的鍵,當消費一條消息前,先檢查資料庫或緩存中是否存在這個唯一鍵,如果存在就不再處理這條消息,如果消費成功,要保證這個唯一鍵插入到去重表中。
說說RocketMQ分布式事務消息?
半消息:是指暫時還不能被Consumer消費的消息,Producer成功發送到broker端的消息,但是此消息被标記為“暫不可投遞”狀态,隻有等Producer端執行完本地事務後經過二次确認了之後,Consumer才能消費此條消息。主要分為正常事務消息的發送及送出、事務消息的補償流程兩大塊。RocketMQ事務消息依賴半消息,二次确認以及消息回查機制。
- 1、Producer向broker發送半消息
- 2、Producer端收到響應,消息發送成功,此時消息是半消息,标記為“不可投遞”狀态,Consumer消費不了。
- 3、Producer端執行本地事務。
- 4、正常情況本地事務執行完成,Producer向Broker發送Commit/Rollback,如果是Commit,Broker端将半消息标記為正常消息,Consumer可以消費,如果是Rollback,Broker丢棄此消息。
- 5、異常情況,Broker端遲遲等不到二次确認。在一定時間後,會查詢所有的半消息,然後到Producer端查詢半消息的執行情況。
- 6、Producer端查詢本地事務的狀态
- 7、根據事務的狀态送出commit/rollback到broker端。(5,6,7是消息回查)
簡單歸納RocketMQ高性能原因?
- 網絡模型,RocketMQ 使用 Netty 架構實作高性能的網絡傳輸,也遵循了Reactor多線程模型,同時又在這之上做了一些擴充和優化。而Netty高性能我們在前一篇文章也以學習過,這裡就不重複說了。
Apache RocketMQ分布式消息傳遞和流資料平台及大廠面試寶典v4.9.2 - 順序寫、随機讀、零拷貝。
- 多主多從,建立topic時,多個message queue可以在多個broker上,master提供讀寫,從broker可以分擔讀消息的壓力。
- 同步複制和異步複制。
- 同步刷盤和異步刷盤(PageCache)。
- 同步和異步發送消息。
- 業務線程池隔離,RocketMQ 對 Broker 的線程池進行了精細的隔離。使得消息的生産、消費、用戶端心跳、用戶端注冊等請求不會互相幹擾。
- 并行消費和批量消費。
最後:去哪兒網開源的QMQ消息中間件也可以好好的研究,功能非常齊全,消息中間件的應用是比較簡單的,更多應該思考和了解主流開源中間件Kafka、RocketMQ、QMQ、Palsar等的設計思想。