天天看點

ActiveMQ從入門到精通(二)消息的順序消費JMS Selectors消息的同步 AND 異步 接受MessageP2P or Pub/Sub持久化訂閱持久化消息到MySQL與Spring整合 JmsTemplate

在上一篇文章中,我們已經明确知道了ActiveMQ并不能保證消費的順序性,即便我們使用了消息優先級。而在實際開發中,有些場景又是需要對消息進行順序消費的,比如:使用者從下單、到支付、再到發貨等。如果使用ActiveMQ該如何保證消費的順序性呢?

<a href="https://s2.51cto.com/wyfs02/M01/8E/F6/wKiom1jP0PnAxnLnAABurs9WpsQ838.png" target="_blank"></a>

首先來說,在實際中,我們并不需要的是對全部消息的全局有序消費,我們僅僅需要的是局部業務有序性消費。比如說,我們僅僅需要的是一個使用者的下訂單、支付、發貨這個過程的3條消息有序消費。

比如,我們可以根據使用者ID簡單做一個HASH,将消息定位到不同的隊列上,也就意味着同一個使用者的消息将發往同一個隊列。這樣做的好處在于,多個隊列之間可以并行處理。

然後,在隊列上可以對一段時間上的消息按照使用者分組進行排序,這隻是一個少量消息的局部排序而已,比如Queue-A上有一個使用者的3條消息(訂單消息msg1、支付消息msg2、發貨消息msg3),那麼,msg1将交給訂單業務系統,處理完成後,msg2交給支付系統,處理完成後,msg3交給發貨系統。雖然這個處理過程是同步的(一條消息處理完,在接着處理),但是它的并發性,系統的處理能力并沒有下降!為什麼這麼說呢?

假設,msg1/msg2/msg3處理各需要0.1S,如果訂單業務系統、支付系統、發貨系統并沒有分開,而是一個“大系統”,那麼顯然訂單業務在0.1S完成後,需要等待後面的支付、發貨邏輯處理完才能繼續工作,意味着訂單業務幹了0.1S的活,等了0.2S,導緻在0.3秒内訂單業務隻處理了1條消息。而現在這3個系統是分開的,那麼在0.3S内,訂單業務系統可以處理3條消息,而且沒有業務系統閑着!

實際上,RocketMQ在消費順序性這塊要比ActiveMQ要強大些,後期在RocketMQ專題中再為大家介紹。

<a href="https://s4.51cto.com/wyfs02/M01/8E/F3/wKioL1jP0YOyfaQIAAB4mhlrc9M252.png" target="_blank"></a>

<a href="https://s3.51cto.com/wyfs02/M02/8E/F6/wKiom1jP0aKCBSnaAAB4hO8Pi4I017.png" target="_blank"></a>

需要注意一下幾點:

第一,生産者端需要設定消息屬性,一定要注意的是setXxxProperty(filed,value)

第二,給出條件,其實本質上就是SQL92文法

第三,建立消費者的時候,指定條件即可

消息的接受,我們已經知道,可以通過消費者的receive()/receive(long time)/receiveNoWait(),這種方式是client端主動接受消息,可以了解為消息的同步接受。要知道這種同步的消息接受方式,是讓我們很難受的,我們不得不寫一個死循環來不斷接受消息。那麼有沒有一種比較優雅的方式,比如我們設定一個類似消息監聽的機制,一旦隊列上有消息了,那麼回調我們的message handler進行處理呢?

<a href="https://s3.51cto.com/wyfs02/M01/8E/F6/wKiom1jP0djAURC8AABAcmkBxCQ906.png" target="_blank"></a>

消息的異步接受是指當消息到達時,ActiveMQ主動通知用戶端。可以通過注冊一個實作了MessageListener接口的對象到MessageConsumer。MessageListener隻有一個必須要實作的方法,即onMessage。在發往Destination的消息時,會調用該方法。

這種異步接受“貌似”是ActiveMQ主動的推送消息給消費者,其本質還是消費者輪詢消息伺服器導緻的,隻不過這個過程被封裝了!

