在前面的文章中,我們學習了RocketMQ的原理;RocketMQ中 命名服務 ServiceName 的運作流程;以及消息生産、發送的原理和模式。這一篇,就讓我們從消息消費的角度去進一步的學習。
1 消息消費
消息的消費主要是由如下幾個核心能力組成的:
- 消費方式:Push(推) 或者 Pull(拉)
- 消費模式:廣播模式和叢集模式
- 消息消費回報
- 流量控制(包括消費并發線程數設定)
- 消息的過濾(Tag, Key),過濾标簽 TagA||TagB||TagC
1.1 消費方式Push or Pull
RocketMQ消息訂閱有方式:
-
Push方式(MQPushConsumer),MQ Server主動向消費端推送;
這種模式不考慮消費端是否有能力處理消費資料,實時性比較高,能夠及時推送資料,适合大部分業務場景。但同時存在一個問題,如果遇到峰值期,瞬間推送過多消息,會導緻積壓,甚至用戶端雪崩。
-
Pull方式(MQPullConsumer),消費端在有需要時,主動從MQ Server拉取資料。
消費端比較靈活,可以根據自己的吞吐能力,消費的節奏,主動安排消息拉取。适合消費和計算耗時比較大的消費場景。
缺點就是如何從代碼層面精準地控制拉取的頻率,過短對消費端有壓力,并且有可能空拉照成資源拉菲;過長可能對消息及時性有影響,可以采用長輪詢的方式進行處理。
-
Push模式與Pull模式的差別
Push方式的做法是,Consumer封裝了長輪詢的操作,并注冊MessageListener監聽器,當MessageListener監聽到有新的消息的時候,消費端便被喚醒,讀取消息進行消費。從使用者角度上,訂閱消息并消費感覺消息是推過來的。
Pull方式的做法是,消費端主動去拉取資料,擷取相應的Topic的,周遊MessageQueue集合,取數,重新标記offset,再取數,直至消費完成。
1.2 消費模式 叢集 or 廣播
RocketMQ 目前支援叢集模式和廣播消費模式,其中叢集模式使用範圍比較大,即點對點,消息消費了即完成。
-
叢集負載均衡消費模式(預設)
叢集模式是一個主題下的單條消息隻允許被同一消費組内的一個消費者消費,消費完即完成,即P2P。
在叢集模式下,消息隊列負載的模式:一個MessageQueue集合同一個時間内隻允許被同一消費組内的單個消費者消費一次(這種模式不允許重複消費,如付款,訂單送出),單個Consumer可以消費多個周遊MessageQueue集合。
-
廣播消費模式
廣播模式指的是目前主題下的消費組所有消費者都可以消費并處理消息一次,達到廣播的目的。很多業務場景,比如航班延遲的消息通知,告知用戶端緩存資訊過期需要重新拉起等。
1.3 消費進度回報
RocketMQ用戶端消費資料之後,需要向Broker回報消息的消費進度,Broker擷取到消費進度記錄下來。這樣保證 隊列rebalance和用戶端消費者重新開機動的時候,可以擷取到準确的消費進度。
消息消費以及進度回報的主步驟如下:
- 消費線程池消費完資料之後,将消息消費進度緩存在記憶體中。
- 定時排程任務 5s 一次将消息隊裡的消費 offset 送出至Broker。
- Broker接受到消息之後,存儲在記憶體中,如果有新的過來,可以更行,同樣的每5s将offset持久化下來。
- 消費用戶端從Broker拉取消息時,同步将MessageQueue的消費偏移量送出到Broker。
綜合上面的内容,需要注意的點如下:
- RocketMQ以Consumer Group(消費者小組)和 Queue(隊列)為标準對消費刻度進行管理的
- Consumer Offset标記消費組在消息隊列(Queue)上的消費進度。
- 消費成功後,消費進度暫時更新到本地緩存,排程任務會定時(預設5s)将進度同步到broker(需注意如果當機,消費進度未送出則可能導緻被重複消費),Broker最終将消費進度持久化到磁盤。
- RocketMQ支援并發消費,是以是多個線程并行處理,每次記錄消費進度的時候,把線程中最小的offset值作為消費進度值,這樣避免了消息丢失,但有重複消費的風險,業務中需保證操作幂等性。
- offset存儲模式:叢集模式,消息進度存儲于Broker上;廣播模式,消息消費進度在消費端即可。
1.4 消費端流量控制
可以在DefaultMQPushConsumer 對象中配置各種屬性來對消費流量進行控制:
-
PullInterval: 配置消費端拉取MQ消息的間隔時間。間隔時間是按照上次消費完成之後(比如rocketMQ收到Ack回複消息之後)。
PullInterval=20s,比如上次rocketMq服務收到Ack消息是12:15:15,則 12:15:35再去拉消息。
-
PullBatchSize: 消費端每個隊列一次拉取多少個消息,若該消費端分賠了N個監控隊列,每次拉取M個,那麼消費端每次去rocketMq拉取的消息為N * M。
消費端每次pull到消息總數=PullBatchSize * 監聽隊列數,如 PullBatchSize = 2, 監聽隊列=5,則 消息總數量 = 2 * 5 = 10。
- ThreadMin和ThreadMax: 消費端消費pull到的消息需要的線程數量。
- ThreadMin:消費端拉取到消息後配置設定消費的線程數
- ThreadMax:最大消費線程,如果預設隊列滿了,則啟用新的線程
-
RocketMq 邏輯消費隊列數量的配置
rocketMq 可以配置消費隊列,如 queue Read1 ,queue Read2,配置數量決定每次pull到的消息總數。Rocket MQ 提供了讀寫隊列數量的配置。
-
消費端節點部署數量
多節點消費端線程數量要比單節點消費線程數量多,理論上消費速度大于單節點,分治思維。
1.5 消息的過濾
在過濾消息的時候,标簽模式簡單而是用,可以篩選出你需要的資料。如下:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupTest");
consumer.subscribe("testTopic", MessageSelector.byTag("Tag1 || Tag2 || Tag3").bySql("*** = 'male' AND name = 'brand'));
這種情況下,消息中帶有 Tag1 、Tag2、Tag3 标簽就會被過濾出來,但是單個消限制息隻能有一個标簽,這就遠遠滿足不了各種複雜的并交集場景的需要了。
這時候Rocket MQ可以在消息中設定一些屬性,再使用SQL表達式篩選屬性來過濾出需要的資料。 如下
------------
| message |
|----------| *** = male AND name = 'brand' , Gotten
| name = 'brand' |
| *** = 'male'|
| age = 21|
------------
------------
| message |
|----------| *** = male AND name = 'brand', Gotten , Missed
| name = 'Anny' |
| *** = 'female'|
| age = 20 |
------------
1.8 提高Consumer的處理能力 :看情況
-
提高消費并行度
在同一個ConsumerGroup下(Clustering方式),可以通過增加Consumer執行個體的數量來提高并行度。
通過加機器,或者在已有機器中啟動多個Consumer程序都可以增加Consumer執行個體數。
注意:總的Consumer數量不要超過Topic下Read Queue數量,超過的Consumer執行個體接收不到消息。
此外,通過提高單個Consumer執行個體中的并行處理的線程數,可以在同一個Consumer内增加并行度來提高吞吐量(設定方法是修改consumeThreadMin和consumeThreadMax)。
-
以批量方式進行消費
某些業務場景下,多條消息同時處理的時間會大大小于逐個處理的時間總和,比如消費消息中涉及update某個資料庫,一次update10條的時間會大大小于十次update1條資料的時間。
可以通過批量方式消費來提高消費的吞吐量。實作方法是設定Consumer的consumeMessageBatchMaxSize這個參數,預設是1,如果設定為N,在消息多的時候每次收到的是個長度為N的消息連結清單。
-
檢測延時情況,跳過非重要消息
Consumer在消費的過程中,如果發現由于某種原因發生嚴重的消息堆積,短時間無法消除堆積,這個時候可以選擇丢棄不重要的消息,使Consumer盡快追上Producer的進度。
2 消息消費的模式
2.1 基本資訊消費
消費者的基本實作,連接配接 NameServer的位址,指定Topic和Tag,讀取到需要消費的資料,然後輪詢并處理。
public class SimpleConsumerApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,并指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的位址,以擷取Broker路由位址
consumer.setNamesrvAddr("192.168.139.1:9876");
// 3.指定Topic和Tag 資訊。* 代表所有
consumer.subscribe("testTopic", "*");
// 4.設定回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}
2.2 順序消費
相比與基本消費,多了一個 ConsumeFromWhere的設定。代表消費者從哪個位置開始消費,枚舉如下:
- CONSUME_FROM_LAST_OFFSET:第一次啟動從隊列最後位置消費,非第一次啟動接着上次消費的進度繼續消費
- CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,非第一次啟動接着上次消費的進度繼續消費
-
CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,非第一次啟動接着上次消費的進度繼續消費
以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那麼會在broker端記錄該消費者的消費位置,消費者挂了再啟動,則從上次消費進度繼續執行。
public class SimpleOrderApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,并指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的位址,以擷取Broker路由位址
consumer.setNamesrvAddr("192.168.139.1:9876");
/**
* 設定Consumer第一次啟動是從隊列頭部、隊列尾部、還是指定時間戳節點開始消費
* 非第一次啟動接着上次消費的進度繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3.指定Topic和Tag 資訊。* 代表所有
consumer.subscribe("testTopic", "*");
// 4.設定回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}
2.3 過濾消息消費
可以使用MessageSelector.byTag來進行标簽篩選;或者使用MessageSelector.bySql 來進行消息屬性篩選;或者混合使用。
參考下面代碼,注釋說明的比較清楚。
public class FilterConsumerApplication {
public static void main(String[] args) throws MQClientException {
// 1.建立消費者Consumer,并指定消費者組名為 testConsumGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testConsumGroup");
// 2.指定NameServer的位址,以擷取Broker路由位址
consumer.setNamesrvAddr("192.168.139.1:9876");
// 3.指定Topic和Tag 資訊。隻有訂閱的消息有 *** 和 name 屬性, 并且年齡為 18 歲以上的男性
// consumer.subscribe("testTopic", MessageSelector.byTag("userTag1 || userTag2"));
consumer.subscribe("testTopic", MessageSelector.bySql("*** = 'male' AND age > 18"));
// 4.設定回調函數,用來處理讀取到的消息, MessageListenerOrderly 用單個線程處理處理隊列的資料
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext context) {
for (MessageExt msg : msgList) {
System.out.println("線程 " + Thread.currentThread().getName() + " : " + msg.getBody().toString());
// Todo,具體的業務邏輯
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 5.消費者開始執行消費任務
consumer.start();
}
}
3 總結
- 消費方式:Push(推) 或者 Pull(拉)
- 消費模式:廣播模式和叢集模式
- 消息消費回報
- 流量控制(包括消費并發線程數設定)
- 消息的過濾(Tag, Key),過濾标簽 TagA||TagB||TagC
為幫助開發者們提升面試技能、有機會入職BATJ等大廠公司,特别制作了這個專輯——這一次整體放出。
大緻内容包括了: Java 集合、JVM、多線程、并發程式設計、設計模式、Spring全家桶、Java、MyBatis、ZooKeeper、Dubbo、Elasticsearch、Memcached、MongoDB、Redis、MySQL、RabbitMQ、Kafka、Linux、Netty、Tomcat等大廠面試題等、等技術棧!
歡迎大家關注公衆号【Java爛豬皮】,回複【666】,擷取以上最新Java後端架構VIP學習資料以及視訊學習教程,然後一起學習,一文在手,面試我有。
每一個專欄都是大家非常關心,和非常有價值的話題,如果我的文章對你有所幫助,還請幫忙點贊、好評、轉發一下,你的支援會激勵我輸出更高品質的文章,非常感謝!