天天看点

ActiveMQ接收消息+发送消息的简单实例

最近由于公司项目需要 -activemq接收+发送消息,用的是activemq。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,在网上找到了合适的方案。

我的 it技术资源库   http://itlib.tk/

producertool.java用于发送消息:

java 代码 package homework;   

import javax.jms.connection;   

import javax.jms.deliverymode;   

import javax.jms.destination;   

import javax.jms.jmsexception;   

import javax.jms.messageproducer;   

import javax.jms.session;   

import javax.jms.textmessage;   

import org.apache.activemq.activemqconnection;   

import org.apache.activemq.activemqconnectionfactory;   

public class producertool {   

    private string user = activemqconnection.default_user;   

    private string password = activemqconnection.default_password;   

    private string url = activemqconnection.default_broker_url;   

    private string subject = "tool.default";   

    private destination destination = null;   

    private connection connection = null;   

    private session session = null;   

    private messageproducer producer = null;   

    // 初始化   

    private void initialize() throws jmsexception, exception {   

        activemqconnectionfactory connectionfactory = new activemqconnectionfactory(   

                user, password, url);   

        connection = connectionfactory.createconnection();   

        session = connection.createsession(false, session.auto_acknowledge);   

        destination = session.createqueue(subject);   

        producer = session.createproducer(destination);   

        producer.setdeliverymode(deliverymode.non_persistent);   

    }   

    // 发送消息  

    public void producemessage(string message) throws jmsexception, exception {   

        initialize();   

        textmessage msg = session.createtextmessage(message);   

        connection.start();   

        system.out.println("producer:->sending message: " + message);   

        producer.send(msg);   

        system.out.println("producer:->message sent complete!");   

    // 关闭连接  

    public void close() throws jmsexception {   

        system.out.println("producer:->closing connection");   

        if (producer != null)   

            producer.close();   

        if (session != null)   

            session.close();   

        if (connection != null)   

            connection.close();   

}   

consumertool.java用于接受消息,我用的是基于消息监听的机制,需要实现messagelistener接口,这个接口有个onmessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。

import javax.jms.messageconsumer;   

import javax.jms.messagelistener;   

import javax.jms.message;   

public class consumertool implements messagelistener {   

    private string password = activemqconnection.default_password;   

    private messageconsumer consumer = null;   

    // 初始化  

        consumer = session.createconsumer(destination);   

    // 消费消息   

    public void consumemessage() throws jmsexception, exception {   

        system.out.println("consumer:->begin listening...");   

        // 开始监听  

        consumer.setmessagelistener(this);   

        // message message = consumer.receive();  

        system.out.println("consumer:->closing connection");   

        if (consumer != null)   

            consumer.close();   

    // 消息处理函数   

    public void onmessage(message message) {   

        try {   

            if (message instanceof textmessage) {   

                textmessage txtmsg = (textmessage) message;   

                string msg = txtmsg.gettext();   

                system.out.println("consumer:->received: " + msg);   

            } else {   

                system.out.println("consumer:->received: " + message);   

            }   

        } catch (jmsexception e) {   

            // todo auto-generated catch block  

            e.printstacktrace();   

        }   

如果想主动的去接受消息,而不用消息监听的话,把consumer.setmessagelistener(this)改为message message = consumer.receive(),手动去调用messageconsumer的receive方法即可。

下面是测试类test.java:

public class test {   

    /** 

     * @param args  

     */  

    public static void main(string[] args) throws jmsexception, exception {   

        // todo auto-generated method stub  

        consumertool consumer = new consumertool();   

        producertool producer = new producertool();   

        consumer.consumemessage();   

        // 延时500毫秒之后发送消息  

        thread.sleep(500);   

        producer.producemessage("hello, world!");   

        producer.close();   

        // 延时500毫秒之后停止接受消息  

        consumer.close();   

activemq的一个简单实例-activemq接收+发送消息

it技术资源库   http://itlib.tk/