天天看點

ActiveMQ安裝啟動

ActiveMQ是面向消息中間件(Message-oriented middleware),是用于以分布式應用或系統中的異步、松耦合、可靠、可擴充和安全通信的一類軟體。總體思想是它作為消息發送器和消息接收器之間的消息中介,這種中介提供了一個全新水準的松耦合。

JMS 叫做 Java 消息服務(Java Message Service),是 Java 平台上有關面向 MOM 的技術規範,旨在通過提供标準的産生、發送、接收和處理消息的 API 簡化企業應用的開發,類似于 JDBC 和關系型資料庫通信方式的抽象。

首先到網上下載下傳activemq,網站:http://activemq.apache.org/download-archives.html

下載下傳完後解壓後檔案結構:

ActiveMQ安裝啟動

打開doc指令,進入到activemq的bin目錄,輸入activemq.bat,回車:

ActiveMQ安裝啟動
ActiveMQ安裝啟動

打開浏覽器通路網站http://localhost:8161/admin,使用者名密碼預設是admin,activemq便啟動好了

ActiveMQ安裝啟動
ActiveMQ安裝啟動

接下來我們用idea進行測試:

首先搭建好項目,在pom.xml中導入所需jar包:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.szxs</groupId>
    <artifactId>AcviteMQQueueDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-client -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.2.9.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.2.9.RELEASE</version>
        </dependency>

    </dependencies>
</project>
           

activemq中分為兩種模式:

第一種:queue,隊列模式,一對一消息發送和接受(先發送,後接受)

第二張:topic,訂閱模式,多對多消息發送和接受(先接受,後發送)

下面先來測試queue模式,分為三類(監聽器,發送消息,接受消息):

ActiveMQ安裝啟動

queue監聽器listener代碼:

package com.szxs.queue.listener;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 點對點消息接收者 使用Listener 監聽方式 在實際項目開發中使用比較多
 */
public class QueueReceiver_Listener {
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/queues.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.queue";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null

