天天看點

ActiveMQ-事務消息-發送 與 接收

發送:

package com.bjpowernode.activemq.transaction;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * ClassName:TransactionSender
 * Package:com.bjpowernode.activemq.transaction
 * Description:
 *
 * @date:2018/10/16 10:29
 * @author:  robin
 */
public class TransactionSender {

    public static void main(String[] args) {
        send();
    }

    private static void send() {
        MessageProducer producer=null;
        Session session= null;
        Connection connection=null;
        try {
            //1.根據Broker位址建立連接配接工廠對象
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.31.128:61616");
            //2.建立連接配接對象
            connection=connectionFactory.createConnection();
            //3.建立Session回話對象,
            // 參數1為是否使用事務性的消息 false表示不使用事務 true表示使用事務
            //參數2表示消息的确認機制,會影響消息的接收方法而不會影響發送方法Session.AUTO_ACKNOWLEDGE表示自動确認
            session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
            //4.建立一個目的地對象,createQueue建立一個基于點對點的目的地對象,參數為目的地名稱,需要與接收時對應
            Destination destination=session.createQueue("myTransaction");
            //5.建立一個文本類型的消息,并設定消息的資料内容
            Message message=session.createTextMessage("queue的Transaction測試消息");
            //6.建立消息的發送者 ,并設定消息的發送位置
            producer=session.createProducer(destination);
            //7.發送消息到指定的目的地
            producer.send(message);
            //開啟事務消息後,必須顯示手動調用commit方法否則消息不會寫入broker,注意:一個方法要發送2條或以上的消息時需要開啟事務;
            session.commit();
            System.out.println("消息發送成功");
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(producer!=null){
                try {
                    producer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(session!=null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
           

接收:

package com.bjpowernode.activemq.transaction;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * ClassName:TransactionReceive
 * Package:com.bjpowernode.activemq.transaction
 * Description:
 *
 * @date:2018/10/16 10:32
 * @author: robin
 */
public class TransactionReceive {
    public static void main(String[] args) {
        receive();
    }

    private static void receive() {
        MessageConsumer consumer =null;
        Session session=null;
        Connection connection=null;
        try {
            //1.根據Broker位址建立連接配接工廠對象
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.31.128:61616");
            //2.建立連接配接對象
            connection=connectionFactory.createConnection();
            connection.start();//接收消息前前必須要start否則無法讀取消息,線程會阻塞在receive()方法上
            //3.建立Session回話對象,
            // 參數1為是否使用事務性的消息 false表示不使用事務 true表示使用事務
            //參數2表示消息的确認機制,會影響消息的接收方法而不會影響發送方法Session.AUTO_ACKNOWLEDGE表示自動确認
           // 消息處理成功第三步(消息确認) 消息成功消費後從broker中删除
            session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);

            //4.建立一個目的地對象,createQueue建立一個基于點對點的目的地對象,參數為目的地名稱,需要與發送時對應
            Destination destination=session.createQueue("myTransaction");

            //5.建立消息的接收者 ,并設定從哪裡擷取消息
            consumer =session.createConsumer(destination);

            //6從消息服務中讀取消息        接收消息(消息成功消費的第一步)
            Message message= consumer.receive();
            //注意:使用事務消息後,必須顯示的調用commit否則,讀取的消息不會從broker中移除,這樣的話後面的所有消息都不能夠被消費
            //如果一個系統中的方法,需要讀取多個隊列中的資料來完成業務那麼就需要使用事務消息,
            //接收時如果隻有一接收也要盡可能使用事務性的消息
            session.commit();
            //擷取消息的具體内容
            if(message instanceof TextMessage){
                // 處理消息(消息成功處理第二步)
                System.out.println(((TextMessage) message).getText());
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(consumer!=null){
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }

            if(session!=null){
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }


    }
}