天天看點

ActiveMQ接收消息+發送消息的簡單執行個體

最近由于公司項目需要 -activemq接收+發送消息,用的是activemq。由于這方面網上的例子不是很多,而且有的也不完整。于是經過幾天的摸索學習,在網上找到了合适的方案。

我的 it技術資源庫   http://itlib.tk/

producertool.java用于發送消息:

java 代碼 package homework;   

import javax.jms.connection;   

import javax.jms.deliverymode;   

import javax.jms.destination;   

import javax.jms.jmsexception;   

import javax.jms.messageproducer;   

import javax.jms.session;   

import javax.jms.textmessage;   

import org.apache.activemq.activemqconnection;   

import org.apache.activemq.activemqconnectionfactory;   

public class producertool {   

    private string user = activemqconnection.default_user;   

    private string password = activemqconnection.default_password;   

    private string url = activemqconnection.default_broker_url;   

    private string subject = "tool.default";   

    private destination destination = null;   

    private connection connection = null;   

    private session session = null;   

    private messageproducer producer = null;   

    // 初始化   

    private void initialize() throws jmsexception, exception {   

        activemqconnectionfactory connectionfactory = new activemqconnectionfactory(   

                user, password, url);   

        connection = connectionfactory.createconnection();   

        session = connection.createsession(false, session.auto_acknowledge);   

        destination = session.createqueue(subject);   

        producer = session.createproducer(destination);   

        producer.setdeliverymode(deliverymode.non_persistent);   

    }   

    // 發送消息  

    public void producemessage(string message) throws jmsexception, exception {   

        initialize();   

        textmessage msg = session.createtextmessage(message);   

        connection.start();   

        system.out.println("producer:->sending message: " + message);   

        producer.send(msg);   

        system.out.println("producer:->message sent complete!");   

    // 關閉連接配接  

    public void close() throws jmsexception {   

        system.out.println("producer:->closing connection");   

        if (producer != null)   

            producer.close();   

        if (session != null)   

            session.close();   

        if (connection != null)   

            connection.close();   

}   

consumertool.java用于接受消息,我用的是基于消息監聽的機制,需要實作messagelistener接口,這個接口有個onmessage方法,當接受到消息的時候會自動調用這個函數對消息進行處理。

import javax.jms.messageconsumer;   

import javax.jms.messagelistener;   

import javax.jms.message;   

public class consumertool implements messagelistener {   

    private string password = activemqconnection.default_password;   

    private messageconsumer consumer = null;   

    // 初始化  

        consumer = session.createconsumer(destination);   

    // 消費消息   

    public void consumemessage() throws jmsexception, exception {   

        system.out.println("consumer:->begin listening...");   

        // 開始監聽  

        consumer.setmessagelistener(this);   

        // message message = consumer.receive();  

        system.out.println("consumer:->closing connection");   

        if (consumer != null)   

            consumer.close();   

    // 消息處理函數   

    public void onmessage(message message) {   

        try {   

            if (message instanceof textmessage) {   

                textmessage txtmsg = (textmessage) message;   

                string msg = txtmsg.gettext();   

                system.out.println("consumer:->received: " + msg);   

            } else {   

                system.out.println("consumer:->received: " + message);   

            }   

        } catch (jmsexception e) {   

            // todo auto-generated catch block  

            e.printstacktrace();   

        }   

如果想主動的去接受消息,而不用消息監聽的話,把consumer.setmessagelistener(this)改為message message = consumer.receive(),手動去調用messageconsumer的receive方法即可。

下面是測試類test.java:

public class test {   

    /** 

     * @param args  

     */  

    public static void main(string[] args) throws jmsexception, exception {   

        // todo auto-generated method stub  

        consumertool consumer = new consumertool();   

        producertool producer = new producertool();   

        consumer.consumemessage();   

        // 延時500毫秒之後發送消息  

        thread.sleep(500);   

        producer.producemessage("hello, world!");   

        producer.close();   

        // 延時500毫秒之後停止接受消息  

        consumer.close();   

activemq的一個簡單執行個體-activemq接收+發送消息

it技術資源庫   http://itlib.tk/