天天看點

ActiveMQ安裝及使用

1 安裝環境

1、需要jdk

2、安裝Linux系統。生産環境都是Linux系統。

2 安裝步驟

第一步: 把ActiveMQ 的壓縮包上傳到Linux系統。

第二步:解壓縮。

第三步:關閉防火牆

臨時關閉:service iptables stop 

寫入配置檔案,開機也不啟動 chkconfig iptables off

第四步:啟動activemq服務

使用bin目錄下的activemq指令啟動:

#  ./activemq start    啟動服務
#  ./activemq stop     停止服務
#  ./activemq status   檢視服務的狀态      

注意:如果ActiveMQ整合spring時,一定不要使用activemq-all-5.12.0.jar包。建議使用5.11.2

3 管理背景

1 進入管理背景

http://192.168.25.168:8161

ActiveMQ安裝及使用
ActiveMQ安裝及使用

登入進來之後的界面如下:

ActiveMQ安裝及使用

4  linux activemq 出現無法通路的解決

ActiveMQ安裝及使用

出現上面錯誤的原因是因為機器名和ip位址沒有對應上。

解決方式:

1.用cat  /etc/sysconfig/network 命名檢視主機名

# cat  /etc/sysconfig/network      
ActiveMQ安裝及使用

2.檢視hosts檔案

# cat  /etc/hosts     檢視hosts檔案      
ActiveMQ安裝及使用

如果你的機器名(我的是admin)沒有在hosts檔案裡面,就需要将你的機器名加入到hosts的檔案裡面。

也可以修改你的機器名為hosts檔案裡面已有的項

 3. 修改完成之後重新開機activemq服務就可以通路了。

5 Queue

1 Producer

生産者:生産消息,發送端。

把jar包添加到工程中。使用5.11.2版本的jar包。

ActiveMQ安裝及使用

第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。

第二步:使用ConnectionFactory對象建立一個Connection對象。

第三步:開啟連接配接,調用Connection對象的start方法。

第四步:使用Connection對象建立一個Session對象。

第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。

第六步:使用Session對象建立一個Producer對象。

第七步:建立一個Message對象,建立一個TextMessage對象。

第八步:使用Producer對象發送消息。

第九步:關閉資源。

package cn.e3mall.activeMQ;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

/**
 * 測試activeMQ
 * 
 * @title:TestActiveMQ
 * @description:
 * @author jepson
 * @date 2018年6月8日 下午10:50:14
 * @version 1.0
 */
public class TestActiveMQ {

    @Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。
        // brokerURL伺服器的ip及端口号,端口号是61616,web服務的端口号是8161
        String brokerURL="tcp://192.168.25.131:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        
        // 第二步:使用ConnectionFactory對象建立一個Connection對象。
        Connection connection = connectionFactory.createConnection();
        
        // 第三步:開啟連接配接,調用Connection對象的start方法。
        connection.start();
        
