天天看點

基于zookeeper+levelDB的ActiveMQ叢集

         ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作,盡管JMS規範出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。------百度百科

       在實際的公司項目中如果隻是使用ActiveMQ的單點的話,可用性是很差的,如果不小心MQ的機器當機的話整個系統都會癱瘓,是以在實際的項目中需要使用叢集部署來提升可用性。ActiveMQ的高可用的部署方式在5.9.0版之前是用Master-Slave方式,Master-Slave方式又有三種可用的形式,分别是

1)shared filesystem Master-Slave(共享檔案系統),

2)shared database Master-Slave方式(共享資料庫方式),和第1)的差別不大,主要就是共享的存儲媒體由檔案系統改             成了資料庫而已

3)Replicated LevelDB Store方式

         這種主備方式是ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node                   開啟并接受用戶端連接配接。

今天我要講的就是使用第三種方式來做ActiveMQ的叢集,這裡需要注意的一點是Master-Slave方式隻能提升可用性,也就是其中一個節點挂了之後,其他節點中的一個會馬上被推舉成Master來提供服務,這個推舉的工程就由zookeeper來完成,而不能完成負載均衡的作用。ActiveMQ的負載均衡可以由它的Broker-Cluster的部署方式來實作。是以如果既要提升可用性還要做到負載均衡,就得結合這兩種部署方式了,這個我也想在以後有空的時候試一下。其實我也是剛學,很多東西都不懂,也都是在網上找相關的部落格和資料來學習,然後自己動手嘗試。

好了,開始來着手完成這個任務吧。

在整個過程中我隻用到了3台機器來做這個實驗,3台也是最少的要求了。

一、安裝和搭建zookeeper叢集

      這部分我在之前得部落格中已經完成了,是以可以去這裡看

      CentOS7下安裝配置zookeeper叢集

二、安裝和搭建ActiveMQ叢集

      1、安裝:安裝ActiveMQ非常簡單,我就不在這裡将了,可以去網上搜尋一下

      2、配置:其實這裡隻需要稍微修改一下ActiveMQ的配置檔案即可,主要改動如下

<!--
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>
-->
          
        <persistenceAdapter>
           <replicatedLevelDB
              directory="${activemq.data}/leveldb"
              replicas="3"
              bind="tcp://0.0.0.0:0"
              zkAddress="192.168.202.66:2181,192.168.202.67:2181,192.168.202.68:2181"
              hostname="192.168.202.66"
              sync="local_disk"
              zkPath="/activemq/leveldb-stores"/>
       </persistenceAdapter> 
           

            将預設的kahaDB的持久化方式改成本文中的zookeeper+levelDB的方式

     這樣就完成了這個整個叢集的部署了,現在隻需要把3台機器中的zookeeper和ActiveMQ啟動起來就可以了,需要先啟動zookeeper。

   啟動成功之後可以在浏覽器中輸入:http://192.168.202.68:8161/admin/index.jsp

   去管理我們的ActiveMQ

