天天看點

消息隊列之activeMQ

實作高可用、高伸縮、高性能、易用和安全的企業級面向消息服務的系統

異步消息的消費和處理

控制消息的消費順序

可以和Spring/springBoot整合簡化編碼

配置叢集容錯的MQ叢集

下載下傳位址:http://activemq.apache.org/components/classic/download/

這裡筆者是下載下傳的linux版的:

消息隊列之activeMQ

因為activeMQ底層是使用java編寫的,是以需要安裝jdk,這個請移步我之前的部落格:

安裝activeMq:

前台通路的端口是8161,在檢視前台時,要關閉linux和windows的防火牆:

在通路之前,需要修改conf目錄下的jetty.xml,将下面的host修改成自己的ip,以及修改使用者名和密碼。

修改完成之後重新開機activemq

檢視,位址為192.168.189.150:8161

消息隊列之activeMQ

到這裡就說明activemq安裝成功了。

JMS(java message service)是一個用于提供消息服務的技術規範,他制定了在整個消息服務提供過程中的所有資料結構和互動流程。當兩個程式使用jms進行通信時,他們并不是直接相連的,而是通過一個共同的消息收發服務連接配接起來的,達到解耦的效果。jms為标準消息協定和消息服務提供了一組通用的接口,包括建立、發送、讀取消息等。

消息隊列之activeMQ

異步:用戶端不用發送請求,JMS自動将消息發送給用戶端

可靠:JMS保證消息隻傳遞一次

JMS provider:實作了jms接口和規範的消息中間件

JMS producer:消息生産者,建立和發送JMS消息的用戶端應用

JMS consumer:消息消費者,接受和處理JMS消息的用戶端應用

JMS message:由消息頭、消息屬性、消息體組成

消息頭(在send方法之前,通過setXXX()設定):

JMSDestination:消息發送的目的地,主要是指Queue(點對點傳送模型)和Topic(釋出訂閱模型)

JMSDeliverMode:消息是否持久

JMSExpiration:設定消息過期時間

JMSPriority:消息優先級,0-4被稱為普通消息,5-9是加急消息,預設為4

JMSMessageID:唯一識别每個消息的辨別,由MQ産者或者自己設定

消息屬性:除消息頭以外的值,如識别,去重,重點标注等方法,如textMessage.setStringProperty("c1","VIP");

消息體:

TextMessage:普通字元串

MapMessage:map類型,其中key為String類型,而值為java的基本類型

BytesMessage:二進制數組消息

StreamMessage:java資料流消息,用個标準流來順序填充和讀取

ObjectMessage:對象消息,包含一個可序列化的java對象

點對點消息傳送模型:應用程式由消息隊列、發送者、接收者組成,每個消息發送給一個特殊的消息隊列,該隊列儲存了所有發送給它的消息,處理消費掉的和已過期的消息

點對點消息傳送的特性:

1.每個消息隻有一個接收者

2.消息發送者和接收者沒有時間依賴性

3.當消息發送者發送消息時,無論接收者程式在不在運作,都能發送消息

4.當接收者收到消息時,會發送确認收到通知

釋出訂閱消息傳遞模型:釋出者釋出一個消息,該消息通過topic傳遞給所有訂閱的用戶端,釋出者和訂閱者彼此不知道對方,是匿名的且可以動态釋出和消息訂閱。

釋出訂閱消息傳遞的特性:

1.一個消息可以傳遞給多個訂閱者

2.釋出者和訂閱者有時間依賴性

3.為了緩和嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱

1.引入jar包

2.生産者代碼

運作代碼在浏覽器上檢視,可以看到queue01裡面有5條消息:

消息隊列之activeMQ

Number Of Pending Messages:等待消費的消息 這個是目前未出隊列的數量。可以了解為總接收數-總出隊列數

Number Of Consumers:消費者的數量

Messages Enqueued:進入隊列的消息 進入隊列的總數量,包括出隊列的。 這個數量隻增不減

Messages Dequeued:出了隊列的消息 可以了解為是消費這消費掉的數量

運作消費者的代碼,應該我上面生産者的代碼運作了兩次,是以消息有10條。

消息隊列之activeMQ

在這裡,筆者使用的基于Zookeeper+levelDb搭建的activeMq叢集,為了避免單點故障,使用一主兩從的架構。使用Zookeeper叢集注冊所有的ActiveMQ Broker但隻有其中一個Broker可以提供服務,它被視為master,也就是說如果master因為故障而不能提供服務,Zookeeper會從SLave中選舉出一個Broker充當master。

我這邊的zookeeper叢集已經搭建好了,150和151是follower,152是leader。

從上面可以看到,隻有00000000020這個幾點的elected裡面有值,表明它被選舉為master節點了。

在浏覽器上依次通路:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

隻有192.168.189.150:8161可以通路成功,因為隻有master節點可以對外提供通路,是以隻有一個節點能通路到,那麼它就是master節點。

第二種檢視的方式:

檢視activemq的日志,最後一行,可以看到,MasterLevelDBStore即為master節點,SlaveLevelDBStore即為slave節點。

消息隊列之activeMQ
消息隊列之activeMQ

第三種檢視的方式為使用zookeeper的可視化工具。

由于activeMq叢集是基于zookeeper叢集實作的,是以要注意一下三點:

activeMQ的用戶端隻能通路master的Broker,其它處于Slave的Broker不能通路,是以用戶端連接配接的Broker應該使用failover協定

當一個activeMQ節點挂掉或者一個Zookeeper節點挂掉,activeMQ服務正常運轉,但是如果僅剩一個activeMQ節點,由于不能選舉Master,是以activeMQ不能正常運作;(一個就不成叢集了)

同理,如果Zookeeper僅剩一個節點是活動的,不管activeMQ是都存活或者說不管activeMQ個節點是否存活,activeMQ不能正常提供服務,必須依賴于Zookeeper叢集服務。

叢集的代碼和上面單機的代碼大緻是一直的,就隻需要修改一個activemq的位址。

1.消息發送方式

預設情況下,非持久化的消息是異步發送的,持久化的消息是同步發送的。但是在開啟事務的情況下,消息都是異步發送的,效率會有2個數量級的提升,是以在發送持久化消息時,請開啟事務模式。

2.儲存機制

在通常情況下,非持久化的消息時存儲在記憶體中的,持久化消息時存儲在檔案中的,他們的最大限制在配置檔案中的

是以盡量不要用非持久化檔案,如果非要用的化,可以将臨時檔案的限制調大。同時,非持久化的消息要及時處理,不要堆積,或者啟動事務。啟動事務後,commit()會等待伺服器的消息傳回,也不會導緻消息丢失了。

3.死信隊列

一條消息在被重發多次後(預設是6次),将會被ActiveMQ移入死信隊列;說白了就是異常消息的歸并處理的集合,主要是處理失敗的消息。可以在activeMQ.DLQ這個隊列中檢視。

4.重複消息,幂等性調用

在網絡延遲的情況洗啊,可能會造成MQ重試,可能會造成重複消費。如果消息是做資料庫的插入操作,給這個消息做一個唯一主鍵,那麼就算出現重複消費的情況,因為唯一主鍵,會造成主鍵沖突,避免資料庫出現髒資料。如果是第三方消費,可以在每條資料裡面加一個全局唯一的id,如果消息消費了,就将消息存在redis中,在消費消息之前将id到redis中查詢一下,判斷是否消費過,如果沒有消費過,就處理,如果消費過了,就不處理了。