JMS程式的核心在于,生産和消費的消息能夠被其他程式所使用到。JMS Message是一個既簡單又不乏靈活的基本格式,由消息頭、屬性、消息體3部分組成。

<a href="https://s5.51cto.com/wyfs02/M01/8E/F3/wKioL1jP0hLT2-5dAACirT3ATUc227.png" target="_blank"></a>

注意,在消費者端,我們接受到消息後,一般需要通過instanceof來判斷類型後在進行處理!

在ActiveMQ中,還存在一類臨時消息,就是通過建立臨時隊列/臨時主題,如果Connection一旦關閉,那麼臨時目标就關閉,消息内容也就消失。了解下即可,實際中并不适用。

上2張圖,你就會明白這2種模式的差別了。

<a href="https://s5.51cto.com/wyfs02/M02/8E/F4/wKioL1jP0kKxsEdXAABDpyHM1zQ282.png" target="_blank"></a>

生産者端發送一條消息,消費者端隻會有一個消費者消費這個消息。好像打電話,一對一通信!

<a href="https://s2.51cto.com/wyfs02/M02/8E/F4/wKioL1jP0nzDDMfKAABMnh2EWm0104.png" target="_blank"></a>

一對多通信,發送一條消息,所有訂閱了該目标的消費者都會收到消息。

P2P、Pub/Sub在代碼上的差別點僅僅在于,目标類型的建立是createQueue or createTopic,其他一切照舊!

對于訂閱模式,對訂閱者提出了特殊的要求,要想收到消息,必須先訂閱,而且訂閱程序必須一直處于運作狀态!實際上,有時候消費者重新開機了下,那麼這個消費者将丢失掉一些消息,那麼能否避免這樣的情況呢?ActiveMQ已經替我們想好了,就是持久化訂閱!

所謂持久化訂閱,打個比方,就是說跟MQ打聲招呼,即便我不在,那麼給我發送的消息暫存在MQ,等我來了,再給我發過來。說白了,持久化訂閱,需要給MQ備個案(你是誰,想在哪個Topic上搞特殊化)!看一個代碼片段:

<a href="https://s4.51cto.com/wyfs02/M00/8E/F4/wKioL1jP0qqD6Ex5AACO3SArsow900.png" target="_blank"></a>

每一個持久化訂閱者都應該有一個唯一的ID作為标示以及要在哪個Topic上進行持久化訂閱,一旦這些資訊告知MQ之後,那麼以後不論持久化訂閱者在不線上,那麼他的消息會暫存在MQ,以後都會發給他!

在前文中已經提及預設情況下,ActiveMQ是開啟持久化消息機制的,并且是持久化到kahadb的,但是"很可惜"kahadb對我們不是很友好的可視化,其實ActiveMQ提供了配置的方式讓我們來選擇持久化消息到哪裡,這裡我以到MySQL為例來說明。(實際上ActiveMQ已經在conf配置檔案中提供了相應的例子,我這裡就簡單說明下)在activemq.xml的&lt;broker&gt;節點中增加MySQL資訊

<a href="https://s3.51cto.com/wyfs02/M01/8E/F4/wKioL1jP0tijg0Y7AAA8ePy9VbM208.png" target="_blank"></a>

注意到這個bean的id,這個是要被引用的。

<a href="https://s3.51cto.com/wyfs02/M00/8E/F4/wKioL1jP03nggyqtAAAgYa7XoDg690.png" target="_blank"></a>

實際中,我們會持久化到哪裡呢?一般情況下,比如到kahadb,比如到leveldb,因為這些資料庫的性能要較MySQL更高些,我們并不關心消息的“可視化”,更加關心的是消息在持久化的同時更加高效!

這裡我将為大家示範Spring和ActiveMQ整合的核心要素。采用Spring,不要Web容器,不涉及Spring-MVC,而且在這裡我将采用JUnit + Spring-Test來進行測試!文章末尾我将提供工程源碼下載下傳。OK,先來看一眼工程截圖:

<a href="https://s1.51cto.com/wyfs02/M01/8E/F4/wKioL1jP05mhbVe3AABdtiH38FM967.png" target="_blank"></a>

第一步:POM.XML配置