        // 第四步:使用Connection對象建立一個Session對象。
        /*
         *第一個參數:是否開啟事務。
         *如果true開啟事務,第二個參數無意義。一般不開啟事務。事務的意思就是沒有發出去就重發
         *
         *第二個參數:當第一個參數為false時,第二個參數才有意義。
         *表示消息的應答模式。1、自動應答2、手動應答。一般是自動應答。
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        
        // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Queue對象。
        //參數:隊列的名稱。
        Queue queue = session.createQueue("test-queue");
        
        // 第六步:使用Session對象建立一個Producer對象。
        MessageProducer producer = session.createProducer(queue);
        
        // 第七步:建立一個Message對象,建立一個TextMessage對象。
        TextMessage message = session.createTextMessage("ActiveMQ helloworld,This is my first activemq test");
        
        // 第八步:使用Producer對象發送消息。
        producer.send(message);
        
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }
}      

運作測試程式,然後檢視web服務端

ActiveMQ安裝及使用

點選test-queue

ActiveMQ安裝及使用
ActiveMQ安裝及使用

2 Consumer

消費者:接收消息。

第一步:建立一個ConnectionFactory對象。

第二步:從ConnectionFactory對象中獲得一個Connection對象。

第三步:開啟連接配接。調用Connection對象的start方法。

第五步:使用Session對象建立一個Destination對象。和發送端保持一緻queue,并且隊列的名稱一緻。

第六步:使用Session對象建立一個Consumer對象。

第七步:接收消息。

第八步:列印消息。

第九步:關閉資源

@Test
public void testQueueConsumer() throws Exception {
    // 第一步:建立一個ConnectionFactory對象連接配接MQ伺服器。
    String brokerURL = "tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:開啟連接配接。調用Connection對象的start方法。
    connection.start();
    
    // 第四步:使用Connection對象建立一個Session對象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session對象建立一個Destination對象。和發送端保持一緻queue,并且隊列的名稱一緻。
    Queue queue = session.createQueue("test-queue");
    
    // 第六步:使用Session對象建立一個Consumer對象。
    MessageConsumer consumer = session.createConsumer(queue);
    
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text =null;
                //取消息的内容
                text = textMessage.getText();
                // 第八步:列印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //等待鍵盤輸入
    System.in.read();
    // 第九步:關閉資源
    consumer.close();
    session.close();
    connection.close();
}      

運作上面的測試程式,然後可以在控制看到消費者拿到了消息内容

ActiveMQ安裝及使用

我們再去檢視一個web服務端

ActiveMQ安裝及使用

6 Topic

使用步驟:

第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。

@Test
public void testTopicProducer() throws Exception {
    // 第一步:建立ConnectionFactory對象,需要指定服務端ip及端口号。
    // brokerURL伺服器的ip及端口号,端口号是61616,web服務的端口号是8161
    String brokerURL="tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:使用ConnectionFactory對象建立一個Connection對象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:開啟連接配接,調用Connection對象的start方法。
    connection.start();
    
    // 第四步:使用Connection對象建立一個Session對象。
    /*
     *第一個參數:是否開啟事務。
     *如果true開啟事務,第二個參數無意義。一般不開啟事務。事務的意思就是沒有發出去就重發
     *
     *第二個參數:當第一個參數為false時,第二個參數才有意義。
     *表示消息的應答模式。1、自動應答2、手動應答。一般是自動應答。
     */
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session對象建立一個Destination對象(topic、queue),此處建立一個Topic對象。
    //參數:隊列的名稱。
    Topic topic = session.createTopic("test-topic");
    
    // 第六步:使用Session對象建立一個Producer對象。
    MessageProducer producer = session.createProducer(topic);
    
    // 第七步:建立一個Message對象,建立一個TextMessage對象。
    /*
     * TextMessage message = new ActiveMQTextMessage(); message.setText(
     * "hello activeMq,this is my first test.");
     */
    TextMessage message = session.createTextMessage("hello activeMq,this is my first test.");
    
    // 第八步:使用Producer對象發送消息。
    producer.send(message);
    
    // 第九步:關閉資源。
    producer.close();
    session.close();
    connection.close();
}      

第五步:使用Session對象建立一個Destination對象。和發送端保持一緻topic,并且話題的名稱一緻。

@Test
public void testTopicConsumer() throws Exception {
    // 第一步:建立一個ConnectionFactory對象連接配接MQ伺服器。
    String brokerURL = "tcp://192.168.25.131:61616";
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
    
    // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
    Connection connection = connectionFactory.createConnection();
    
    // 第三步:開啟連接配接。調用Connection對象的start方法。
    connection.start();
    
    // 第四步:使用Connection對象建立一個Session對象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    // 第五步:使用Session對象建立一個Destination對象。和發送端保持一緻topic,并且話題的名稱一緻。
    Topic topic = session.createTopic("test-topic");
    
    // 第六步:使用Session對象建立一個Consumer對象。
    MessageConsumer consumer = session.createConsumer(topic);
    
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text =null;
                //取消息的内容
                text = textMessage.getText();
                // 第八步:列印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("topic的消費端01。。。。。");
    //等待鍵盤輸入
    System.in.read();
    // 第九步:關閉資源
    consumer.close();
    session.close();
    connection.close();
}      

