最近公司做項目需要用到jms消息服務,最終選擇了apache的activemq這個開源消息總線,但是在activemq的官網沒能找到既滿足高可用又滿足叢集部署的方案,是以探索了其叢集+高可用部署方案,經試用驗證ok,這裡和大家分享下。
activemq 是apache出品,最流行的,能力強勁的開源消息總線。完全支援jms1.1和j2ee 1.4規範的 jms provider實作
1. 多種語言和協定編寫用戶端。語言: java, c, c++, c#, ruby, perl, python, php。應用協定: openwire,stomp rest,ws notification,xmpp,amqp
2. 完全支援jms1.1和j2ee 1.4規範 (持久化,xa消息,事務)
3. 對spring的支援,activemq可以很容易内嵌到使用spring的系統裡面去,而且也支援spring2.0的特性
4. 通過了常見j2ee伺服器(如 geronimo,jboss 4, glassfish,weblogic)的測試,其中通過jca 1.5 resourceadaptors的配置,可以讓activemq可以自動的部署到任何相容j2ee1.4商業伺服器上
5. 支援多種傳送協定:in-vm,tcp,ssl,nio,udp,jgroups,jxta
6. 支援通過jdbc和journal提供高速的消息持久化
7. 從設計上保證了高性能的叢集,用戶端-伺服器,點對點
8. 支援ajax
9. 支援與axis的整合
10. 可以很容易得調用内嵌jms provider,進行測試
1、下載下傳
2、安裝
如果是在windows系統中運作,可以直接解壓apache-activemq-5.9.0-bin.zip,并運作bin目錄下的activemq.bat檔案,此時使用的是預設的服務端口:61616和預設的console端口:8161。
如果是在linux或unix下運作,在bin目錄下執行指令:./activemq setup
3、修改activemq的服務端口和console端口
a、修改服務端口:打開conf/activemq.xml檔案,修改以下紅色字型部分
<transportconnectors>
<transportconnector name="openwire" uri="tcp://10.42.220.72:61618"discoveryuri="multicast://default"/>
</transportconnectors>
b、修改console的位址和端口:打開conf/jetty.xml檔案,修改以下紅色字型部分
<bean id="jettyport"class="org.apache.activemq.web.webconsoleport"init-method="start">
<property name="port" value="8162"/>
</bean>
需要提前将activemq解壓包中的lib目錄下的相關包引入到工程中,再進行如下編碼:
1、發送端的代碼:
importjavax.jms.connection;
importjavax.jms.connectionfactory;
importjavax.jms.deliverymode;
importjavax.jms.destination;
importjavax.jms.messageproducer;
importjavax.jms.session;
importjavax.jms.textmessage;
importorg.apache.activemq.activemqconnection;
importorg.apache.activemq.activemqconnectionfactory;
publicclass sender {
privatestaticfinalintsend_number = 5;
publicstaticvoid main(string[] args) {
// connectionfactory:連接配接工廠,jms用它建立連接配接
connectionfactory connectionfactory;
// connection:jms用戶端到jms provider的連接配接
connection connection = null;
// session:一個發送或接收消息的線程
session session;
// destination:消息的目的地;消息發送給誰.
destination destination;
// messageproducer:消息發送者
messageproducer producer;
// textmessage message;
//構造connectionfactory執行個體對象,此處采用activemq的實作jar
connectionfactory = new activemqconnectionfactory(
activemqconnection.default_user,
activemqconnection.default_password,
"failover:(tcp://10.42.220.72:61617,tcp://10.42.220.72:61618)");
try {
//構造從工廠得到連接配接對象
connection =connectionfactory.createconnection();
//啟動
connection.start();
//擷取操作連接配接
session = connection.createsession(true, session.auto_acknowledge);
//擷取session
destination = session.createqueue("firstqueue");
//得到消息生成者【發送者】
producer =session.createproducer(destination);
//設定不持久化,此處學習,實際根據項目決定
producer.setdeliverymode(deliverymode.non_persistent);
//構造消息,此處寫死,項目就是參數,或者方法擷取
sendmessage(session, producer);
session.commit();
} catch (exception e) {
e.printstacktrace();
} finally {
if (null != connection)
connection.close();
} catch (throwable ignore) {
}
publicstaticvoid sendmessage(session session,messageproducer producer)
throws exception {
for (int i = 1; i <=send_number; i++) {
textmessage message = session
.createtextmessage("activemq發送的消息" + i);
//發送消息到目的地方
system.out.println("發送消息:" + "activemq 發送的消息" + i);
producer.send(message);
2、接收端代碼:
importjavax.jms.messageconsumer;
publicclass receive {
//消費者,消息接收者
messageconsumer consumer;
session = connection.createsession(false,
session.auto_acknowledge);
consumer =session.createconsumer(destination);
while (true) {
//設定接收者接收消息的時間,為了便于測試,這裡誰定為100s
textmessage message =(textmessage) consumer.receive(100000);
if (null != message) {
system.out.println("收到消息" + message.gettext());
} else {
break;
3、通過監控檢視消息堆棧的記錄:
單點的activemq作為企業應用無法滿足高可用和叢集的需求,是以activemq提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之後我認為需要将兩種部署方式相結合才能滿足我們公司分布式和高可用的需求,是以後面就重點将解如何将兩種部署方式相結合。
1)shared filesystem master-slave部署方式
主要是通過共享存儲目錄來實作master和slave的熱備,所有的activemq應用都在不斷地擷取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。
多個共享存儲目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就隻能作為slave。
2)shared database master-slave方式
與shared filesystem方式類似,隻是共享的存儲媒體由檔案系統改成了資料庫而已。
3)replicated leveldb store方式
這種主備方式是activemq5.9以後才新增的特性,使用zookeeper協調選擇一個node作為master。被選擇的master broker node開啟并接受用戶端連接配接。
其他node轉入slave模式,連接配接master并同步他們的存儲狀态。slave不接受用戶端連接配接。所有的存儲操作都将被複制到連接配接至master的slaves。
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網絡中并連接配接master進入slave mode。所有需要同步的disk的消息操作都将等待存儲狀态被複制到其他法定節點的操作完成才能完成。是以,如果你配置了replicas=3,那麼法定大小是(3/2)+1=2. master将會存儲并更新然後等待 (2-1)=1個slave存儲和更新完成,才彙報success。至于為什麼是2-1,熟悉zookeeper的應該知道,有一個node要作為觀擦者存在。
單一個新的master被選中,你需要至少保障一個法定node線上以能夠找到擁有最新狀态的node。這個node将會成為新的master。是以,推薦運作至少3個replica nodes,以防止一個node失敗了,服務中斷。
前面的master-slave的方式雖然能解決多服務熱備的高可用問題,但無法解決負載均衡和分布式的問題。broker-cluster的部署方式就可以解決負載均衡的問題。
broker-cluster部署方式中,各個broker通過網絡互相連接配接,并共享queue。當broker-a上面指定的queue-a中接收到一個message處于pending狀态,而此時沒有consumer連接配接broker-a時。如果cluster中的broker-b上面由一個consumer在消費queue-a的消息,那麼broker-b會先通過内部網絡擷取到broker-a上面的message,并通知自己的consumer來消費。
1)static broker-cluster部署
在activemq.xml檔案中靜态指定broker需要建立橋連接配接的其他broker:
1、 首先在broker-a節點中添加networkconnector節點:
<networkconnectors>
<networkconnector uri="static:(tcp:// 0.0.0.0:61617)"duplex="false"/>
</networkconnectors>
2、 修改broker-a節點中的服務提供端口為61616:
<transportconnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumconnections=1000&amp;wireformat.maxframesize=104857600"/>
3、 在broker-b節點中添加networkconnector節點:
<networkconnector uri="static:(tcp:// 0.0.0.0:61616)"duplex="false"/>
4、 修改broker-a節點中的服務提供端口為61617:
<transportconnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumconnections=1000&amp;wireformat.maxframesize=104857600"/>
5、分别啟動broker-a和broker-b。
2)dynamic broker-cluster部署
在activemq.xml檔案中不直接指定broker需要建立橋連接配接的其他broker,由activemq在啟動後動态查找:
<networkconnectoruri="multicast://default"
dynamiconly="true"
networkttl="3"
prefetchsize="1"
decreasenetworkconsumerpriority="true" />
2、修改broker-a節點中的服務提供端口為61616:
<transportconnectorname="openwire"uri="tcp://0.0.0.0:61616? " discoveryuri="multicast://default"/>
3、在broker-b節點中添加networkconnector節點:
4、修改broker-b節點中的服務提供端口為61617:
<transportconnectorname="openwire"uri="tcp://0.0.0.0:61617" discoveryuri="multicast://default"/>
5、啟動broker-a和broker-b
可以看到master-slave的部署方式雖然解決了高可用的問題,但不支援負載均衡,broker-cluster解決了負載均衡,但當其中一個broker突然宕掉的話,那麼存在于該broker上處于pending狀态的message将會丢失,無法達到高可用的目的。
由于目前activemq官網上并沒有一個明确的将兩種部署方式相結合的部署方案,是以我嘗試者把兩者結合起來部署:
1、部署的配置修改
這裡以broker-a + broker-b建立cluster,broker-c作為broker-b的slave為例:
1)首先在broker-a節點中添加networkconnector節點:
<networkconnector uri="masterslave:(tcp://0.0.0.0:61617,tcp:// 0.0.0.0:61618)" duplex="false"/>
2)修改broker-a節點中的服務提供端口為61616:
3)在broker-b節點中添加networkconnector節點:
4)修改broker-b節點中的服務提供端口為61617:
5)修改broker-b節點中的持久化方式:
<persistenceadapter>
<kahadb directory="/localhost/kahadb"/>
</persistenceadapter>
6)在broker-c節點中添加networkconnector節點:
7)修改broker-c節點中的服務提供端口為61618:
<transportconnectorname="openwire"uri="tcp://0.0.0.0:61618?maximumconnections=1000&amp;wireformat.maxframesize=104857600"/>
8)修改broker-b節點中的持久化方式:
9)分别啟動broker-a、broker-b、broker-c,因為是broker-b先啟動,是以“/localhost/kahadb”目錄被lock住,broker-c将一直處于挂起狀态,當人為停掉broker-b之後,broker-c将擷取目錄“/localhost/kahadb”的控制權,重新與broker-a組成cluster提供服務。