ActiveMQ叢集搭建
1.使用shared filesystem Master-Slave方式主從叢集
通過共享存儲目錄(kahaDB)來實作master和slave的主從資訊同步所有ActiveMQ的broker都在不斷地擷取共享目錄的控制權,哪個broker搶到了控制權,它就成為master,它将鎖定該目錄,其他broker就隻能成為slave。
當master主出現故障後,剩下的slave從将再進行争奪共享目錄的控制權,誰搶到共享目錄的控制權,誰就成為主,其他沒有搶到控制權的稱為從。
由于他們是基于共享目錄,是以當主出現故障後,其上沒有被消費的消息在接下來産生的新的master主中可以繼續進行消費。
這種方式用戶端通路的都是主,從隻是起到了一個備份通路的作用
實作步驟:
(這裡都是采用僞叢集部署)
- 先複制3個MQ
- 配置每個activeMQ的conf /activemq.xml檔案中的共享目錄
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
修改完持久化目錄後,需要在/opt目錄下建立該目錄
3. 配置每個activeMQ的conf /activemq.xml檔案中的端口
因為是單機僞叢集部署,為了避免端口号的沖突,前三個位址端口+1,後兩個端口位址-1
第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
maximumConnections 最大連接配接數;
wireFormat.maxFrameSize 表示一個完整消息的最大資料量,機關byte;
0.0.0.0表示任意ip
- 修改conf/jetty.xml檔案的jetty伺服器端口(管理控制台)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
- 啟動三台ActiveMQ進行測試
2.使用shared database Master-Slave方式主從叢集
該方式與shared filesystem方式類似,隻是共享的存儲媒體由檔案系統改成了資料庫。
實作步驟:
- 安裝3個ActiveMQ
- 配置每個activeMQ的conf /activemq.xml檔案中的持久化擴充卡是jdbc資料庫方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
-
配置每個資料庫連接配接池
注意:連接配接池的配置需要配置在的外面
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=utf8&useSSL=false"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</bean>
- 在每個ActiveMQ的lib目錄下加入mysql的驅動包和資料庫連接配接池Druid包
- 啟動MySQL資料庫,并建立activemq資料庫
-
配置每個activeMQ的conf /activemq.xml檔案中的端口
為了避免端口号的沖突,前三個位址端口+1,後兩個端口位址-1
第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- 修改conf/jetty.xml檔案的jetty伺服器端口(管理控制台)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
- 啟動三台ActiveMQ,測試驗證
3.使用Replicated LevelDB Store方式主從叢集(常用)
基于可複制的LevelDB存儲方式的叢集,這種叢集方式是ActiveMQ5.9版本以後新增的特性,它使用ZooKeeper從一組broker中協調選擇一個broker作為master主,其他broker作為slave從的模式。所有slave從節點通過複制master主節點的消息來實作消息同步,當主出現故障後,沒有被消費的消息在從伺服器上也同步了一份,是以不會有消息的丢失。LevelDB 是 Google開發的一套用于持久化資料的高性能kv資料庫,ActiveMQ利用該資料庫進行資料的存儲。隻有master 接受用戶端連接配接,slave不接受用戶端連接配接,Master的所有存儲操作都将被複制到slaves。在這個模式中,需要有半數以上的broker是正常的,叢集才是可用的,超過半數broker故障,ZooKeeper的選舉算法将不能選擇master,進而導緻叢集不可用。
架構圖:
實作步驟:
- 安裝3個ActiveMQ
- 配置每個activeMQ的conf /activemq.xml檔案中的持久化擴充卡是replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="localhost:2181"/>
</persistenceAdapter>
參數說明:
replicas :叢集中存在的節點的數目
bind :當該節點成為master後,将使用該bind配置的ip和端口進行資料複制
zkAddress :ZooKeeper的位址
3. 啟動ZooKeeper伺服器
4. 啟動三台ActiveMQ,測試驗證
注意: 這種方式,不适合叢集太大,也就是activemq不能太多,因為多個activemq之間需要複制消息,這個比較耗資源,占用網絡,建議3、5台