ActiveMQ應用
1、ActiveMQ中常用API介紹
下述API都是接口類型,由定義在javax.jms包中.是JMS标準接口定義.

2、JMS-Hello
2.1 導入相關的jar包
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.1</version>
</dependency>
2.2 建立生産者代碼
/**
* ActiveMQ中的生産者(Producer)
* @author dengp
*
*/
public class HelloProducer {
public void sendhello2ActiveMq(String messageText) {
// 連接配接工廠,用于建立Connection對象
ConnectionFactory factory = null;
// activeMQ 連接配接對象
Connection conn = null;
// 一次和ActiveMQ的持久會話對象
Session session = null;
// 目的地
Destination destination = null;
// 消息發送者
MessageProducer producer = null;
// 封裝消息的對象
Message message = null;
try {
/*
* 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password,
* String brokerURL)
* userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置.
* password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置.
* brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
// 建立連結對象
conn = factory.createConnection();
// 啟動連接配接對象
conn.start();
/*
* 建立會話對象
* 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
* transacted - 是否使用事務,
* 可選值為true|false
* true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效,
* 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED
* false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定.
* acknowledgeMode - 消息确認機制, 可選值為:
* Session.AUTO_ACKNOWLEDGE - 自動确認消息機制
* Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地,目的地的命名既是隊列的指令
destination = session.createQueue("MQ-Hello");
// 建立消息生成者, 建立的消息生成者與某目的地對應, 即方法參數目的地.
producer = session.createProducer(destination);
// 建立消息對象,建立一個文本消息對象。此消息對象中儲存要傳遞的文本資料.
message = session.createTextMessage(messageText);
// 發送消息
producer.send(message);
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != producer)
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2.3建立消費者代碼
/**
* ActiveMQ中的消費者(Consumer)
* @author dengp
*
*/
public class HelloConsumer {
public void reciveHelloFormActiveMq() {
// 連接配接工廠,用于建立Connection對象
ConnectionFactory factory = null;
// activeMQ 連接配接對象
Connection conn = null;
// 一次和ActiveMQ的持久會話對象
Session session = null;
// 目的地
Destination destination = null;
// 消息消費者
MessageConsumer consumer = null;
// 封裝消息的對象
Message message = null;
try {
/*
* 建立連結工廠 ActiveMQConnectionFactory -由ActiveMQ實作的ConnectionFactory接口實作類.
* 構造方法: public ActiveMQConnectionFactory(String userName, String password,
* String brokerURL)
* userName - 通路ActiveMQ服務的使用者名,使用者名可以通過jetty-realm.properties配置檔案配置.
* password - 通路ActiveMQ服務的密碼,密碼可以通過jetty-realm.properties配置檔案配置.
* brokerURL -通路ActiveMQ服務的路徑位址. 路徑結構為 - 協定名://主機位址:端口号 此連結基于TCP/IP協定.
*/
factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
// 建立連結對象
conn = factory.createConnection();
// 啟動連接配接對象
conn.start();
/*
* 建立會話對象
* 方法 - connection.createSession(boolean transacted, int acknowledgeMode);
* transacted - 是否使用事務,
* 可選值為true|false
* true - 使用事務, 當設定此變量值, 則acknowledgeMode參數無效,
* 建議傳遞的acknowledgeMode參數值為 Session.SESSION_TRANSACTED
* false - 不使用事務, 設定此變量值,則acknowledgeMode參數必須設定.
* acknowledgeMode - 消息确認機制, 可選值為:
* Session.AUTO_ACKNOWLEDGE - 自動确認消息機制
* Session.CLIENT_ACKNOWLEDGE -用戶端确認消息機制
* Session.DUPS_OK_ACKNOWLEDGE - 有副本的用戶端确認消息機制
*/
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立目的地,目的地的命名既是隊列的指令
destination = session.createQueue("MQ-Hello");
// 建立消息消費者, 建立的消息消費者與某目的地對應, 即方法參數目的地.
consumer = session.createConsumer(destination);
// 從ActiveMQ中擷取消息
message = consumer.receive();
String textMessage = ((TextMessage)message).getText();
System.out.println("ActiveMQ擷取的消息是:"+textMessage);
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收消息發送者資源
if (null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
2.4測試代碼
生産者
public static void main(String[] args) {
HelloProducer pro = new HelloProducer();
pro.sendhello2ActiveMq("你好啊...");
}
消費者
public static void main(String[] args) {
HelloConsumer con = new HelloConsumer();
con.reciveHelloFormActiveMq();
}
先啟動消費者,再啟動生産者,檢視效果
消費者啟動後阻塞狀态,等待消息
web頁面檢視效果
啟動生産者
總結
ActiveMQ的消息發送的流程如下: