1、綜述
通過之前的文章,我們讨論了ActiveMQ的基本使用,包括單個ActiveMQ服務節點的性能特征,關鍵調整參數;我們還介紹了單個ActiveMQ節點上三種不同的持久化存儲方案,并讨論了這三種不同的持久化存儲方案的配置和性能特點。但是這還遠遠不夠,因為在生産環境中為了保證讓我們設計的消息服務方案能夠持續工作,我們還需要為消息中間件服務搭建叢集環境,進而在保證消息中間件服務可靠性和處理性能。
2、ActiveMQ多節點方案
叢集方案主要為了解決系統架構中的兩個關鍵問題:高可用和高性能。ActiveMQ服務的高可用性是指,在ActiveMQ服務性能不變、資料不丢失的前提下,確定當系統災難出現時ActiveMQ能夠持續提供消息服務,高可靠性方案最終目的是減少整個ActiveMQ停止服務的時間。
ActiveMQ服務的高性能是指,在保證ActiveMQ服務持續穩定性、資料不丢失的前提下,確定ActiveMQ叢集能夠在機關時間内吞吐更高數量的消息、確定ActiveMQ叢集處理單條消息的時間更短、確定ActiveMQ叢集能夠容納更多的用戶端穩定連接配接。
下面我們分别介紹如何通過多個ActiveMQ服務節點叢集方式,分别提供熱備方案和高性能方案。最後我們讨論如何将兩種方案結合在一起,最終形成在生成環境下使用的推薦方案。
2-2、ActiveMQ高性能方案
ActiveMQ的多節點叢集方案,主要有動态叢集和靜态叢集兩種方案。所謂動态叢集就是指,同時提供消息服務的ActiveMQ節點數量、位置(IP和端口)是不确定的,當某一個節點啟動後,會通過網絡多點傳播的方式向其他節點發送通知(同時接受其他節點的多點傳播資訊)。當網絡中其他節點收到多點傳播通知後,就會向這個節點發起連接配接,最終将新的節點加入ActiveMQ叢集;所謂靜态叢集是指同時提供消息服務的多個節點的位置(IP和端口)是确定的,每個節點不需要通過廣播的方式發現目标節點,隻需要在啟動時按照給定的位置進行連接配接。
- 靜态叢集方案
- 動态叢集方案
2-1-1、基于多點傳播(multicast)的節點發現
在使用動态叢集配置時,當某個ActiveMQ服務節點啟動後并不知道整個網絡中還存在哪些其他的服務節點。是以ActiveMQ叢集需要規定一種節點與節點間的發現機制,以保證能夠解決上述問題。ActiveMQ叢集中,使用“多點傳播”原理進行其他節點的發現。
多點傳播(multicast)基于UDP協定,它是指在一個可連通的網絡中,某一個資料報發送源向一組資料報接收目标進行操作的過程。在這個過程中,資料報發送者隻需要向這個多點傳播位址(一個D類IP)發送一個資料報,那麼加入這個多點傳播位址的所有接收者都可以收到這個資料報。多點傳播實作了網絡中單點到多點的高效資料傳送,能夠節約大量網絡帶寬,降低網絡負載。
在IP協定中,規定的D類IP位址為多點傳播位址。224.0.0.0~239.255.255.255這個範圍内的IP都是D類IP位址,其中有一些IP段是保留的有特殊含義的:
- 224.0.0.0~224.0.0.255:這個D類IP位址段為保留位址,不建議您在開發過程中使用,因為可能産生沖突。例如224.0.0.5這個多點傳播位址專供OSPF協定(是一種路由政策協定,用于找到最優路徑)使用的多點傳播位址;224.0.0.18這個多點傳播位址專供VRRP協定使用(VRRP協定是虛拟路由器備援協定)。
- 224.0.1.0~224.0.1.255:這個D類IP位址為公用多點傳播位址,用于在整個Internet網絡上進行多點傳播。除非您有頂級DNS的控制/改寫權限,否則不建議在區域網路内使用這個多點傳播位址斷。
- 239.0.0.0~239.255.255.255:這個D類IP位址段為推薦在區域網路内使用的多點傳播位址段。注意,如果要在區域網路内使用多點傳播功能,需要區域網路中的交換機/路由器支援多點傳播功能。幸運的是,目前市面上隻要不是太過低端的交換機/路由器,都支援多點傳播功能(多點傳播功能所使用的主要協定為IGMP協定,關于IGMP協定的細節就不再進行深入了)。
下面我們使用java語言,編寫一個區域網路内的多點傳播發送和接受過程。以便讓各位讀者對基于多點傳播的節點發現操作有一個直覺的了解。雖然ActiveMQ中關于節點發現的過程,要比以下的示例複雜得多,但是基本原理是不會改變的。
- 多點傳播資料報發送者:
package multicast;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.Date;
public class SendMulticast {
public static void main(String[] args) throws Throwable {
// 多點傳播位址
InetAddress group = InetAddress.getByName("239.0.0.5");
// 多點傳播端口,同時也是UDP 資料報的發送端口
int port = ;
MulticastSocket mss = null;
// 建立一個用于發送/接收的MulticastSocket多點傳播套接字對象
mss = new MulticastSocket(port);
// 建立要發送的多點傳播資訊和UDP資料報
// 攜帶的資料内容,就是這個activeMQ服務節點用來提供Network Connectors的TCP/IP位址和端口等資訊
String message = "我是一個活動的activeMQ服務節點(節點編号:yyyyyyy),我的可用tcp資訊為:XXXXXXXXXX : ";
byte[] buffer2 = message.getBytes();
DatagramPacket dp = new DatagramPacket(buffer2, buffer2.length, group, port);
// 使用多點傳播套接字joinGroup(),将其加入到一個多點傳播
mss.joinGroup(group);
// 開始按照一定的周期向加入到224.0.0.5多點傳播位址的其他ActiveMQ服務節點進行廣播
Thread thread = Thread.currentThread();
while (!thread.isInterrupted()) {
// 使用多點傳播套接字的send()方法,将多點傳播資料包對象放入其中,發送多點傳播資料包
mss.send(dp);
System.out.println(new Date() + "發起多點傳播:" + message);
synchronized (SendMulticast.class) {
SendMulticast.class.wait();
}
}
mss.close();
}
}
- 多點傳播資料報接收者:
package multicast;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
/**
* 測試接收多點傳播資訊
* @author yinwenjie
*/
public class AcceptMulticast {
public static void main(String[] args) throws Throwable {
// 建立多點傳播套接字,并加入分組
MulticastSocket multicastSocket = new MulticastSocket();
// 注意,多點傳播位址和端口必須和發送者的一直,才能加入正确的組
InetAddress ad = InetAddress.getByName("239.0.0.5");
multicastSocket.joinGroup(ad);
// 準備接收可能的多點傳播信号
byte[] datas = new byte[];
DatagramPacket data = new DatagramPacket(datas, ,ad , );
Thread thread = Thread.currentThread();
// 開始接收多點傳播資訊,并列印出來
System.out.println(".....開始接收多點傳播資訊.....");
while(!thread.isInterrupted()) {
multicastSocket.receive(data);
int leng = data.getLength();
System.out.println(new String(data.getData() , , leng , "UTF-8"));
}
multicastSocket.close();
}
}
另外,我們之前講過的DUBBO架構中,也有基于“多點傳播”的發現/注冊管理。具體可參考DUBBO架構中的com.alibaba.dubbo.registry.multicast.MulticastRegistry類和其引用類(以下為MulticastRegistry類中,建立多點傳播套接字和接受多點傳播資料報的關鍵代碼段):
......
mutilcastAddress = InetAddress.getByName(url.getHost());
mutilcastPort = url.getPort() <= ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
mutilcastSocket.setLoopbackMode(false);
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
public void run() {
byte[] buf = new byte[];
DatagramPacket recv = new DatagramPacket(buf, buf.length);
while (! mutilcastSocket.isClosed()) {
try {
mutilcastSocket.receive(recv);
String msg = new String(recv.getData()).trim();
int i = msg.indexOf('\n');
if (i > ) {
msg = msg.substring(, i).trim();
}
MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
Arrays.fill(buf, (byte));
} catch (Throwable e) {
if (! mutilcastSocket.isClosed()) {
logger.error(e.getMessage(), e);
}
}
}
}
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
......
2-1-2、橋接Network Bridges
為了實作ActiveMQ叢集的橫向擴充要求和高穩定性要求,ActiveMQ叢集提供了Network Bridges功能。通過Network Bridges功能,技術人員可以将多個ActiveMQ服務節點連接配接起來。并讓它們通過配置好的政策作為一個整體對外提供服務。
這樣的服務政策主要包括兩種:主/從模式和負載均衡模式。對于第一種政策我們會在後文進行讨論。本節我們要重點讨論的是基于Network Bridges的負載均衡模式。
2-1-3、動态Network Connectors
既然已經講述了ActiveMQ中的動态節點發現原理和ActiveMQ Network Bridges的概念,那麼關于ActiveMQ怎樣配置叢集的方式就是非常簡單的問題了。我們先來讨論如何進行基于多點傳播發現的ActiveMQ負載均衡模式的配置——動态網絡連接配接Network Connectors;再來讨論基于固定位址的負載均衡模式配置——靜态網絡連接配接Network Connectors。
要配置基于多點傳播發現的ActiveMQ負載均衡模式,其過程非常簡單。開發人員隻需要在每一個ActiveMQ服務節點的主配置檔案中(activemq.xml),添加/更改 以下配置資訊即可:
......
<transportConnectors>
<!-- 在transportConnector中增加discoveryUri屬性,表示這個transportConnector是要通過多點傳播告知其它節點的:使用這個transportConnector位置連接配接我 -->
<transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50&consumer.prefetchSize=5" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>
......
<!-- 關鍵的networkConnector标簽, uri屬性标示為多點傳播發現-->
<networkConnectors>
<networkConnector uri="multicast://239.0.0.5" duplex="false"/>
</networkConnectors>
......
2-1-3-1:networkConnector标簽
如果使用ActiveMQ的多點傳播發現功能,請在networkConnector标簽的uri屬性中添加如下格式的資訊:
multicast://[多點傳播位址][:端口]
例如,您可以按照如下方式使用ActiveMQ預設的多點傳播位址來發現網絡種其他ActiveMQ服務節點:
#ActiveMQ叢集預設的多點傳播位址(239.255.2.3):
multicast://default
也可以按照如下方式,指定一個多點傳播位址——這在高安全級别的網絡中很有用,因為可能其他的多點傳播位址已經被管理者禁用。注意多點傳播位址隻能是D類IP位址段:
#使用多點傳播位址239.0.0.5
multicast://239.0.0.5
以下是通過抓包軟體獲得的的多點傳播UDP封包:
從上圖中我們可以獲得幾個關鍵資訊:
- 192.168.61.138和192.168.61.139這兩個IP位址分别按照一定的周期(1秒一次),向多點傳播位址239.0.0.5發送UDP資料報。以便讓在這個多點傳播位址的其它服務節點能夠感覺自己的存在
- 另外,以上UDP資料封包使用的端口是6155。您也可以更改這個端口資訊通過類似如下的方式:
#使用多點傳播位址239.0.0.5:19999
multicast://239.0.0.5:19999
- 每個UDP資料報中,包含的主要資訊包括本節點ActiveMQ的版本資訊,以及連接配接到自己所需要使用的host名字、協定名和端口資訊。類似如下:
2-1-3-2:transportConnector标簽的關聯設定
任何一個ActiveMQ服務節點A,要連接配接到另外的ActiveMQ服務節點,都需要使用目前節點A已經公布的transportConnector連接配接端口,例如以下配置中,能夠供其它服務節點進行連接配接的就隻有兩個transportConnector連接配接中的任意一個:
......
<transportConnectors>
<!-- 其它ActiveMQ服務節點,隻能使用以下三個連接配接協定和端口進行連接配接 -->
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="tcp" uri="tcp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000" />
<transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />
</transportConnectors>
......
那麼要将哪一個連接配接方式通過UDP資料報向其他ActiveMQ節點進行公布,就需要在transportConnector标簽上使用discoveryUri屬性進行辨別,如下所示:
......
<transportConnectors>
......
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>
......
<networkConnectors>
<networkConnector uri="multicast://239.0.0.5"/>
</networkConnectors>
......
2-1-3-3:其他注意事項
- 關于防火牆:請記得關閉您Linux伺服器上對需要公布的IP和端口的限制;
- 關于hosts路由資訊:由于基于多點傳播的動态發現機制,能夠找到的是目标ActiveMQ服務節點的機器名,而不是直接找到的IP。是以請設定目前服務節點的hosts檔案,以便目前ActiveMQ節點能夠通過hosts檔案中的IP路由關系,得到機器名與IP的映射:
# hosts檔案 ...... 192.168.61.139 activemq1 192.168.61.138 activemq2 ......
- 關于哪些協定能夠被用于進行Network Bridges連接配接:根據筆者以往的使用經驗,隻有tcp頭的uri格式(openwire協定)能夠被用于Network Bridges連接配接;當然您可以使用auto頭,因為其相容openwire協定;另外,您還可以指定為附加nio頭。
2-1-4、靜态Network Connectors
相比于基于多點傳播發現方式的動态Network Connectors而言,雖然靜态Network Connectors沒有那樣靈活的橫向擴充性,但是卻可以适用于網絡環境受嚴格管理的情況。例如:管理者關閉了交換機/路由器的多點傳播功能、端口受到嚴格管控等等。
配置靜态Network Connectors的ActiveMQ叢集的方式也很簡單,隻需要更改networkConnectors标簽中的配置即可,而無需關聯改動transportConnectors标簽。但是配置靜态Network Connectors的ActiveMQ叢集時,需要注意非常關鍵的細節:每一個節點都要配置其他所有節點的連接配接位置。
為了示範配置過程,我們假設ActiveMQ叢集由兩個節點構成,分别是activemq1:192.168.61.138 和 activemq2:192.168.61.139。那麼配置情況如下所示:
- 192.168.61.138:需要配置activemq2的位置資訊以便進行連接配接:
......
<transportConnectors>
<transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&consumer.prefetchSize=5"/>
</transportConnectors>
......
<!-- 請注意,一定需要192.168.61.139(activemq2)提供了這樣的連接配接協定和端口 -->
<networkConnectors>
<networkConnector uri="static:(auto+nio://192.168.61.139:61616)"/>
</networkConnectors>
......
- 192.168.61.139:需要配置activemq1的位置資訊以便進行連接配接:
......
<transportConnectors>
<transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600&consumer.prefetchSize=5"/>
</transportConnectors>
......
<!-- 請注意,一定需要192.168.61.138(activemq1)提供了這樣的連接配接協定和端口 -->
<networkConnectors>
<networkConnector uri="static:(auto+nio://192.168.61.138:61616)"/>
</networkConnectors>
......
同理,如果您的ActiveMQ叢集規劃中有三個ActiveMQ服務節點,那麼任何一個節點都應該配置其它兩個服務節點的連接配接方式。在配置格式中使用“,”符号進行分割:
......
<networkConnectors>
<networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>
</networkConnectors>
......
以下是配置完成後可能的效果:
- 192.168.61.138(activemq1):
- 192.168.61.139(activemq2):
2-1-5、其他配置屬性
下表列舉了在networkConnector标簽中還可以使用的屬性以及其意義。請特别注意其中的duplex屬性。如果隻從字面意義了解該屬性,則被稱為“雙工模式”;如果該屬性為true,當這個節點使用Network Bridge連接配接到其它目标節點後,将強制目标也建立Network Bridge進行反向連接配接。其目的在于讓消息既能發送到目标節點,又可以通過目标節點接受消息,但實際上大多數情況下是沒有必要的,因為目标節點一般都會自行建立連接配接到本節點。是以,該duplex屬性的預設值為false。
屬性名稱 | 預設值 | 屬性意義 |
---|---|---|
name | bridge | 名稱 |
dynamicOnly | false | 如果為true, 持久訂閱被激活時才建立對應的網路持久訂閱。 |
decreaseNetworkConsumerPriority | false | 如果為true,網絡的消費者優先級降低為-5。如果為false,則預設跟本地消費者一樣為0. |
excludedDestinations | empty | 不通過網絡轉發的destination |
dynamicallyIncludedDestinations | empty | 通過網絡轉發的destinations,注意空清單代表所有的都轉發。 |
staticallyIncludedDestinations | empty | 比對的都将通過網絡轉發-即使沒有對應的消費者,如果為預設的“empty”,那麼說明所有都要被轉發 |
duplex | false | 已經進行詳細介紹的“雙工”屬性。 |
prefetchSize | 1000 | 設定網絡消費者的prefetch size參數。如果設定成0,那麼就像之前文章介紹過的那樣:消費者會自己輪詢消息。顯然這是不被允許的。 |
suppressDuplicateQueueSubscriptions | false | 如果為true, 重複的訂閱關系一産生即被阻止(V5.3+ 的版本中可以使用)。 |
bridgeTempDestinations | true | 是否廣播advisory messages來建立臨時destination。 |
alwaysSyncSend | false | 如果為true,非持久化消息也将使用request/reply方式代替oneway方式發送到遠端broker(V5.6+ 的版本中可以使用)。 |
staticBridge | false | 如果為true,隻有staticallyIncludedDestinations中配置的destination可以被處理(V5.6+ 的版本中可以使用)。 |
以下這些屬性,隻能在靜态Network Connectors模式下使用
屬性名稱 | 預設值 | 屬性意義 |
---|---|---|
initialReconnectDelay | 1000 | 重連之前等待的時間(ms) (如果useExponentialBackOff為false) |
useExponentialBackOff | true | 如果該屬性為true,那麼在每次重連失敗到下次重連之前,都會增大等待時間 |
maxReconnectDelay | 30000 | 重連之前等待的最大時間(ms) |
backOffMultiplier | 2 | 增大等待時間的系數 |
請注意這些屬性,并不是networkConnector标簽的屬性,而是在uri屬性中進行設定的,例如:
============
(接下文)