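Recently a project at work needed to send and receive messages, and we used ActiveMQ for it. There are not many examples of this online, and some of them are incomplete. After a few days of exploring, I found an approach on the web that works.
My IT technical resource library: http://itlib.tk/
ProducerTool.java is used to send messages: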
Java code:
package homework;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ProducerTool {
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "tool.default";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageProducer producer = null;

    // Initialize the connection, session, queue, and producer
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    // Send a message
    public void produceMessage(String message) throws JMSException, Exception {
        initialize();
        TextMessage msg = session.createTextMessage(message);
        connection.start();
        System.out.println("producer:->sending message: " + message);
        producer.send(msg);
        System.out.println("producer:->message sent complete!");
    }

    // Close the producer, session, and connection
    public void close() throws JMSException {
        System.out.println("producer:->closing connection");
        if (producer != null)
            producer.close();
        if (session != null)
            session.close();
        if (connection != null)
            connection.close();
    }
}
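ConsumerTool.java is used to receive messages. I use a listener-based mechanism, which requires implementing the MessageListener interface. This interface has an onMessage method, which is called automatically whenever a message arrives so that the message can be processed.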
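package homework;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ConsumerTool implements MessageListener {
    // Connection settings and fields, mirroring ProducerTool
    private String user = ActiveMQConnection.DEFAULT_USER;
    private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private String subject = "tool.default";
    private Destination destination = null;
    private Connection connection = null;
    private Session session = null;
    private MessageConsumer consumer = null;
    // Initialize the connection, session, queue, and consumer
    private void initialize() throws JMSException, Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    // Consume messages
    public void consumeMessage() throws JMSException, Exception {
        initialize();
        connection.start();
        System.out.println("consumer:->begin listening...");
        consumer.setMessageListener(this); // start listening
        // Message message = consumer.receive();
    }

    // Close the consumer, session, and connection
    public void close() throws JMSException {
        System.out.println("consumer:->closing connection");
        if (consumer != null) consumer.close();
        if (session != null) session.close();
        if (connection != null) connection.close();
    }

    // Message handler, called by the JMS provider for each delivered message
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String msg = txtMsg.getText();
                System.out.println("consumer:->received: " + msg);
            } else {
                System.out.println("consumer:->received: " + message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}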
If you would rather receive messages actively instead of through a listener, replace consumer.setMessageListener(this) with Message message = consumer.receive(), that is, call the receive method of MessageConsumer yourself.
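The difference between the two receiving styles can be modeled without a broker. The following is only a sketch built on java.util.concurrent, not the JMS or ActiveMQ API: the class InMemoryQueueDemo and its methods send, receive, and listen are hypothetical names chosen to mirror MessageConsumer.receive and setMessageListener.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a message queue; not part of JMS or ActiveMQ.
final class InMemoryQueueDemo {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Put a message on the queue.
    void send(String text) {
        queue.add(text);
    }

    // Pull style: the caller blocks until a message arrives or the timeout
    // elapses. Returns null on timeout or if the waiting thread is interrupted.
    String receive(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Push style: a background thread takes each message and hands it to the
    // callback, so the caller never blocks. Interrupt the thread to stop it.
    Thread listen(Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws InterruptedException {
        InMemoryQueueDemo q = new InMemoryQueueDemo();

        // Pull: the main thread asks for the message itself.
        q.send("first");
        System.out.println("pulled: " + q.receive(500));

        // Push: the callback runs on the worker thread when a message arrives.
        q.listen(text -> System.out.println("pushed: " + text));
        q.send("second");
        Thread.sleep(200); // give the worker thread time to run before exiting
    }
}
```

In the pull style the calling thread decides when to wait and for how long; in the push style the waiting happens on another thread and the application only supplies the handler.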
Below is the test class, Test.java:
package homework;
import javax.jms.JMSException;
public class Test {
    /**
     * @param args
     */
    public static void main(String[] args) throws JMSException, Exception {
        ConsumerTool consumer = new ConsumerTool();
        ProducerTool producer = new ProducerTool();
        consumer.consumeMessage();
        // Wait 500 ms before sending the message
        Thread.sleep(500);
        producer.produceMessage("hello, world!");
        producer.close();
        // Wait 500 ms before stopping the consumer
        Thread.sleep(500);
        consumer.close();
    }
}
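The fixed 500 ms delays make the test depend on timing. One possible alternative is to have the listener signal a java.util.concurrent.CountDownLatch and let the test wait on it with a timeout. The sketch below shows only that synchronization pattern with JDK classes: LatchListener and its onText method are hypothetical stand-ins for ConsumerTool and its onMessage callback, and the message is delivered from a plain thread rather than by ActiveMQ.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ConsumerTool: records the text and releases the latch.
final class LatchListener {
    private final CountDownLatch received = new CountDownLatch(1);
    private volatile String lastText;

    // Plays the role of onMessage: called by the delivering thread.
    void onText(String text) {
        lastText = text;
        received.countDown();
    }

    // Waits until a message has arrived or the timeout elapses.
    // Returns true if a message was received in time.
    boolean awaitMessage(long timeoutMillis) {
        try {
            return received.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    String lastText() {
        return lastText;
    }
}

final class LatchDemo {
    public static void main(String[] args) {
        LatchListener listener = new LatchListener();

        // Simulate asynchronous delivery from another thread.
        new Thread(() -> listener.onText("hello, world!")).start();

        // Wait for the message instead of sleeping for a fixed time.
        if (listener.awaitMessage(2000)) {
            System.out.println("received: " + listener.lastText());
        } else {
            System.out.println("timed out waiting for the message");
        }
    }
}
```

Applied to the test above, the same idea would mean counting down the latch at the end of onMessage and calling consumer.close() only after the wait returns.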