天天看點

ActiveMQ 部署及發送接收消息

ActiveMQ 部署及發送接收消息

一、           下載下傳

下載下傳位址:http://activemq.apache.org/ 我這裡使用的版本為目前最新5.8.0。

下載下傳版本有Windows和Linux兩個版本,且都分為32位和64位。根據自己需要選擇下載下傳。

二、           安裝

我這裡下載下傳的為windows的32位版本(apache-activemq-5.8.0-bin.zip),下載下傳後直接解壓到需要安裝的目錄或在直接解壓到目前目錄也可,解壓完安裝也完成。

解壓後目錄如上圖,裡面包含了示例和文檔,及所有的jar包。

三、           運作

進入到bin目錄(apache-activemq-5.8.0\bin),輕按兩下activemq.bat,就會運作,運作截圖如下:

此時表示ActiveMQ已經在運作了,當然正常生産環境下可以設定作為服務在背景運作,并且随系統啟動而啟動。

四、           測試

ActiveMQ自帶了一套管理系統,通路http://localhost:8161/admin/,會出現需要輸入使用者名和密碼的頁面如下:

預設使用者名和密碼都是admin,進入後則為主界面:

在這個界面上,我們可以管理隊列及其他的一些功能,為了下面的繼續,我們在這裡建立一個Queue和一個Topic。

點選目錄上的Queues進入建立Queue頁面,輸入Queue名稱,點選Create後下面就建立了G2Queue的queue隊列。

這裡也可以不用這樣手工建立,在發送端指定了一個Queue或Topic名字後,會自動建立一個隊列,如上面的choice.queue和FirstQueue都是我測試程式時,程式裡面指定的Queue名稱,自動建立的。

同樣的方式建立一個Topic,如下:

五、           發送消息

建立一個新的項目,我這裡是建立的webproject名稱為ActiveMQ,引入ActiveMQ的jar包,整個工程結構如下:

此段代碼從網上直接copy,隻是稍作修改:

[java] view plaincopy

import java.util.Random;  

import javax.jms.Connection;  

import javax.jms.ConnectionFactory;  

import javax.jms.DeliveryMode;  

import javax.jms.Destination;  

import javax.jms.JMSException;  

import javax.jms.MessageProducer;  

import javax.jms.Session;  

import javax.jms.TextMessage;  

import org.apache.activemq.ActiveMQConnectionFactory;  

public class SendMessage {  

    private static final String url = "tcp://localhost:61616";  

    private static final String QUEUE_NAME = "G2Queue";  

    public void sendMessage() throwsJMSException {  

       // JMS 用戶端到JMSProvider 的連接配接  

       Connection connection = null;  

       try {  

           // 連接配接工廠,JMS 用它建立連接配接  

           // 構造ConnectionFactory執行個體對象,此處采用ActiveMq的實作jar  

           ConnectionFactory connectionFactory = newActiveMQConnectionFactory(url);  

           connection = (Connection)connectionFactory.createConnection();  

           // 啟動連接配接  

           connection.start();  

           //Session:發送或接收消息的線程  

           // 擷取session  

           Session session = (Session) connection.createSession(false,  

                  Session.AUTO_ACKNOWLEDGE);  

           // 消息的目的地,消息發送到那個隊列  

           Destination destination = session.createQueue(QUEUE_NAME);  

           //MessageProducer:消息發送者(生産者)  

           // 建立消息發送者  

           MessageProducer producer =session.createProducer(destination);  

           // 設定是否持久化  

           //DeliveryMode.NON_PERSISTENT:不持久化  

           //DeliveryMode.PERSISTENT:持久化  

           producer.setDeliveryMode(DeliveryMode.PERSISTENT);  

           String msg = "";  

           int i = 0;  

        do {  

            msg = "第"+i + "次發送的消息:"+new Random();  

                TextMessagemessage = session.createTextMessage(msg);  

                Thread.sleep(1000);  

                // 發送消息到目的地方  

               producer.send(message);  

                System.out.println("發送消息:" +msg);  

                i++;  

        } while (i<1000);  

       } catch (Exception e) {  

           e.printStackTrace();  

       }  

    }  

    public static void main(String[] args) {  

       SendMessage sndMsg = newSendMessage();  

           sndMsg.sendMessage();  

       } catch (Exception ex) {  

           System.out.println(ex.toString());  

}  

運作結果如下:

六、           接收消息

package cn.g2room.mq.test;  

import javax.jms.Message;  

import javax.jms.MessageConsumer;  

/** 

 * 消息接收類 

 * 

 * @createTime:Apr 7, 2013 5:11:11 PM 

 * @author:<a href="mailto:[email protected]">迷蝶</a> 

 * @version:0.1 

 *@lastVersion: 0.1 

 * @updateTime: 

 *@updateAuthor: <a href="mailto:[email protected]">迷蝶</a> 

 * @changesSum: 

 */  

public class ReceiveMessage {  

         privatestatic final String url = "tcp://localhost:61616";  

         privatestatic final String QUEUE_NAME = "G2Queue";  

         publicvoid receiveMessage() {  

                   Connectionconnection = null;  

                   try{  

                            try{  

                                     ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory(  

                                                        url);  

                                     connection= connectionFactory.createConnection();  

                            }catch (Exception e) {  

                                     System.out.println(e.toString());  

                            }  

                            connection.start();  

                            Sessionsession = connection.createSession(false,  

                                               Session.AUTO_ACKNOWLEDGE);  

                            Destinationdestination = session.createQueue(QUEUE_NAME);  

                            //消息接收者,也就是消費者  

                            MessageConsumerconsumer = session.createConsumer(destination);  

                            consumeMessagesAndClose(connection,session, consumer);  

                   }catch (Exception e) {  

                            System.out.println(e.toString());  

                   }  

         }  

         /** 

          * 接收和關閉消息,如遇到消息内容為close則,關閉連接配接 

          * 

          * @param connection   JMS 用戶端到JMSProvider 的連接配接 

          * @param session                   發送或接收消息的線程 

          * @param consumer              消息接收對象 

          * @throws JMSException 

          * @auther <ahref="mailto:[email protected]">迷蝶</a> 

          * Apr 8, 2013 10:31:55 AM 

          */  

         protectedvoid consumeMessagesAndClose(Connection connection,  

                            Sessionsession, MessageConsumer consumer) throws JMSException {  

                   do{  

                            Messagemessage = consumer.receive(1000);  

                            if("close".equals(message)){  

                                     consumer.close();  

                                     session.close();  

                                     connection.close();  

                            if(message != null) {  

                                     onMessage(message);  

                   }while (true);  

         publicvoid onMessage(Message message) {  

                            if(message instanceof TextMessage) {  

                                     TextMessagetxtMsg = (TextMessage) message;  

                                     Stringmsg = txtMsg.getText();  

                                     System.out.println("Received:" + msg);  

                            e.printStackTrace();  

         publicstatic void main(String args[]) {  

                   ReceiveMessagerm = new ReceiveMessage();  

                   rm.receiveMessage();