<a href="https://s4.51cto.com/wyfs02/M02/8E/F6/wKiom1jP08ixMmphAACgjNB3VfE337.png" target="_blank"></a>

第二步:MQ資訊配置檔案、Spring配置檔案

<a href="https://s4.51cto.com/wyfs02/M00/8E/F4/wKioL1jP0_LwjJJTAAAwFxJyt98067.png" target="_blank"></a>

<a href="https://s5.51cto.com/wyfs02/M00/8E/F6/wKiom1jP1AySNYDQAACeorO8SOc286.png" target="_blank"></a>

下面我們重點關注spring-activemq.xml:

<a href="https://s4.51cto.com/wyfs02/M01/8E/F6/wKiom1jP1DySZU17AACroEmvkGo746.png" target="_blank"></a>

注意從ActiveMQConectionFactory到PooledConnectionFactory,到Spring提供的SingleConnectionFactory,就是一個适配的過程。

<a href="https://s2.51cto.com/wyfs02/M02/8E/F6/wKiom1jP1F_yUZyAAAC6NSdMrcw467.png" target="_blank"></a>

注意Spring的套路經常是這樣的,提供XxxTemplate,比如HibernateTemplate,對于JMS,提供了JmsTemplate。

生産者應該持有JmsTemplate進行發送消息。

消費者,提供監聽器、監聽的目的地、連接配接工廠即可。

上面的配置,隻是一個非常簡單的示例,比如是發送到隊列,還是發送到主題,事務的配置,簽收機制的配置,ttl/priority等配置在後文通過看一下源碼,你就會知道該如何配置了。

第三步:消費者實作監聽器

<a href="https://s3.51cto.com/wyfs02/M00/8E/F6/wKiom1jP1KmAYJZOAAA8d4Y8byE119.png" target="_blank"></a>

第四步:生産者

<a href="https://s5.51cto.com/wyfs02/M00/8E/F4/wKioL1jP1NPBYVSwAABiznWXIfc819.png" target="_blank"></a>

第五步:利用Junit4 + SpringTest方式進行測試

我們以前在測試Spring這一塊,大都是通過手動編碼的方式(加載XML,setter/getter bean)進行,這裡我将為大家介紹一種全新的方式測試Spring程式!

<a href="https://s1.51cto.com/wyfs02/M01/8E/F6/wKiom1jP1P2jAsH6AAAZXsYe2Ok914.png" target="_blank"></a>

為什麼要提供一個測試基類呢?因為我們可能有很多個測試類,如果有了這個基類,其他測試類繼承它,就可以自動獲得測試基類的屬性了。

@RunWith  指明采用SpringJunit4進行測試

@ContextConfiguration 告訴配置檔案在哪裡

<a href="https://s1.51cto.com/wyfs02/M00/8E/F4/wKioL1jP1SXDeP8AAAAs1ccNFLY292.png" target="_blank"></a>

發現沒有,這樣寫Junit單元測試,和以前感覺不一樣!

其實,SpringTest + Junit4還提供了很多功能強大的地方,比如可以設定資料庫事務。如果我們在測試的過程結束後,希望復原資料庫的話,很簡單,隻需要在相應方法上打上注解即可。

運作結果

<a href="https://s3.51cto.com/wyfs02/M01/8E/F6/wKiom1jP1VaAgjUZAAA9hjXYGlo532.png" target="_blank"></a>

看一下屬性:

<a href="https://s5.51cto.com/wyfs02/M01/8E/F4/wKioL1jP1Xzhs2N2AABsVsoCqUk097.png" target="_blank"></a>

很多屬性,是不是很熟悉呢?

JmsTemplate的父類中有一個重要屬性:

<a href="https://s1.51cto.com/wyfs02/M00/8E/F6/wKiom1jP1aOTw5TAAAAYE-4GQ1I418.png" target="_blank"></a>

預設情況下,是P2P模式,如果将這個屬性配置成true,那麼将是主題模式。

OK,到這裡這篇部落格的内容就介紹完畢了,下一篇是關于ActiveMQ叢集方面的知識,See you again.

<b>本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1908576,如需轉載請自行聯系原作者</b>