天天看點

activemq--可持久化機制之JDBC代碼

編碼測試

一定要開啟持久化!!!

messageProducer.setDeliverMode(DeliveryMode.PERSISTENT);

隊列

生産者

public class JmsProduceJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        //1.按照給定的url建立連接配接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD,ACTIVEMQ_URL);
        // 2.通過工廠連接配接connection 和啟動
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3.啟動
        connection.start();
        // 4.建立會話session
        //兩個參數,第一個事務,第二個簽收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.建立目的地,隊列、主題,這裡用隊列
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6.建立消息的生産者
        MessageProducer messageProducer = session.createProducer(queue);

        /**
         * 持久化必須設定
         */
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        // 7.通過MessageProducer生産3條消息發送到消息隊列中
        for (int i = 1; i <= 6; i++) {
            //8.建立消息
            TextMessage textMessage = session.createTextMessage("msg:" + LocalDateTime.now());
            //9.發送消息
            messageProducer.send(textMessage);
        }

        // 10.關閉資源
        messageProducer.close();
        session.close();
        connection.close();

        System.out.println(" **** 消息發送到MQ完成 **** ");
    }
}
           

生産6條消息:

activemq--可持久化機制之JDBC代碼

在資料庫

ACTIVEMQ_MSGS

表中,會生成6條資料,就是上一步生産的消息

activemq--可持久化機制之JDBC代碼

消費者

public class JmsConsumerJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception {
        //建立連接配接工廠
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        // 建立連接配接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        //開啟連接配接
        connection.start();
        //建立會話session
        Session session = connection.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        //建立隊列,同生産者一緻
        Queue queue = session.createQueue(QUEUE_NAME);
        //建立消息消費者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /**
         * 方法2:通過監聽器的方式
         */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("**** 消費者接收到消息 ****:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read(); // 必須加行代碼,不然程式會直接往下執行結束了
        messageConsumer.close();
        session.close();
        connection.close();
        System.out.println("**** 消費者消費消息完成 ****");


    }
}
           

啟動消費者,會消息掉已生産的消息,mq控制台和資料庫資料都會消費

activemq--可持久化機制之JDBC代碼
activemq--可持久化機制之JDBC代碼

隊列消費總結:

  • 當DeliveryMode設定為NON_PERSISTENCE時,消息被儲存在記憶體中
  • 當DeliveryMode設定為PERSISTENCE時,消息儲存在broker的相應的檔案或者資料庫中

隊列中的消息一旦被consumer消費就從Broker中删除

主題

一定是先啟動消費者訂閱主題

消費者

public class JmsConsumerTopicJDBC {

    public static final String ACTIVEMQ_URL = "tcp://localhost:61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String TOPIC_NAME = "topic-jdbc";

    public static void main(String[] args) throws Exception {
        /**
         * 持久化主題消息訂閱,類似于微信公衆号訂閱
         * 需要先啟動消費者,訂閱上主題之後,後續生産主題消息,消費者(訂閱者)就會接收到消息
         * 消費者(訂閱者)訂閱主題之後,不管是線上還是離線狀态,隻要保持正常訂閱狀态,期間生産的消息都會接收到。離線的會在再次線上後接收到之前的消息
         */
        System.out.println("jdbc-1"); //模拟訂閱使用者
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("jdbc-1"); // 設定clientId,表明訂閱者

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "jdbc-1");
        connection.start();

        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的持久化topic消息:" + textMessage.getText());
            message = topicSubscriber.receive();
         }

        session.close();
        connection.close();
    }
}
           

啟動消費者:

activemq--可持久化機制之JDBC代碼
activemq--可持久化機制之JDBC代碼

檢視資料庫,

ACTIVEMQ_ACKS

表中新增一條記錄,為目前訂閱者的資訊

activemq--可持久化機制之JDBC代碼

生産者

public class JmsProduceTopicJDBC {
    public static final String ACTIVEMQ_URL = "tcp://localhost61616";

    public static final String USERNAME = "admin";

    public static final String PASSWORD = "hll123";

    public static final String TOPIC_NAME = "topic-jdbc";


    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // connection啟動之前必須設定持久化主題
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("jdbc-msg:" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** 持久化消息發送到MQ完成 **** ");
    }
}
           

啟動生産者:

activemq--可持久化機制之JDBC代碼
activemq--可持久化機制之JDBC代碼

檢視資料庫:

ACTIVEMQ_MSGS

會新增消費的資料,

ACTIVEMQ_ACKS

的LAST_ACKED_ID會更新為最後消費消息的ID

ACTIVEMQ_MSGS

裡的topic消息在消費後是不會立刻删除的,而queue在消費後自動删除

activemq--可持久化機制之JDBC代碼
activemq--可持久化機制之JDBC代碼

小總結

  • queue

    生産的消息在沒有消費的情況下,消息會存在

    activemq_msgs

    表中,隻要任意一個消費者消費這些消息後,這些消息就會立即删除
  • topic

    一般是先啟動消費者訂閱之後,再通過生産者生産消息,之後消息也會存在

    activemq_msgs

    表中,

    activemq_acks

    表存的是消費者訂閱資訊
  • 開發注意事項

    1.mysql驅動包(或者其他資料庫)和對應的資料庫連接配接池jar包需要放到activemq目錄下的lib中

    2.初次配置完成,資料庫生成表之後,activemq.xml中配置

    createTablesOnStartup=false

    3.BeanFactory not initialized or already closed異常

    ​ 将作業系統的機器名帶有的"_"符号去掉,重新開機作業系統