天天看点

activemq--消息持久化与非持久化

消息的可靠性

分为持久化和非持久化

通过setDeliveryMode设置

messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //非持久化:当服务器宕机,重启后消息会丢失

messageProducer.setDeliverMode(DeliveryMode.PERSISTENT); //持久化:当服务器宕机,重启后消息依然存在
           
持久queue

配置setDeliveryMode

持久topic

消费者

/**
         * 持久化主题消息订阅,类似于微信公众号订阅
         * 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息
         * 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息
         */
        System.out.println("consumer-1"); //模拟订阅用户
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.setClientID("consumer-1"); // 设置clientId,表明订阅者

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "订阅用户:consumer-1");
        connection.start();

        Message message = topicSubscriber.receive();
        while (null != message) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到的持久化topic消息:" + textMessage.getText());
            message = topicSubscriber.receive();
         }

        session.close();
        connection.close();
           

生产者

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer messageProducer = session.createProducer(topic);
        // connection启动之前必须设置持久化主题
        messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        connection.start();

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("persist-msg:" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();
        System.out.println(" **** 持久化消息发送到MQ完成 **** ");