ActiveMQ的幾種叢集配置。
Queue consumer clusters
此叢集讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費資訊,則未消費掉的消息将被發給其他正常的消費者,結構圖如下:
Broker clusters
此種配置是一個消費者連接配接到多個broker叢集的中的一個broker,當該broker出問題時,消費者自動連接配接到其他一個正常的broker。消費者使用 failover:// 協定來連接配接broker。
broker之間的通過靜态發現(static discovery)和動态發現(dynamic discovery)來維持彼此發現,下面來介紹靜态發現和動态發現的機制:
靜态發現:
靜态發現通過配置固定的broker uri來發現彼此,配置文法如下:
例如:
1
<code>static</code><code>:(tcp:</code><code>//localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100</code>
動态發現:
動态發現機制是在各個broker啟動時通過Fanout transport來發現彼此,配置舉例如下:
2
3
4
5
6
<code><broker name=</code><code>"foo"</code><code>></code>
<code> </code><code><transportConnectors></code>
<code> </code><code><transportConnector uri=</code><code>"tcp://localhost:0"</code> <code>discoveryUri=</code><code>"multicast://default"</code><code>/></code>
<code> </code><code></transportConnectors></code>
<code> </code><code>...</code>
<code></broker></code>
Networks of brokers
多個broker組成叢集,當其中一個broker的消費者出問題導緻消息堆積無法消費掉時,通過ActiveMQ支援的Network of Broker方案可将該broker堆積的消息轉發到其他有消費者的broker。
該方案主要有以下兩種配置方式:
1、為broker配置檔案配置networkConnector元素
2、使用發現機制互相探測broker
Here is an example of using the fixed list of URIs:
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<code><?</code><code>xml</code> <code>version="1.0" encoding="UTF-8"?></code>
<code> </code>
<code><</code><code>beans</code> <code>xmlns="http://activemq.org/config/1.0"></code>
<code> </code><code><</code><code>broker</code> <code>brokerName="receiver" persistent="false" useJmx="false"></code>
<code> </code><code><</code><code>networkConnectors</code><code>></code>
<code> </code><code><!-- Static discovery --></code>
<code> </code><code><</code><code>networkConnector</code> <code>uri="static:(tcp://localhost:62001)"/></code>
<code> </code><code><!-- MasterSlave Discovery --></code>
<code> </code><code><!--<networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> --></code>
<code> </code><code></</code><code>networkConnectors</code><code>></code>
<code> </code><code><</code><code>persistenceAdapter</code><code>></code>
<code> </code><code><</code><code>memoryPersistenceAdapter</code><code>/></code>
<code> </code><code></</code><code>persistenceAdapter</code><code>></code>
<code> </code><code><</code><code>transportConnectors</code><code>></code>
<code> </code><code><</code><code>transportConnector</code> <code>uri="tcp://localhost:62002"/></code>
<code> </code><code></</code><code>transportConnectors</code><code>></code>
<code> </code><code></</code><code>broker</code><code>></code>
<code></</code><code>beans</code><code>></code>
This example uses multicast discovery:
<code> </code><code><</code><code>broker</code> <code>name="sender" persistent="false" useJmx="false"></code>
<code> </code><code><</code><code>networkConnector</code> <code>uri="multicast://default"/></code>
<code> </code><code><</code><code>transportConnectors</code><code>></code>
<code> </code><code><</code><code>transportConnector</code> <code>uri="tcp://localhost:0" discoveryUri="multicast://default"/></code>
Master Slave
通過部署多個broker執行個體,一個master和多個slave關系的broker來達到高可用性,有三種方案:
1、Master-Slave
2、SharedFile System Master Slave
3、JDBCMaster Slave
第一種方案由于隻可以由兩個AMQ執行個體元件,實際應用場景并不廣泛;
第三種方案支援N個AMQ執行個體組網,但他的性能會受限于資料庫;
第二種方案同樣支援N個AMQ執行個體組網,基于kahadb存儲政策,亦可以部署在分布式檔案系統上,應用靈活、高效且安全。
Master Slave方案當其中一個broker啟動并拿到獨占鎖時自動成為master,其他後續的broker則一直等待鎖,當master當機釋放鎖時其他slave拿到獨占鎖則自動成為master,部署結構如下:
第二種方案的配置隻需修改config檔案夾下activemq.xml檔案,修改消息持久化使用的方案:
<code><</code><code>broker</code> <code>xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file"></code>
<code> </code><code><</code><code>kahaDB</code> <code>directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/></code>
<code> </code><code>...</code>
<code></</code><code>broker</code><code>></code>
消息生産者代碼:
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<code>public</code> <code>class</code> <code>P2PSender {</code>
<code> </code><code>private</code> <code>static</code> <code>final</code> <code>String QUEUE = </code><code>"client1-to-client2"</code><code>;</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>
<code> </code><code>// ConnectionFactory :連接配接工廠,JMS用它建立連接配接</code>
<code> </code><code>ConnectionFactory connectionFactory;</code>
<code> </code><code>// Connection :JMS用戶端到JMS Provider的連接配接</code>
<code> </code><code>Connection connection = </code><code>null</code><code>;</code>
<code> </code><code>// Session:一個發送或接收消息的線程</code>
<code> </code><code>Session session;</code>
<code> </code><code>// Destination :消息的目的地;消息發送給誰.</code>
<code> </code><code>Destination destination;</code>
<code> </code><code>// MessageProducer:消息發送者</code>
<code> </code><code>MessageProducer producer;</code>
<code> </code><code>// TextMessage message;</code>
<code> </code><code>// 構造ConnectionFactory執行個體對象,此處采用ActiveMq的實作</code>
<code> </code><code>connectionFactory = </code><code>new</code> <code>ActiveMQConnectionFactory(</code>
<code> </code><code>"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"</code><code>);</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>// 構造從工廠得到連接配接對象</code>
<code> </code><code>connection = connectionFactory.createConnection();</code>
<code> </code><code>// 啟動</code>
<code> </code><code>connection.start();</code>
<code> </code><code>// 擷取操作連接配接</code>
<code> </code><code>session = connection.createSession(</code><code>false</code><code>, Session.AUTO_ACKNOWLEDGE);</code>
<code> </code><code>destination = session.createQueue(QUEUE);</code>
<code> </code><code>// 擷取session,FirstQueue是一個伺服器的queue destination = session.createQueue("FirstQueue");</code>
<code> </code><code>// 得到消息生成者【發送者】</code>
<code> </code><code>producer = session.createProducer(destination);</code>
<code> </code><code>// 設定不持久化</code>
<code> </code><code>producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);</code>
<code> </code><code>// 構造消息</code>
<code> </code><code>sendMessage(session, producer);</code>
<code> </code><code>// session.commit();</code>
<code> </code><code>connection.close();</code>
<code> </code><code>} </code><code>catch</code> <code>(Exception e) {</code>
<code> </code><code>e.printStackTrace();</code>
<code> </code><code>} </code><code>finally</code> <code>{</code>
<code> </code><code>if</code> <code>(</code><code>null</code> <code>!= connection) {</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>connection.close();</code>
<code> </code><code>} </code><code>catch</code> <code>(JMSException e) {</code>
<code> </code><code>e.printStackTrace();</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>public</code> <code>static</code> <code>void</code> <code>sendMessage(Session session, MessageProducer producer) </code><code>throws</code> <code>Exception {</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>1</code><code>; i <= </code><code>1</code><code>; i++) {</code>
<code> </code><code>Date d = </code><code>new</code> <code>Date();</code>
<code> </code><code>TextMessage message = session.createTextMessage(</code><code>"ActiveMQ發送消息"</code> <code>+ i + </code><code>" "</code> <code>+ </code><code>new</code> <code>Date());</code>
<code> </code><code>System.out.println(</code><code>"發送消息:ActiveMQ發送的消息"</code> <code>+ i + </code><code>" "</code> <code>+ </code><code>new</code> <code>Date());</code>
<code> </code><code>producer.send(message);</code>
<code>}</code>
消息消費者代碼:
<code>public</code> <code>class</code> <code>P2PReceiver {</code>
<code> </code>
<code> </code><code>// 消費者,消息接收者</code>
<code> </code><code>MessageConsumer consumer;</code>
<code> </code><code>connectionFactory = </code><code>new</code> <code>ActiveMQConnectionFactory(</code><code>"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"</code><code>);</code>
<code> </code><code>// 得到連接配接對象</code>
<code> </code><code>// 建立Queue</code>
<code> </code><code>consumer = session.createConsumer(destination);</code>
<code> </code><code>while</code> <code>(</code><code>true</code><code>) {</code>
<code> </code><code>TextMessage message = (TextMessage) consumer.receive();</code>
<code> </code><code>if</code> <code>(</code><code>null</code> <code>!= message) {</code>
<code> </code><code>System.out.println(</code><code>"收到消息"</code> <code>+ message.getText());</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>if</code> <code>(</code><code>null</code> <code>!= connection)</code>
<code> </code><code>} </code><code>catch</code> <code>(Throwable ignore) {</code>
本文轉自邴越部落格園部落格,原文連結:http://www.cnblogs.com/binyue/p/5325945.html,如需轉載請自行聯系原作者