天天看點

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

ActiveMQ是一種開源的,實作了JMS1.1規範的,面向消息(MOM)的中間件,為應用程式提供高效的、可擴充的、穩定的和安全的企業級消息通信。ActiveMQ使用Apache提供的授權,任何人都可以對其實作代碼進行修改。

ActiveMQ的設計目标是提供标準的,面向消息的,能夠跨越多語言和多系統的應用內建消息通信中間件。ActiveMQ實作了JMS标準并提供了很多附加的特性。這些附加的特性包括,JMX管理(java Management Extensions,即java管理擴充),主從管理(master/salve,這是叢集模式的一種,主要展現在可靠性方面,當主中介(代理)出現故障,那麼從代理會替代主代理的位置,不至于使消息系統癱瘓)、消息組通信(同一組的消息,僅會送出給一個客戶進行處理)、有序消息管理(確定消息能夠按照發送的次序被接受者接收)。

ActiveMQ 支援JMS規範,ActiveMQ完全實作了JMS1.1規範。

JMS規範提供了同步消息和異步消息投遞方式、有且僅有一次投遞語義(指消息的接收者對一條消息必須接收到一次,并且僅有一次)、訂閱消息持久接收等。如果僅使用JMS規範,表明無論您使用的是哪家廠商的消息代理,都不會影響到您的程式。

ActiveMQ主要涉及到5個方面:

傳輸協定消息之間的傳遞,無疑需要協定進行溝通,啟動一個ActiveMQ打開了一個監聽端口, ActiveMQ提供了廣泛的連接配接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ預設的使用 的協定是openWire,端口号:61616;

消息域

ActiveMQ主要包含Point-to-Point (點對點),Publish/Subscribe Model (釋出/訂閱者),其中在 Publich/Subscribe 模式下又有Nondurable subscription和       durable subscription (持久 化訂閱)2種消息處理方式

消息存儲

在消息傳遞過程中,部分重要的消息可能需要存儲到資料庫或檔案系統中,當中介崩潰時,資訊不 回丢失

Cluster  (叢集)

最常見到 叢集方式包括network of brokers和Master Slave;

Monitor (監控)

ActiveMQ一般由jmx來進行監控;

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

通過http://activemq.apache.org/download.html 下載下傳:apache-activemq-5.13.3-bin.tar.gz 

把下載下傳的該檔案通過tar –zxvf apache-activemq-5.13.3-bin.tar.gz解壓在目前目錄

通過修改$ACTIVEMQ_HOME/conf/activemq.xml檔案可以修改其配置

一般修改的其實也隻有以下幾個段:、

我們在此段増加配置如下:

此處,我們使用的是”>”通配符,上述配置為每個隊列、每個Topic配置了一個最大2mb的隊列,并且使用了”optimizedDispatch=true”這個政策,該政策會啟用優化了的消息分發器,直接減少消息來回時的上下文以加快消息分發速度。

找到下面這一段

為確定擴充配置既可以處理大量連接配接也可以處理海量消息隊列,我們可以使用JDBC或更新更快的KahaDB消息存儲。預設情況下ActiveMQ使用KahaDB消息存儲。

ActiveMQ支援三種持久化政策:AMQ、KAHADB、JDBC

AMQ 它是一種檔案存儲形式,具有寫入快、容易恢複的特點,采用這種方式持久化消息會被存儲在一個個檔案中,每個檔案預設大小為32MB,如果一條消息超過32MB,那麼這個值就需要設大。當一個檔案中所有的消息被“消費”掉了,那麼這檔案會被置成“删除”标志,并且在下一個清除開始時被删除掉。

KAHADB,相比AMQ來説,KAHADB速度沒有AMQ快,可是KAHADB具有極強的垂直和橫向擴充能力,恢複時間比AMQ還要短,是以從5.4版後ActiveMQ預設使用KAHADB作為其持久化存儲。而且在作MQ的叢集時使用KAHADB可以做到Cluster+Master Slave的這樣的完美高可用叢集方案。

JDBC,即ActiveMQ預設可以支援把資料持久化到DB中,如:MYSQL、ORACLE等。

此處為ActiveMQ的記憶體配置,從5.10版後ActiveMQ在<memoryUsage>中引入了一個percentOfJvmHeap的配置,該百分比為:

$ACTIVEMQ_HOME/bin/env中配置的JVM堆大小的百分比,如$ACTIVEMQ_HOME/bin/env 中:

那麼此處的percentOfJvmHeap=90即表示:MQ消息隊列一共會用到2048M*0.9的記憶體。

全部配完後我們可以通過以下指令啟動ActiveMQ

這種方式為前台啟動activemq,用于開發模式便于調試時的資訊輸出。

你也可以使用:

以背景程序的方式啟動activemq。

啟動後在浏覽器内輸入http://192.168.0.101:8161/admin/ 輸入管理者帳号(預設為admin/admin)即可登入activemq的console界面

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

啟動後的ActiveMQ的資料位于:$ACTIVEMQ_HOME/data/目錄内

啟動後的ActiveMQ運作日志位于:$ACTIVEMQ_HOME/data/目錄内的activemq.log檔案

如果需要改ActiveMQ的日志配置可以通過修改$ACTIVEMQ_HOME/conf/log4j.properties

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

在Spring中建立一個activemq.xml檔案,使其内容如下:

