天天看点

ActiveMQ Pub/Sub模型

目录

    • 一、简介
    • 二、级目录
      • 2.1、生产者示例
      • 2.2、消费者示例
      • 2.3、测试
    • 三、PTP 和 PUB/SUB 对比

一、简介

1)消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消 息;

2)和点对点方式不同,发布到 topic 的消息会被所有订阅者消费;

3)当生产者发布消息,不管是否有消费者,都不会保存消息;

4)一定要先有消息的消费者,后有消息的生产者;

ActiveMQ Pub/Sub模型

二、级目录

2.1、生产者示例

package com.example.demo.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 生产者(订阅发布)
 */
public class TopicProducer {

    public static void main(String[] args) throws Exception {
        TopicProducer producer = new TopicProducer();
        producer.sendTextMessage("这是一条测试消息");
    }

    public void sendTextMessage(String datas) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://192.168.48.128:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createTopic("test-topic-MQ");
        MessageProducer producer = session.createProducer(destination);
        Message message = session.createTextMessage(datas);
        producer.send(message);
        System.out.println("消息已发送");
        producer.close();
        session.close();
        connection.close();
    }
}
           

2.2、消费者示例

package com.example.demo.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 消费者(订阅发布)
 */
public class TopicConsumer {

    public static void main(String[] args) throws Exception {
        TopicConsumer consumer = new TopicConsumer();
        String messageString = consumer.receiveTextMessage();
        System.out.println("消息内容是: " + messageString);
    }

    public String receiveTextMessage() throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.48.128:61616");
        Connection connection = factory.createConnection();
        // 消费者必须启动连接
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("test-topic-MQ");
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();
        String resultCode = ((TextMessage) message).getText();
        consumer.close();
        session.close();
        connection.close();
        return resultCode;
    }
}
           

2.3、测试

1)只执行生产者

创建了

test-topic-MQ

主题,并发送了1条消息,但是因为没有订阅者,所以消息未被消费

ActiveMQ Pub/Sub模型

2)先执行消费者,再执行生产者

此时可以看到生产者发布的1条消息,被订阅者消费了

ActiveMQ Pub/Sub模型

订阅者收到的消息

09:49:34.031 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@13deb50e[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
消息内容是: 这是一条测试消息
           

三、PTP 和 PUB/SUB 对比

对比项 Topic Queue
概要 Publish Subscribe messaging 发布 订阅消息 Point-to-Point 点对点
有无状态 topic 数据默认不存储,是无状态 的 Queue 数据默认会在 mq 服 务器上以文件形式保存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面,也可以配置成 DB 存储
完整性保障 并不保证 publisher 发布的每条数 据,Subscriber 都能接受到 Queue 保证每条数据都能 被 receiver 接收,消息不超时
消息是否会 丢失 一般来说 publisher 发布消息到某 一个 topic 时,只有正在监听该 topic 地址的 sub 能够接收到消息;如果没 有 sub 在监听,该 topic 就丢失了 Sender 发 送 消 息 到 目 标 Queue,receiver 可以异步接收 这个 Queue 上的消息。Queue 上的消息如果暂时没有 receiver 来取,也不会丢失,前提是消息 不超时
消息发布接 收策略 一对多的消息发布接收策略,监 听同一个 topic 地址的多个 sub 都能收 到 publisher 发送的消息。Sub 接收完 通知 mq 服务器 一对一的消息发布接收策 略,一个 sender 发送的消息, 只 能 有 一 个 receiver 接 收 。 receiver 接收完后,通知 mq 服 务器已接收,mq 服务器对queue 里的消息采取删除或其他操作