三、測試:我們需要寫一個測試項目來測試這個ActiveMQ叢集是不是真的可以做到避免單點故障的能力,我的整個項目也是在另外一個部落格中下載下傳下來的,隻是說他之前隻是在單點上去做的測試,我把它改成了在叢集上去做測試而已,在此非常感謝那位作者,在文章最後我會把本文中左右參考部落格的位址都注明上。那麼從單點到叢集,我所做的改動不多,僅僅是把配置檔案做了修改。

 1、ActiveMQ的連接配接配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

	<!-- 配置架構包中自動注入 -->
	<!-- 配置ConnectionFactory -->
	<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
		<property name="brokerURL"
			value="failover://(tcp://192.168.202.66:61616,tcp://192.168.202.67:61616,tcp://192.168.202.68:61616)?initialReconnectDelay=10000" />
		<property name="userName" value="admin" />
		<property name="password" value="admin" />
	</bean>
	<!-- 連接配接池配置 -->
	<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
		destroy-method="stop">
		<property name="connectionFactory" ref="targetConnectionFactory"></property>
		<property name="maxConnections" value="10"></property>
	</bean>
	
     <!-- 需要通過 JmsTemplate來完成發送消息-->
   	<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="pooledConnectionFactory">
		</property>
		<property name="explicitQosEnabled" value="true" />
		<!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,預設false -->
		<property name="deliveryMode" value="2" />
		<!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 -->
		<property name="pubSubDomain" value="false" />
	</bean>
	
	<!-- 和上面的JmsTemplate的差別是,上面的是針對點對點的,這個是針對廣播消息的 -->
	<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
		<property name="connectionFactory" ref="pooledConnectionFactory">
		</property>
		<property name="explicitQosEnabled" value="true" />
		<!-- deliveryMode, priority, timeToLive 的開關,要生效,必須配置為true,預設false -->
		<property name="deliveryMode" value="2" />
		<!-- 發送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久 -->
		<property name="pubSubDomain" value="true" />
	</bean>

    <!-- 點對點消息的目的地隊列 -->
	<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
		<constructor-arg index="0" value="queue"/>
	</bean>
	<!-- 廣播消息的目的地隊列 -->
	<bean id="destination2" class="org.apache.activemq.command.ActiveMQTopic">
		<constructor-arg index="0" value="topic"/>
	</bean>
	
	<!-- 4個消息消費者(消息監聽器) -->
	<bean id="queueReceiver1" class="com.tgb.SpringActivemq.mq.consumer.queue.QueueReceiver1" />
	<bean id="queueReceiver2" class="com.tgb.SpringActivemq.mq.consumer.queue.QueueReceiver2" />
	
	<bean id="topicReceiver1" class="com.tgb.SpringActivemq.mq.consumer.topic.TopicReceiver1" />
	<bean id="topicReceiver2" class="com.tgb.SpringActivemq.mq.consumer.topic.TopicReceiver2" />
	
	<!-- 消息監聽器對應的容器 -->
	<bean id="consumerListenerContainer"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer" primary="true">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="destination" ref="destination" />
		<property name="messageListener" ref="queueReceiver1" />
	</bean>
	
	<bean id="consumerListenerContainer1"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer" primary="true">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="destination" ref="destination" />
		<property name="messageListener" ref="queueReceiver2" />
	</bean>
	
	<bean id="consumerListenerContainer2"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer" primary="true">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="destination" ref="destination2" />
		<property name="messageListener" ref="topicReceiver1"/>
		<property name="pubSubDomain" value="true" /> 
        <property name="pubSubNoLocal" value="false" /> 
	</bean>
	
 	<bean id="consumerListenerContainer3"
		class="org.springframework.jms.listener.DefaultMessageListenerContainer" primary="true">
		<property name="connectionFactory" ref="pooledConnectionFactory" />
		<property name="destination" ref="destination2" />
		<property name="messageListener" ref="topicReceiver2" />
		<property name="pubSubDomain" value="true" /> 
        <property name="pubSubNoLocal" value="false" />
	</bean> 
</beans>  
           

2、測試:3台機器中的ActiveMQ隻會有一個MQ可以被用戶端連接配接使用,在測試時可以把Master關掉,然後在重試用戶端消息發送和消費還可以正常使用,則說明叢集搭建正常。

在浏覽器中輸入:http://127.0.0.1:8090/ActiveMQSpring/index.jsp   進行測試操作

基于zookeeper+levelDB的ActiveMQ叢集
基于zookeeper+levelDB的ActiveMQ叢集
基于zookeeper+levelDB的ActiveMQ叢集

      3、源碼下載下傳

   自此,這個叢集搭建已經完成。

   本文參考到的部落格如下,非常感謝他們的付出:

   http://my.oschina.net/xiaohui249/blog/313028

   http://www.open-open.com/lib/view/open1400126457817.html

   http://blog.csdn.net/jiuqiyuliang/article/details/48758203

   如果其他作者又發現我引用了你們的文章而沒有貼出你們的位址連結,請聯系我,謝謝!