其中:

<property name="alwaysSessionAsync" value=“true" />

對于一個connection如果隻有一個session,該值有效,否則該值無效,預設這個參數的值為true。

<property name="useAsyncSend" value="true" />

将該值開啟官方說法是可以取得更高的發送速度(5倍)。

<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

<!-- 設定消息隊列的名字 -->

<constructor-arg value="ymk.queue?consumer.prefetchSize=100" />

</bean>

在此我們申明了一個隊列,并用它用于後面的實驗代碼。

consumer.prefetchSize則代表我們在此使用“消費者”預配置設定協定,在消費者内在足夠時可以使這個值更大以獲得更好的吞吐性能。

工程中的pom.xml檔案主要内容如下:

上述例子非常的簡單。

它其實是啟動了一個Message Listener用來監聽ymk.queue中的消息,如果有消息到達,接收端代碼就會把消息“消費”掉。

而發送端代碼也很簡單,它每次向ymk.queue隊列發送一個文本消息。

這邊所謂的MQ消費大家可以這樣了解:

使用者sender向MQ的KAHADB中插入一條資料。

使用者receiver把這條資料select後,再delete,這個select一下後再delete就是一個“消費”動作。

我們可以注意到上述的例子中我們的代碼中有這樣的一段: 

它代表的是我們的MQ消費端消費模式為“自動”,即一旦消費端從MQ中取到一條消息,這條消息會自動從隊列中删除。

ActiveMQ是一個分布式消息隊列,它自然支援“事務”型消息,我們可以舉一個例子

系統A和系統B是有一個事務的系統間“服務內建”,我們可以把它想成如下場景:

系統A先會do sth…然後發送消息給系統B,系統B拿到消息後do sth,如果在其中任意一個環節發生了Exception,那麼代表系統A與系統B之間的消息調用這一過程為“失敗”。

失敗要重發,重發的話那原來那條消息必須還能重新拿得到。

此時我們就需要使用事務性的消息了。而事務性的消息是在:

生産端和消費端在建立session時,需要:

session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

下面來看一個實際例子。

此處其它代碼與普通式消息發送代碼相似,隻在以下幾處有不同,首先在取得session時會聲明事務開啟“true”。

然後在發送時會有一個動作:

相應的在catch(Exception)時需要

在我們的接收端的createSession時也需要把它設為“事務開啟”,此時請注意,生産和消費是在一個事務邊界中的。

然後在接收時會有一個動作:

注意:

如果在消費端的onMessage中沒有session.commit(),那麼這條消息可以正常被接收,但不會被消費,換句話説用戶端隻要不commit這條消息,這條消息可以被用戶端無限消費下去,直到commit(從MQ所persistent的DB中被删除)。

如果在消費斷遇到任何Exception時session.rollback()了,ActiveMQ會按照預設政策每隔1s會重發一次,重發6次如果還是失敗,則進入ActiveMQ的ActiveMQ.DLQ隊列,重發政策這個值可以設(稍後會給出)。

如果在生産端的try{}塊裡發生錯誤,導緻復原(沒有commit),會怎麼樣?消費隊列永遠拿不到這條被rollback的消息,因為這條資料還沒被插入KAHADB中呢。

再如果,消費端拿到了消息不commit也不rollback呢?那消費端重新開機後會再次拿到這條消息(因為始終取where status=‘未消費’取不到的原因,對吧?)

以上例子申明了對于destination這個隊列的重發機制為間隔100毫秒重發一次。

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息
Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息
Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

所謂點對點應答式消息和事務無關,它主要實作的是如:

生産端:我發給你一個消息了,在你收到并處理後請回複!因為我要根據你的回複内容再做處理

消費端:我收到你的消息了,我處理完了請查收我給你的回複

生産端:收到你的消息,88

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息
Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息

其實也沒啥花頭,就是多了一個隊列(不要打我)

。。。。。。

關鍵在于代碼,代碼,不要隻重視表面嗎。。。要看内含的LA。。。

這兩個隊列其實:

一個Request

一個應答(也可以使用temp隊列來做應答隊列)

我們設立兩個程式:

發送端(生産端)内含一個MessageListener,用來收消費端的傳回消息

服務端(消費端)内含一個MessageListener,用來收生産端發過來的消息然後再異步傳回

而溝通生産端和消費端的這根“消息鍊”是兩個東西:

JMSCorrelationID

JMSReplyTo

JMSCorrelationID:

它就是一個随機不可重複的數字,以String型傳入API,也可以是GUID,它主要是被用來标示MQ 中每一條不同的消息用的一個唯一ID

它就是一個生産端用來接收消費端傳回消息的位址

RandomStringUtils

import org.apache.commons.lang.RandomStringUtils;

replyDest

replyDest = (Destination) context.getBean("replyDestination");

來看位于用戶端(生産端)的messageListener吧

其餘部分代碼(沒啥花頭,就是sender裡帶了一個messageListener):

此處的send()方法内有兩個參數,注意其用法

然後為這個消費端也加一個messageListener如:

Apache ActiveMQ實戰(1)-基本安裝配置與消息類型ActiveMQ簡介ActiveMQ整體架構ActiveMQ的安裝配置 ActiveMQ與Spring內建ActiveMQ與Spring內建-示例講解簡單消息與事務型消息點對點,應答式消息