最近由于公司项目需要 -activemq接收+发送消息,用的是activemq。由于这方面网上的例子不是很多,而且有的也不完整。于是经过几天的摸索学习,在网上找到了合适的方案。
我的 it技术资源库 http://itlib.tk/
producertool.java用于发送消息:
java 代码 package homework;
import javax.jms.connection;
import javax.jms.deliverymode;
import javax.jms.destination;
import javax.jms.jmsexception;
import javax.jms.messageproducer;
import javax.jms.session;
import javax.jms.textmessage;
import org.apache.activemq.activemqconnection;
import org.apache.activemq.activemqconnectionfactory;
public class producertool {
private string user = activemqconnection.default_user;
private string password = activemqconnection.default_password;
private string url = activemqconnection.default_broker_url;
private string subject = "tool.default";
private destination destination = null;
private connection connection = null;
private session session = null;
private messageproducer producer = null;
// 初始化
private void initialize() throws jmsexception, exception {
activemqconnectionfactory connectionfactory = new activemqconnectionfactory(
user, password, url);
connection = connectionfactory.createconnection();
session = connection.createsession(false, session.auto_acknowledge);
destination = session.createqueue(subject);
producer = session.createproducer(destination);
producer.setdeliverymode(deliverymode.non_persistent);
}
// 发送消息
public void producemessage(string message) throws jmsexception, exception {
initialize();
textmessage msg = session.createtextmessage(message);
connection.start();
system.out.println("producer:->sending message: " + message);
producer.send(msg);
system.out.println("producer:->message sent complete!");
// 关闭连接
public void close() throws jmsexception {
system.out.println("producer:->closing connection");
if (producer != null)
producer.close();
if (session != null)
session.close();
if (connection != null)
connection.close();
}
consumertool.java用于接受消息,我用的是基于消息监听的机制,需要实现messagelistener接口,这个接口有个onmessage方法,当接受到消息的时候会自动调用这个函数对消息进行处理。
import javax.jms.messageconsumer;
import javax.jms.messagelistener;
import javax.jms.message;
public class consumertool implements messagelistener {
private string password = activemqconnection.default_password;
private messageconsumer consumer = null;
// 初始化
consumer = session.createconsumer(destination);
// 消费消息
public void consumemessage() throws jmsexception, exception {
system.out.println("consumer:->begin listening...");
// 开始监听
consumer.setmessagelistener(this);
// message message = consumer.receive();
system.out.println("consumer:->closing connection");
if (consumer != null)
consumer.close();
// 消息处理函数
public void onmessage(message message) {
try {
if (message instanceof textmessage) {
textmessage txtmsg = (textmessage) message;
string msg = txtmsg.gettext();
system.out.println("consumer:->received: " + msg);
} else {
system.out.println("consumer:->received: " + message);
}
} catch (jmsexception e) {
// todo auto-generated catch block
e.printstacktrace();
}
如果想主动的去接受消息,而不用消息监听的话,把consumer.setmessagelistener(this)改为message message = consumer.receive(),手动去调用messageconsumer的receive方法即可。
下面是测试类test.java:
public class test {
/**
* @param args
*/
public static void main(string[] args) throws jmsexception, exception {
// todo auto-generated method stub
consumertool consumer = new consumertool();
producertool producer = new producertool();
consumer.consumemessage();
// 延时500毫秒之后发送消息
thread.sleep(500);
producer.producemessage("hello, world!");
producer.close();
// 延时500毫秒之后停止接受消息
consumer.close();
activemq的一个简单实例-activemq接收+发送消息
it技术资源库 http://itlib.tk/