上篇文章中詳細介紹了ActiveMQ。本文繼續介紹ActiveMQ的具體操作
ActiveMQ
處理對象消息
1.定義消息載體對象
/**
* Order Bean
* 定義消息載體類型. 即要在ActiveMQ中傳遞的資料實體類型.
* 消息載體對象必須實作接口java.io.Serializable, 因為消息需要在網絡中傳遞,要求必須可序列化
* @author dengp
*
*/
public class Order implements Serializable{
private static final long serialVersionUID = 1L;
private String id;
private String nick;
private Long price;
private Date createTime;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getNick() {
return nick;
}
public void setNick(String nick) {
this.nick = nick;
}
public Long getPrice() {
return price;
}
public void setPrice(Long price) {
this.price = price;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
public static long getSerialversionuid() {
return serialVersionUID;
}
@Override
public String toString() {
return "Order [id=" + id + ", nick=" + nick + ", price=" + price + ", createTime=" + createTime + "]";
}
}
2.定義消息生産者
/**
* ActiveMQ中的生産者(Producer)
* @author dengp
*
*/
public class OrderProducer {
public void sendhello2ActiveMq(Order messageObject) {
ConnectionFactory factory = null;
Connection conn = null;
Session session = null;
Destination destination = null;
MessageProducer producer = null;
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
// 建立連結對象
conn = factory.createConnection();
// 啟動連接配接對象
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地,目的地的命名既是隊列的指令
destination = session.createQueue("MQ-Hello-Object");
producer = session.createProducer(destination);
// 建立消息對象. 此消息是對象消息, 其中儲存資料為對象.
message = session.createObjectMessage(messageObject);
// 發送消息
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != producer)
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3.定義消息消費者
/**
* ActiveMQ中的消費者(Consumer)
* @author dengp
*
*/
public class OrderConsumer {
public void reciveOrderFormActiveMq() {
ConnectionFactory factory = null;
Connection conn = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
Message message = null;
try {
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
conn = factory.createConnection();
conn.start();
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("MQ-Hello-Object");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地.
consumer = session.createConsumer(destination);
// 從ActiveMQ中擷取消息
message = consumer.receive();
Object obj = ((ObjectMessage)message).getObject();
System.out.println("ActiveMQ擷取的消息是:"+obj);
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
4.測試
生産者測試
public static void main(String[] args) {
OrderProducer pro = new OrderProducer();
Order order = new Order();
order.setId("100");
order.setNick("波波烤鴨");
order.setPrice(9999l);
order.setCreateTime(new Date());
pro.sendhello2ActiveMq(order);
}
消費者測試
擷取到了相關的資訊
實作隊列服務監聽
1.觀察者模式
1.1事件源
事件發生的源頭。 監聽器監聽的具體位置。
1.2事件
具體觸發的事件。 如: 單擊事件, 輕按兩下事件 等。
其中必然包含事件源資訊。
1.3監聽器
處理事件的代碼邏輯。
Java觀察者模式(Observer)2.定義生成者代碼
package com.dpb.observe;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的生産者(Producer)
* @author dengp
*
*/
public class MyProducer {
public void sendhello2ActiveMq(String messageText) {
// 連接配接工廠,用于建立Connection對象
ConnectionFactory factory = null;
// activeMQ 連接配接對象
Connection conn = null;
// 一次和ActiveMQ的持久會話對象
Session session = null;
// 目的地
Destination destination = null;
// 消息發送者
MessageProducer producer = null;
// 封裝消息的對象
Message message = null;
try {
/*
* 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password,
* String brokerURL)
* userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置.
* password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置.
* brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
// 建立連結對象
conn = factory.createConnection();
// 啟動連接配接對象
conn.start();
/*
* 建立會話對象
* 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
* transacted - 是否使用事務,
* 可選值為true|false
* true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效,
* 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED
* false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定.
* acknowledgeMode - 消息确認機制, 可選值為:
* Session.AUTO_ACKNOWLEDGE - 自動确認消息機制
* Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地,目的地的命名既是隊列的指令
destination = session.createQueue("MQ-Hello-observe");
// 建立消息生成者, 建立的消息生成者與某目的地對應, 即方法參數目的地.
producer = session.createProducer(destination);
// 建立消息對象,建立一個文本消息對象。此消息對象中儲存要傳遞的文本資料.
message = session.createTextMessage(messageText);
// 發送消息
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != producer)
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
3.定義消費者代碼
package com.dpb.observe;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的消費者(Consumer)
* @author dengp
*
*/
public class MyConsumer {
public void reciveHelloFormActiveMq() {
// 連接配接工廠,用于建立Connection對象
ConnectionFactory factory = null;
// activeMQ 連接配接對象
Connection conn = null;
// 一次和ActiveMQ的持久會話對象
Session session = null;
// 目的地
Destination destination = null;
// 消息消費者
MessageConsumer consumer = null;
// 封裝消息的對象
Message message = null;
try {
/*
* 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password,
* String brokerURL)
* userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置.
* password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置.
* brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
// 建立連結對象
conn = factory.createConnection();
// 啟動連接配接對象
conn.start();
/*
* 建立會話對象
* 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
* transacted - 是否使用事務,
* 可選值為true|false
* true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效,
* 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED
* false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定.
* acknowledgeMode - 消息确認機制, 可選值為:
* Session.AUTO_ACKNOWLEDGE - 自動确認消息機制
* Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地,目的地的命名既是隊列的指令
destination = session.createQueue("MQ-Hello-observe");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地.
consumer = session.createConsumer(destination);
// 監聽ActiveMQ服務中的消息,當發現消息的時候,自動處理
consumer.setMessageListener(new MessageListener() {
/**
* 當用消息到來的時候觸發該方法,在該方法中處理消息
*/
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
String messageString = null;
try {
messageString = textMessage.getText();
} catch (JMSException e) {
e.printStackTrace();
messageString = "處理消息失敗!!";
}
System.out.println("處理的消息内容是 : " + messageString);
}
});
// 阻塞程序
System.in.read();
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
生産者
public static void main(String[] args) {
MyProducer pro = new MyProducer();
pro.sendhello2ActiveMq("你好啊...listener");
}
消費者
public static void main(String[] args) {
MyConsumer con = new MyConsumer();
con.reciveHelloFormActiveMq();
}
web頁面中可以看到還線上的消費者。