發送:
package com.bjpowernode.activemq.transaction;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* ClassName:TransactionSender
* Package:com.bjpowernode.activemq.transaction
* Description:
*
* @date:2018/10/16 10:29
* @author: robin
*/
public class TransactionSender {
public static void main(String[] args) {
send();
}
private static void send() {
MessageProducer producer=null;
Session session= null;
Connection connection=null;
try {
//1.根據Broker位址建立連接配接工廠對象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.31.128:61616");
//2.建立連接配接對象
connection=connectionFactory.createConnection();
//3.建立Session回話對象,
// 參數1為是否使用事務性的消息 false表示不使用事務 true表示使用事務
//參數2表示消息的确認機制,會影響消息的接收方法而不會影響發送方法Session.AUTO_ACKNOWLEDGE表示自動确認
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.建立一個目的地對象,createQueue建立一個基于點對點的目的地對象,參數為目的地名稱,需要與接收時對應
Destination destination=session.createQueue("myTransaction");
//5.建立一個文本類型的消息,并設定消息的資料内容
Message message=session.createTextMessage("queue的Transaction測試消息");
//6.建立消息的發送者 ,并設定消息的發送位置
producer=session.createProducer(destination);
//7.發送消息到指定的目的地
producer.send(message);
//開啟事務消息後,必須顯示手動調用commit方法否則消息不會寫入broker,注意:一個方法要發送2條或以上的消息時需要開啟事務;
session.commit();
System.out.println("消息發送成功");
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(producer!=null){
try {
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(session!=null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}
接收:
package com.bjpowernode.activemq.transaction;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* ClassName:TransactionReceive
* Package:com.bjpowernode.activemq.transaction
* Description:
*
* @date:2018/10/16 10:32
* @author: robin
*/
public class TransactionReceive {
public static void main(String[] args) {
receive();
}
private static void receive() {
MessageConsumer consumer =null;
Session session=null;
Connection connection=null;
try {
//1.根據Broker位址建立連接配接工廠對象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.31.128:61616");
//2.建立連接配接對象
connection=connectionFactory.createConnection();
connection.start();//接收消息前前必須要start否則無法讀取消息,線程會阻塞在receive()方法上
//3.建立Session回話對象,
// 參數1為是否使用事務性的消息 false表示不使用事務 true表示使用事務
//參數2表示消息的确認機制,會影響消息的接收方法而不會影響發送方法Session.AUTO_ACKNOWLEDGE表示自動确認
// 消息處理成功第三步(消息确認) 消息成功消費後從broker中删除
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
//4.建立一個目的地對象,createQueue建立一個基于點對點的目的地對象,參數為目的地名稱,需要與發送時對應
Destination destination=session.createQueue("myTransaction");
//5.建立消息的接收者 ,并設定從哪裡擷取消息
consumer =session.createConsumer(destination);
//6從消息服務中讀取消息 接收消息(消息成功消費的第一步)
Message message= consumer.receive();
//注意:使用事務消息後,必須顯示的調用commit否則,讀取的消息不會從broker中移除,這樣的話後面的所有消息都不能夠被消費
//如果一個系統中的方法,需要讀取多個隊列中的資料來完成業務那麼就需要使用事務消息,
//接收時如果隻有一接收也要盡可能使用事務性的消息
session.commit();
//擷取消息的具體内容
if(message instanceof TextMessage){
// 處理消息(消息成功處理第二步)
System.out.println(((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
}finally {
if(consumer!=null){
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(session!=null){
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
}