天天看點

ActiveMQ從入門到精通(一)JMSActiveMQ QuickStartWrite Code 4 ActiveMQ在說說Session關于消息的priority/ttl/deliveryMode

這是關于消息中間件ActiveMQ的一個系列專題文章,将涵蓋JMS、ActiveMQ的初步入門及API詳細使用、兩種經典的消息模式(PTP and Pub/Sub)、與Spring整合、ActiveMQ叢集、監控與配置優化等。話不多說,我們來一起瞧一瞧!

首先來說較早以前,也就是沒有JMS的那個時候,很多應用系統存在一些缺陷:

1.通信的同步性

client端發起調用後,必須等待server處理完成并傳回結果後才能繼續執行

2.client 和 server 的生命周期耦合太高

client程序和server服務程序都必須可用,如果server出現問題或者網絡故障,那麼client端會收到異常

3.點對點通信

client端的一次調用隻能發送給某一個單獨的服務對象,無法一對多

JMS,即Java Message Service,通過面向消息中間件(MOM:Message Oriented Middleware)的方式很好的解決了上面的問題。大緻的過程是這樣的:發送者把消息發送給消息伺服器,消息伺服器将消息存放在若幹隊列/主題中,在合适的時候,消息伺服器會将消息轉發給接受者。在這個過程中,發送和接受是異步的,也就是發送無需等待,而且發送者和接受者的生命周期也沒有必然關系;在pub/sub模式下,也可以完成一對多的通信,即讓一個消息有多個接受者。

<a href="https://s2.51cto.com/wyfs02/M02/8E/A8/wKiom1jH-UmwU2dqAABEoH9xe5M475.png" target="_blank"></a>

需要注意的是,JMS隻是定義了Java通路消息中間件的接口,其實就是在包javax.jms中,你會發現這個包下除了異常定義,其他都是interface。我們可以掃一眼,比如Message:

<a href="https://s4.51cto.com/wyfs02/M02/8E/A6/wKioL1jH-XLRqMysAACPb2BdjDw053.png" target="_blank"></a>

我想你應該發現了,JMS隻給出接口,然後由具體的中間件去實作,比如ActiveMQ就是實作了JMS的一種Provider,還有阿裡巴巴的RocketMQ(後續專題中在為大家介紹)。這些消息中間件都符合JMS規範。說起規範,自然要定義一些術語:

Provider/MessageProvider:生産者

Consumer/MessageConsumer:消費者

PTP:Point To Point,點對點通信消息模型

Pub/Sub:Publish/Subscribe,釋出訂閱消息模型

Queue:隊列,目标類型之一,和PTP結合

Topic:主題,目标類型之一,和Pub/Sub結合

ConnectionFactory:連接配接工廠,JMS用它建立連接配接

Connnection:JMS Client到JMS Provider的連接配接

Destination:消息目的地,由Session建立

Session:會話,由Connection建立,實質上就是發送、接受消息的一個線程,是以生産者、消費者都是Session建立的

初步來看,Session非常核心,因為很多東西都是它建立的,在後文中可以通過代碼來進一步認識這些術語。

ActiveMQ是Apache出品的,非常流行的消息中間件,可以說要掌握消息中間件,需要從ActiveMQ開始,要掌握更加強大的RocketMQ,也需要ActiveMQ的基礎,是以我們來搞定它吧。官網位址:http://activemq.apache.org/,目前最新的版本是5.14.4,我這邊将以最新版來講解。這篇文章主要是ActiveMQ的初步,是以我這邊暫時用windows版本,後期采用Linux。

<a href="https://s4.51cto.com/wyfs02/M00/8E/A8/wKiom1jH-e7SNNm6AADiqvsjv8o398.png" target="_blank"></a>

bin下面存放的是ActiveMQ的啟動腳本activemq.bat,注意分32、64位

conf裡面是配置檔案,重點關注的是activemq.xml、jetty.xml、jetty-realm.properties。在登入ActiveMQ Web控制台需要使用者名、密碼資訊;在JMS CLIENT和ActiveMQ進行何種協定的連接配接、端口是什麼等這些資訊都在上面的配置檔案中可以展現。

