天天看點

關于ActiveMQ的幾種叢集配置

ActiveMQ的幾種叢集配置。

Queue consumer clusters

此叢集讓多個消費者同時消費一個隊列,若某個消費者出問題無法消費資訊,則未消費掉的消息将被發給其他正常的消費者,結構圖如下:

關于ActiveMQ的幾種叢集配置

Broker clusters

此種配置是一個消費者連接配接到多個broker叢集的中的一個broker,當該broker出問題時,消費者自動連接配接到其他一個正常的broker。消費者使用 failover:// 協定來連接配接broker。

broker之間的通過靜态發現(static discovery)和動态發現(dynamic discovery)來維持彼此發現,下面來介紹靜态發現和動态發現的機制:

靜态發現:

靜态發現通過配置固定的broker uri來發現彼此,配置文法如下:

例如:

1

<code>static</code><code>:(tcp:</code><code>//localhost:61616,tcp://remotehost:61617?trace=false,vm://localbroker)?initialReconnectDelay=100</code>

  

動态發現:

動态發現機制是在各個broker啟動時通過Fanout transport來發現彼此,配置舉例如下:

2

3

4

5

6

<code>&lt;broker name=</code><code>"foo"</code><code>&gt;</code>

<code>  </code><code>&lt;transportConnectors&gt;</code>

<code>    </code><code>&lt;transportConnector uri=</code><code>"tcp://localhost:0"</code> <code>discoveryUri=</code><code>"multicast://default"</code><code>/&gt;</code>

<code>  </code><code>&lt;/transportConnectors&gt;</code>

<code>  </code><code>...</code>

<code>&lt;/broker&gt;</code>

Networks of brokers

多個broker組成叢集,當其中一個broker的消費者出問題導緻消息堆積無法消費掉時,通過ActiveMQ支援的Network of Broker方案可将該broker堆積的消息轉發到其他有消費者的broker。

該方案主要有以下兩種配置方式:

1、為broker配置檔案配置networkConnector元素

2、使用發現機制互相探測broker

Here is an example of using the fixed list of URIs:

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

<code>&lt;?</code><code>xml</code> <code>version="1.0" encoding="UTF-8"?&gt;</code>

<code> </code> 

<code>&lt;</code><code>beans</code> <code>xmlns="http://activemq.org/config/1.0"&gt;</code>

<code>  </code><code>&lt;</code><code>broker</code> <code>brokerName="receiver" persistent="false" useJmx="false"&gt;</code>

<code>    </code><code>&lt;</code><code>networkConnectors</code><code>&gt;</code>

<code>      </code><code>&lt;!-- Static discovery --&gt;</code>

<code>      </code><code>&lt;</code><code>networkConnector</code> <code>uri="static:(tcp://localhost:62001)"/&gt;</code>

<code>      </code><code>&lt;!-- MasterSlave Discovery --&gt;</code>

<code>      </code><code>&lt;!--&lt;networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/&gt; --&gt;</code>

<code>    </code><code>&lt;/</code><code>networkConnectors</code><code>&gt;</code>

<code>    </code><code>&lt;</code><code>persistenceAdapter</code><code>&gt;</code>

<code>      </code><code>&lt;</code><code>memoryPersistenceAdapter</code><code>/&gt;</code>

<code>    </code><code>&lt;/</code><code>persistenceAdapter</code><code>&gt;</code>

<code>   </code><code>&lt;</code><code>transportConnectors</code><code>&gt;</code>

<code>      </code><code>&lt;</code><code>transportConnector</code> <code>uri="tcp://localhost:62002"/&gt;</code>

<code>    </code><code>&lt;/</code><code>transportConnectors</code><code>&gt;</code>

<code>  </code><code>&lt;/</code><code>broker</code><code>&gt;</code>

<code>&lt;/</code><code>beans</code><code>&gt;</code>

This example uses multicast discovery:

<code>  </code><code>&lt;</code><code>broker</code> <code>name="sender" persistent="false" useJmx="false"&gt;</code>

<code>      </code><code>&lt;</code><code>networkConnector</code> <code>uri="multicast://default"/&gt;</code>

<code>  </code><code>&lt;</code><code>transportConnectors</code><code>&gt;</code>

<code>      </code><code>&lt;</code><code>transportConnector</code> <code>uri="tcp://localhost:0" discoveryUri="multicast://default"/&gt;</code>

Master Slave

通過部署多個broker執行個體,一個master和多個slave關系的broker來達到高可用性,有三種方案:

1、Master-Slave

2、SharedFile System Master Slave

3、JDBCMaster Slave

第一種方案由于隻可以由兩個AMQ執行個體元件,實際應用場景并不廣泛;

第三種方案支援N個AMQ執行個體組網,但他的性能會受限于資料庫;

第二種方案同樣支援N個AMQ執行個體組網,基于kahadb存儲政策,亦可以部署在分布式檔案系統上,應用靈活、高效且安全。

Master Slave方案當其中一個broker啟動并拿到獨占鎖時自動成為master,其他後續的broker則一直等待鎖,當master當機釋放鎖時其他slave拿到獨占鎖則自動成為master,部署結構如下:

關于ActiveMQ的幾種叢集配置

第二種方案的配置隻需修改config檔案夾下activemq.xml檔案,修改消息持久化使用的方案:

<code>&lt;</code><code>broker</code> <code>xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="D:/Platform/mq_share_file"&gt;</code>

<code>            </code><code>&lt;</code><code>kahaDB</code> <code>directory="D:/Platform/mq_share_file/kahadb" enableIndexWriteAsync="true" enableJournalDiskSyncs="false"/&gt;</code>

<code>    </code><code>...</code>

<code>&lt;/</code><code>broker</code><code>&gt;</code>

消息生産者代碼:

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

<code>public</code> <code>class</code> <code>P2PSender {</code>

<code>    </code><code>private</code> <code>static</code> <code>final</code> <code>String QUEUE = </code><code>"client1-to-client2"</code><code>;</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>main(String[] args) {</code>

<code>        </code><code>// ConnectionFactory :連接配接工廠,JMS用它建立連接配接</code>

<code>        </code><code>ConnectionFactory connectionFactory;</code>

<code>        </code><code>// Connection :JMS用戶端到JMS Provider的連接配接</code>

<code>        </code><code>Connection connection = </code><code>null</code><code>;</code>

<code>        </code><code>// Session:一個發送或接收消息的線程</code>

<code>        </code><code>Session session;</code>

<code>        </code><code>// Destination :消息的目的地;消息發送給誰.</code>

<code>        </code><code>Destination destination;</code>

<code>        </code><code>// MessageProducer:消息發送者</code>

<code>        </code><code>MessageProducer producer;</code>

<code>        </code><code>// TextMessage message;</code>

<code>        </code><code>// 構造ConnectionFactory執行個體對象,此處采用ActiveMq的實作</code>

<code>        </code><code>connectionFactory = </code><code>new</code> <code>ActiveMQConnectionFactory(</code>

<code>                </code><code>"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"</code><code>);</code>

<code>        </code><code>try</code> <code>{</code>

<code>            </code><code>// 構造從工廠得到連接配接對象</code>

<code>            </code><code>connection = connectionFactory.createConnection();</code>

<code>            </code><code>// 啟動</code>

<code>            </code><code>connection.start();</code>

<code>            </code><code>// 擷取操作連接配接</code>

<code>            </code><code>session = connection.createSession(</code><code>false</code><code>, Session.AUTO_ACKNOWLEDGE);</code>

<code>            </code><code>destination = session.createQueue(QUEUE);</code>

<code>            </code><code>// 擷取session,FirstQueue是一個伺服器的queue destination = session.createQueue("FirstQueue");</code>

<code>            </code><code>// 得到消息生成者【發送者】</code>

<code>            </code><code>producer = session.createProducer(destination);</code>

<code>            </code><code>// 設定不持久化</code>

<code>            </code><code>producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);</code>

<code>            </code><code>// 構造消息</code>

<code>            </code><code>sendMessage(session, producer);</code>

<code>            </code><code>// session.commit();</code>

<code>            </code><code>connection.close();</code>

<code>        </code><code>} </code><code>catch</code> <code>(Exception e) {</code>

<code>            </code><code>e.printStackTrace();</code>

<code>        </code><code>} </code><code>finally</code> <code>{</code>

<code>            </code><code>if</code> <code>(</code><code>null</code> <code>!= connection) {</code>

<code>                </code><code>try</code> <code>{</code>

<code>                    </code><code>connection.close();</code>

<code>                </code><code>} </code><code>catch</code> <code>(JMSException e) {</code>

<code>                    </code><code>e.printStackTrace();</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code>

<code>        </code><code>}</code>

<code>    </code><code>}</code>

<code>    </code><code>public</code> <code>static</code> <code>void</code> <code>sendMessage(Session session, MessageProducer producer) </code><code>throws</code> <code>Exception {</code>

<code>        </code><code>for</code> <code>(</code><code>int</code> <code>i = </code><code>1</code><code>; i &lt;= </code><code>1</code><code>; i++) {</code>

<code>            </code><code>Date d = </code><code>new</code> <code>Date();</code>

<code>            </code><code>TextMessage message = session.createTextMessage(</code><code>"ActiveMQ發送消息"</code> <code>+ i + </code><code>"  "</code> <code>+ </code><code>new</code> <code>Date());</code>

<code>            </code><code>System.out.println(</code><code>"發送消息:ActiveMQ發送的消息"</code> <code>+ i + </code><code>"  "</code> <code>+ </code><code>new</code> <code>Date());</code>

<code>            </code><code>producer.send(message);</code>

<code>}</code>

消息消費者代碼:

<code>public</code> <code>class</code> <code>P2PReceiver {</code>

<code>    </code> 

<code>        </code><code>// 消費者,消息接收者</code>

<code>        </code><code>MessageConsumer consumer;</code>

<code>        </code><code>connectionFactory = </code><code>new</code> <code>ActiveMQConnectionFactory(</code><code>"failover:(tcp://localhost:61616?wireFormat.maxInactivityDuration=0,tcp://localhost:61617?wireFormat.maxInactivityDuration=0)"</code><code>);</code>

<code>            </code><code>// 得到連接配接對象</code>

<code>            </code><code>// 建立Queue</code>

<code>            </code><code>consumer = session.createConsumer(destination);</code>

<code>            </code><code>while</code> <code>(</code><code>true</code><code>) {</code>

<code>                </code><code>TextMessage message = (TextMessage) consumer.receive();</code>

<code>                </code><code>if</code> <code>(</code><code>null</code> <code>!= message) {</code>

<code>                    </code><code>System.out.println(</code><code>"收到消息"</code> <code>+ message.getText());</code>

<code>            </code><code>try</code> <code>{</code>

<code>                </code><code>if</code> <code>(</code><code>null</code> <code>!= connection)</code>

<code>            </code><code>} </code><code>catch</code> <code>(Throwable ignore) {</code>

本文轉自邴越部落格園部落格,原文連結:http://www.cnblogs.com/binyue/p/5325945.html,如需轉載請自行聯系原作者

上一篇: ENUM幫助類
下一篇: 維數災難

繼續閱讀