    public static void run() throws Exception {

        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 1、建立連結工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueReceiver_Listener.DEFAULT_USER, QueueReceiver_Listener.DEFAULT_PASSWORD,QueueReceiver_Listener.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createQueueConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Queue queue = session.createQueue(DESTINATION);
            // 建立消息接收者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);

            //使用内部類為消息接收者加載相應的Listener監聽
            receiver.setMessageListener(new MessageListener() {
                //重寫onMessage方法
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠10s再關閉 接收生産者發送的全部的10條消息
            // 需要注意的是這裡使用sleep會使目前正在執行的線程進入休眠狀态
            // 也就是QueueReceiver_Listener這個類進入休眠狀态了,而接收者的監聽器仍然會繼續執行的哦。
            Thread.sleep(1000 * 10);

            // 送出會話
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueReceiver_Listener.run();
    }
}
           

queue發送消息sender代碼:

package com.szxs.queue.send;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 點對點消息發送者
 */
public class QueueSender {
    // 發送次數
    public static final int SEND_NUM = 10;
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/queues.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.queue";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null

    /**
     * 發送消息
     * @param session
     * @param sender
     * @throws Exception
     */
    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {

            String message = "發送第" + (i + 1) + "條消息";
            TextMessage textMessage = session.createTextMessage(message);
            System.out.println(textMessage.getText());
            sender.send(textMessage);
        }
    }

    /**
     * 建立連接配接并發送消息
     * @throws Exception
     */
    public static void run() throws Exception {

        //點對點隊列連接配接
        QueueConnection connection = null;
        //點對點會話Session
        QueueSession session = null;
        try {
            // 1、建立連結工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueSender.DEFAULT_USER, QueueSender.DEFAULT_PASSWORD, QueueSender.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createQueueConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Queue queue = session.createQueue(DESTINATION);
            // 6、建立消息發送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 設定持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 送出會話
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}
           

queue接受消息receive代碼:

package com.szxs.queue.receive;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 點對點消息接收者  直接Receive 方式
 */
public class QueueReceiver_Receive {
    // 接收消息的個數
    public static final int Receive_NUM = 10;
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL =ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/queues.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.queue";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null

    public static void run() throws Exception {

        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 1、建立連結工廠
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(QueueReceiver_Receive.DEFAULT_USER, QueueReceiver_Receive.DEFAULT_PASSWORD,QueueReceiver_Receive.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createQueueConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Queue queue = session.createQueue(DESTINATION);
            // 建立消息接收者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            // 直接Receive 方式 接收消息
            for(int i=0;i<QueueReceiver_Receive.Receive_NUM;i++){
                TextMessage textMessage=(TextMessage) receiver.receive();
                if(textMessage!=null)
                    System.out.println("接收#" + textMessage.getText());
            }
            // 送出會話
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                // 關閉會話
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueReceiver_Receive.run();
    }
}
           

然後分别執行queue代碼(發送消息—>接受消息—>監聽器),浏覽器按下圖點選,這就是queue隊列模式

ActiveMQ安裝啟動

下面測試topic模式,分為三類(監聽器,發送消息,接受消息):

ActiveMQ安裝啟動

topic監聽器listener代碼:

package com.szxs.topic.listener;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 釋出訂閱式消息接收者
 */
public class TopicReceiver_Listener {
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/topics.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.topic";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null


    public static void run() throws Exception {

        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、建立連結工廠
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver_Listener.DEFAULT_USER, TopicReceiver_Listener.DEFAULT_PASSWORD, TopicReceiver_Listener.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createTopicConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Topic topic = session.createTopic(DESTINATION);
            // 6、建立消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);

            //使用監聽器的方式訂閱消息
            subscriber.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        TextMessage textMessage = (TextMessage) msg;
                        try {
                            System.out.println("接收#" + textMessage.getText());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 休眠100s再關閉 接收生産者發送的全部的10條消息
            // 需要注意的是這裡使用sleep會使目前正在執行的線程進入休眠狀态
            // 也就是TopicReceiver_Listener這個類進入休眠狀态了,而接收者的監聽器仍然會繼續執行的哦。
            Thread.sleep(1000 *100);

            // 送出會話
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TopicReceiver_Listener.run();
    }
}
           

topic發送消息sender代碼:

package com.szxs.topic.send;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 釋出訂閱式消息發送者
 */
public class TopicProducer {
    // 發送次數
    public static final int SEND_NUM = 10;
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/topics.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.topic";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null

    /**
     * 消息發送端
     * @param session
     * @param publisher
     * @throws Exception
     */
    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "發送消息第" + (i + 1) + "條";

            TextMessage textMessage = session.createTextMessage(message);
            System.out.println(textMessage.getText());
            //發送 Topic消息
            publisher.send(textMessage);
        }
    }

    public  void run() throws Exception {
        //Topic連接配接
        TopicConnection connection = null;
        //Topic會話
        TopicSession session = null;
        try {
            // 1、建立連結工廠
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicProducer.DEFAULT_USER, TopicProducer.DEFAULT_PASSWORD, TopicProducer.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createTopicConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Topic topic = session.createTopic(DESTINATION);
            // 6、建立消息發送者
            TopicPublisher publisher = session.createPublisher(topic);
            // 設定持久化模式
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, publisher);
            // 送出會話
            session.commit();


        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        new  TopicProducer().run();
    }
}
           

topic接受消息receive代碼:

package com.szxs.topic.receive;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 釋出訂閱式消息接收者
 */
public class TopicReceiver_Receive {
    // tcp 位址 伺服器器端位址
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;  // 其值為 "tcp://localhost:61616";
    // 目标位址,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/topics.jsp中可以查詢到發送的mq消息
    public static final String DESTINATION = "xs.mq.topic";
    //測試連接配接使用預設的使用者名
    public static final String DEFAULT_USER = ActiveMQConnection.DEFAULT_USER;//預設為null
    //測試連接配接使用預設的密碼
    public static final String DEFAULT_PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設為null


    public static void run() throws Exception {

        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 1、建立連結工廠
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(TopicReceiver_Receive.DEFAULT_USER, TopicReceiver_Receive.DEFAULT_PASSWORD, TopicReceiver_Receive.BROKER_URL);
            // 2、通過工廠建立一個連接配接
            connection = factory.createTopicConnection();
            // 3、啟動連接配接
            connection.start();
            // 4、建立一個session會話
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 5、建立一個消息隊列
            Topic topic = session.createTopic(DESTINATION);
            // 6、建立消息制作者
            final TopicSubscriber subscriber = session.createSubscriber(topic);

            //接收Topic生産者發送過來的消息
            //需要注意的是此處需要啟動一個新的線程來處理問題
            new Thread(){
                public void run(){
                    TextMessage textMessage = null;
                    try {
                        while(true){//持續接收消息
                            textMessage = (TextMessage) subscriber.receive();
                            if(textMessage==null)
                                break;
                            System.out.println("接收#" + textMessage.getText());
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }.start();

            // 休眠100s再關閉 接收生産者發送的全部的10條消息
            // 需要注意的是這裡使用sleep會使目前正在執行的線程進入休眠狀态
            // 也就是TopicReceiver_Receive這個類進入休眠狀态了,而接收者.start方法剛剛啟動的新線程會繼續執行的哦。
            Thread.sleep(1000 *100);

            // 送出會話
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 關閉釋放資源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TopicReceiver_Receive.run();
    }
}
           

然後分别執行topic代碼(接受消息—>發送消息—>監聽器),浏覽器按下圖點選,這就是topic訂閱模式

ActiveMQ安裝啟動

結合spring使用:

首先建立applicationContext.xml檔案,代碼如下:

ActiveMQ安裝啟動
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.1.xsd">

    <!--連接配接池-->
    <!--<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp://localhost:61616"/>
            </bean>
        </property>
    </bean>-->
    <!-- 連接配接工廠 -->
    <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616" />
    </bean>

    <!-- 配置消息目标 -->
    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 目标,在ActiveMQ管理者控制台建立 http://localhost:8161/admin/queues.jsp -->
        <constructor-arg index="0" value="xs.mq.queue"/>
    </bean>

    <!-- 消息模闆 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="activeMQConnectionFactory"/>
        <property name="defaultDestination" ref="destination"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>
           

然後建立發送消息,接受消息類:

ActiveMQ安裝啟動

發送消息Sender代碼:

package com.spring.send;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.jms.*;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
/**
 * Spring JMSTemplate 消息發送者<br>
 *  将JMS整合到spring上面進行開發
 */
public class Sender {
    public static void main(String[] args) {
        ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
        JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");

        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage();
                SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String date=formatter.format(new Date());
                String text="current system time: "+date;
                message.setText(text);
                System.out.println(text);
                return message;
            }
        });
    }
}
           

接受消息Receiver代碼:

package com.spring.receive;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.FileSystemXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;

/**
 * Spring JMSTemplate 消息接收者<br>
 *  将JMS整合到spring上面進行開發
 */
public class Receiver {
    public static void main(String[] args) throws JMSException {
        ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");

        JmsTemplate jmsTemplate = (JmsTemplate) ctx.getBean("jmsTemplate");
        while(true) {
            //設定10s逾時時間
            jmsTemplate.setReceiveTimeout(1000*10);
            TextMessage text =   (TextMessage) jmsTemplate.receive();
            if(text==null)
                break;
            //接收到相應的消息
            System.out.println("收到消息:" + text.getText());
        }
    }
}
           

然後運作,浏覽器點選,這就是結合spring使用:

ActiveMQ安裝啟動

謝謝大家!