本文介紹一個筆者在實際工作中的實施的基于ActiveMQ的一個高穩定,可擴充的異步消息系統。
ActiveMQ是一個成熟的基于Java語言的開源消息系統,在實際應用中被大量使用。ActiveMQ在系統穩定性,系統的容錯和擴充等方面都有很多成熟的方案,也有很多開源的管理工具,是部署異步消息系統的一個很好的選擇。
ActiveMQ工作機制
ActiveMQ有兩種消息使用方式:
l Queue模式:Producer發出到Queue裡的消息,隻能由一個Consumer來使用。
l Topic模式:Producer發送到Topic裡的消息,會傳送到Subscribe這個Topic的每一個Consumer。
Producer發出的消息有兩種Delivery模式。
l Persistent:Broker需要儲存消息,然後把消息發送到Consumer。如果Broker崩潰後,重新啟動後儲存的消息可以重新發送給Consumer。
l NonPersistent:Broker不需要儲存消息,直接把消息發送到Consumer。
ActiveMQ可以通過Networks of Brokers方式将多個Broker組成一個Cluster。Producer和Consumer可以任意的連接配接到該Cluster中的任意一個Broker。Producer發送的消息可以通過Cluser傳送到需要的Consumer。
ActiveMQ提供了Master Slave機制實作Broker的HA,有以下幾種方式:
l JDBC Master Slave
l Shared File System Master Slave
l KahaDB Replication(ZooKeeper experimental)
同一個Broker,隻能有一個Master來傳送消息。當Master崩潰後,其他的一個Slave可以作為Master。采用HA的模式,會增加系統的複雜性,也會影響系統的性能。
方案
實際部署中,ActiveMQ采用Queue的消息使用模式。Producer發送的消息使用Persistent的Delivery模式。
在兩個node上部署ActiveMQ的Broker,通過ActiveMQ的Networks of Brokers方式來組成Cluster。
系統裡的消息應用Instance通過ActiveMQ提供的client類庫采用failover TCP的方式随機的接入到ActiveMQ的cluster中。正常情況下,消息應用Instance可以通過ActiveMQ的cluster機制正常通信。如果某個ActiveMQ的node崩潰後,client會自動檢測到該情況,切換到另一個ActiveMQ的node。
由于本系統隻采用Queue的消息工作方式,而且消息的傳送采用persistent的模式。如果一個node崩潰後,重新啟動後,儲存的消息還可以重新發送到Consumer。對Broker,就不采用Master/Slave的HA模式,避免增加系統的複雜性和降低系統的性能。

配置
ActiveMQ的Broker的配置如下。
#Broker 1:
<!-- The transport connectors ActiveMQ will listen to -->
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
<!--
The store and forward broker networks ActiveMQ will listen to.
We'll leave it empty as duplex network will be configured by another broker.
-->
<networkConnectors>
</networkConnectors>
#Broker 2:
The store and forward broker networks ActiveMQ will listen to
Create a duplex connector to the first broker
<networkConnector uri="static:(tcp://{Broker1Ip}:61616)" duplex="true"/>