data目錄下是ActiveMQ進行消息持久化存放的地方,預設采用的是kahadb,當然我們可以采用leveldb,或者采用JDBC存儲到MySQL,或者幹脆不使用持久化機制。

webapps,注意ActiveMQ自帶Jetty提供Web管控台

lib中ActiveMQ為我們提供了分功能的JAR包,當然也提供了activemq-all-5.14.4.jar

在JDK安裝沒有問題的情況下,直接activemq.bat啟動它,并通路Web控制台!

<a href="https://s3.51cto.com/wyfs02/M02/8E/A8/wKiom1jH-iTh7-54AAEwjJkmVno853.png" target="_blank"></a>

到這裡,ActiveMQ就已經啟動了,So easy~ 

通路ActiveMQ web控制台的使用者名、密碼在哪裡配置的?URL當中的端口是在哪裡配置的?

<a href="https://s2.51cto.com/wyfs02/M01/8E/A6/wKioL1jH-lfjLBArAAARRTa9J6I501.png" target="_blank"></a>

<a href="https://s3.51cto.com/wyfs02/M02/8E/A6/wKioL1jH-muRIYQaAAA9hWiUT6o268.png" target="_blank"></a>

來一個HelloWorld級别的例子,來感受下ActiveMQ。具體來說,我這邊會寫一個生産者用于發送消息,一個消費者用于接收消息。實際上,JMS是有“套路”的,下面我将以生産者為例詳細說明。

<a href="https://s4.51cto.com/wyfs02/M00/8E/A8/wKiom1jH-qLAUlaWAABJuuIRJqo727.png" target="_blank"></a>

實際上,這裡是存在安全隐患的,也就是任何人一旦知道MQ的位址,就可以連接配接通路了,我們可以在activemq.xml中配置指定的使用者、密碼才能通路ActiveMQ。

關于broker_bind_url,預設就是tcp://localhost:61616,說明是采用TCP協定,61616端口。其實對于ActiveMQ不僅僅支援TCP協定,還有其他協定,開啟了多個端口。

<a href="https://s3.51cto.com/wyfs02/M00/8E/A6/wKioL1jH-tqDyWkBAACNNbRakMM627.png" target="_blank"></a>

<a href="https://s3.51cto.com/wyfs02/M02/8E/A7/wKioL1jH-wiCpwMuAAAYj2Zv-Tg909.png" target="_blank"></a>

Connection就代表了應用程式和消息伺服器之間的通信鍊路。獲得了連接配接工廠後,就可以建立Connection。

事實上,ConnectionFactory存在重載方法:

Connection createConnection(String username,String password) 

也就是說我們也可以在這裡指定使用者名、密碼進行驗證

<a href="https://s4.51cto.com/wyfs02/M01/8E/A8/wKiom1jH-3WS84NNAAAqetom-wI800.png" target="_blank"></a>

Session,用于發送和接受消息,而且是單線程的,支援事務的。如果Session開啟事務支援,那麼Session将儲存一組資訊,要麼commit到MQ,要麼復原這些消息。Session可以建立MessageProducer/MessageConsumer。

<a href="https://s5.51cto.com/wyfs02/M01/8E/A7/wKioL1jH-5Hx1iirAAAUbeqEaFg594.png" target="_blank"></a>

所謂消息目标,就是消息發送和接受的地點,要麼queue,要麼topic。

<a href="https://s4.51cto.com/wyfs02/M01/8E/A7/wKioL1jH-8KiRzdUAAATJcLR6vQ950.png" target="_blank"></a>

<a href="https://s3.51cto.com/wyfs02/M02/8E/A8/wKiom1jH--niIuTKAAAjvRRXny0402.png" target="_blank"></a>

<a href="https://s2.51cto.com/wyfs02/M02/8E/A8/wKiom1jH_Aizk41_AAAmqZOH2eQ300.png" target="_blank"></a>

生産者和消費者之間傳遞的對象,由3個主要部分構成:

消息頭(路由)+消息屬性(消息選擇器,以後介紹)+消息體(JMS規範的5種類型消息)

<a href="https://s5.51cto.com/wyfs02/M02/8E/A7/wKioL1jH_DeQjAYEAADX_QSVoNY943.png" target="_blank"></a>

<a href="https://s2.51cto.com/wyfs02/M00/8E/A8/wKiom1jH_FizW_MHAAAhxd_F5Ug271.png" target="_blank"></a>

必須close connection,隻有這樣ActiveMQ才會釋放資源!

消費者的代碼和上面非常類似,隻不過就是建立MessageConsumer進行receive而已,注意receive()/receive(long)/receiveNoWait(),這些說明消費者可以采用阻塞模式、非阻塞模式接受消息。

程式運作後,我們來看一下管控台:

<a href="https://s5.51cto.com/wyfs02/M00/8E/A8/wKiom1jH_Irw0_d3AACEgWQwO30515.png" target="_blank"></a>

Messages Enqueued:表示生産了多少條消息,記做P

Messages Dequeued:表示消費了多少條消息,記做C

Number Of Consumers:表示在該隊列上還有多少消費者在等待接受消息

Number Of Pending Messages:表示還有多少條消息沒有被消費,實際上是表示消息的積壓程度,就是P-C

在通過Connection建立Session的時候,需要設定2個參數,一個是否支援事務,另一個是簽收的模式。我們重點說一下簽收模式:

<a href="https://s2.51cto.com/wyfs02/M00/8E/A7/wKioL1jH_NTzTMLBAAATQzVVF0k615.png" target="_blank"></a>

什麼是簽收?通俗點說,就是消費者接受到消息後,需要告訴消息伺服器,我收到消息了。當消息伺服器收到回執後,本條消息将失效。是以簽收将對PTP模式産生很大影響。如果消費者收到消息後,并不簽收,那麼本條消息繼續有效,很可能會被其他消費者消費掉!

AUTO_ACKNOWLEDGE:表示在消費者receive消息的時候自動的簽收

CLIENT_ACKNOWLEDGE:表示消費者receive消息後必須手動的調用acknowledge()方法進行簽收

DUPS_OK_ACKNOWLEDGE:簽不簽收無所謂了,隻要消費者能夠容忍重複的消息接受,當然這樣會降低Session的開銷

在實際中,我們應該采用哪種簽收模式呢?CLIENT_ACKNOWLEDGE,采用手動的方式較自動的方式可能更好些,因為接收到了消息,并不意味着成功的處理了消息,假設我們采用手動簽收的方式,隻有在消息成功處理的前提下才進行簽收,那麼隻要消息處理失敗,那麼消息還有效,仍然會繼續消費,直至成功處理!

消息有優先級及存活時間,在MessageProducer進行send的時候,存在多個重載方法,我們來看一下:

<a href="https://s1.51cto.com/wyfs02/M02/8E/A8/wKiom1jH_RmTHGSYAAEq-u-ev0g030.png" target="_blank"></a>

在上面的code當中,我們建立生産者的時候,指定了Destination,設定了持久化方式,實際上這些都可以不必指定的,而是到send的時候指定。而且在實際業務開發中,往往根據各種判斷,來決定将這條消息發往哪個Queue,是以往往不會在MessageProducer建立的時候指定Destination。

TTL,消息的存活時間,一句話:生産者生産了消息,如果消費者不來消費,那麼這條消息保持多久的有效期

priority,消息優先級,0-9。0-4是普通消息,5-9是加急消息,消息預設級别是4。注意,消息優先級隻是一個理論上的概念,并不能絕對保證優先級高的消息一定被消費者優先消費!也就是說ActiveMQ并不能保證消費的順序性!

deliveryMode,如果不指定,預設是持久化的消息。如果可以容忍消息的丢失,那麼采用非持久化的方式,将會改善性能、減少存儲的開銷。

本文轉自zfz_linux_boy 51CTO部落格,原文連結:http://blog.51cto.com/zhangfengzhe/1906555,如需轉載請自行聯系原作者