消息的可靠性
分为持久化和非持久化
通过setDeliveryMode设置
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //非持久化:当服务器宕机,重启后消息会丢失
messageProducer.setDeliverMode(DeliveryMode.PERSISTENT); //持久化:当服务器宕机,重启后消息依然存在
持久queue
配置setDeliveryMode
持久topic
消费者
/**
* 持久化主题消息订阅,类似于微信公众号订阅
* 需要先启动消费者,订阅上主题之后,后续生产主题消息,消费者(订阅者)就会接收到消息
* 消费者(订阅者)订阅主题之后,不管是在线还是离线状态,只要保持正常订阅状态,期间生产的消息都会接收到。离线的会在再次在线后接收到之前的消息
*/
System.out.println("consumer-1"); //模拟订阅用户
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("consumer-1"); // 设置clientId,表明订阅者
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "订阅用户:consumer-1");
connection.start();
Message message = topicSubscriber.receive();
while (null != message) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化topic消息:" + textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
生产者
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
// connection启动之前必须设置持久化主题
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("persist-msg:" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 持久化消息发送到MQ完成 **** ");