3 測試

1 測試一

運作producer生成者

ActiveMQ安裝及使用

可以看到消息發送了一個,沒有發送到任何消費者。消息不會持久化,直接丢失掉了,點選test-topic也看不到發送的消息内容。

2 測試二

運作consumer消費者3次,相當于啟動了三個消費者。分别修改輸出,用以區分.。

System.out.println("topic的消費端01。。。。。");
System.out.println("topic的消費端02。。。。。");
System.out.println("topic的消費端03。。。。。");
      

然後運作producer

會發現三個消費者都能夠收到消息

ActiveMQ安裝及使用
ActiveMQ安裝及使用
ActiveMQ安裝及使用

 然後我們檢視一下web服務端

ActiveMQ安裝及使用

可以看到有3個消費者,發送了2次消息【測試一發送一次,是以總的是兩次】,3條消息已出隊。

 如果我們在運作一次producer,會顯示發送了消息3次,6條消息已出隊

ActiveMQ安裝及使用

7 Quene 和 Topic的差別

Quene:點對點,消息會被持久化

Topic:廣播,消息不會被持久化

8 activemq和spring的整合

第一步:引入相關的jar包

<!-- ActiveMQ用戶端依賴的jar包 -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
</dependency>


<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
</dependency>      

第二步:producer生産者的spring配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" 
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                           http://www.springframework.org/schema/context 
                           http://www.springframework.org/schema/context/spring-context-4.2.xsd
                           http://www.springframework.org/schema/aop 
                           http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
                           http://www.springframework.org/schema/tx 
                           http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                           http://www.springframework.org/schema/util 
                           http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    <!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://47.93.53.127:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生産者 -->
    <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
</beans>      

第三步:生産者的代碼

package cn.e3mall.activeMQ;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

/**
 * 測試spring整合activemq
 * @title:TestSpringActiveMQ
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午5:41:47
 * @version 1.0
 */
public class TestSpringActiveMQ {

    @Test
    public void testSpringActiveMq() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //從spring容器中獲得JmsTemplate對象
        JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
        //從spring容器中取Destination對象
        Destination destination = (Destination) applicationContext.getBean("queueDestination");
        //使用JmsTemplate對象發送消息。
        jmsTemplate.send(destination, new MessageCreator() {
            
            @Override
            public Message createMessage(Session session) throws JMSException {
                //建立一個消息對象并傳回
                TextMessage textMessage = session.createTextMessage("spring activemq queue message");
                return textMessage;
            }
        });
    }
}      

第四步:MessageListener代碼實作

package cn.e3mall.search.listener;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 測試消息接收
 * @title:MyMessageListener
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午7:01:04
 * @version 1.0
 */
public class MyMessageListener implements MessageListener{

    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}      

第五步:consumer消費者的spring配置檔案

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context" 
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop" 
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context-4.2.xsd
                        http://www.springframework.org/schema/aop 
                        http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
                        http://www.springframework.org/schema/tx 
                        http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
                        http://www.springframework.org/schema/util 
                        http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    <!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://47.93.53.127:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生産者 -->
    <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--這個是隊列目的地,點對點的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--這個是主題目的地,一對多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置監聽器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息監聽容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>      

第六步:測試消費

package cn.e3mall.activemq;

import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 *
 * @title:MessageConsumer
 * @description:
 * @author jepson
 * @date 2018年6月10日 下午7:07:29
 * @version 1.0
 */
public class MessageConsumer {
    @Test
    public void testQueueConsumer() throws Exception {
        //初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
        //等待
        System.in.read();
    }
}      

繼續閱讀