天天看點

ActiveMQ Pub/Sub模型

目錄

    • 一、簡介
    • 二、級目錄
      • 2.1、生産者示例
      • 2.2、消費者示例
      • 2.3、測試
    • 三、PTP 和 PUB/SUB 對比

一、簡介

1)消息生産者(釋出)将消息釋出到 topic 中,同時有多個消息消費者(訂閱)消費該消 息;

2)和點對點方式不同,釋出到 topic 的消息會被所有訂閱者消費;

3)當生産者釋出消息,不管是否有消費者,都不會儲存消息;

4)一定要先有消息的消費者,後有消息的生産者;

ActiveMQ Pub/Sub模型

二、級目錄

2.1、生産者示例

package com.example.demo.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 生産者(訂閱釋出)
 */
public class TopicProducer {

    public static void main(String[] args) throws Exception {
        TopicProducer producer = new TopicProducer();
        producer.sendTextMessage("這是一條測試消息");
    }

    public void sendTextMessage(String datas) throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://192.168.48.128:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Destination destination = session.createTopic("test-topic-MQ");
        MessageProducer producer = session.createProducer(destination);
        Message message = session.createTextMessage(datas);
        producer.send(message);
        System.out.println("消息已發送");
        producer.close();
        session.close();
        connection.close();
    }
}
           

2.2、消費者示例

package com.example.demo.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 消費者(訂閱釋出)
 */
public class TopicConsumer {

    public static void main(String[] args) throws Exception {
        TopicConsumer consumer = new TopicConsumer();
        String messageString = consumer.receiveTextMessage();
        System.out.println("消息内容是: " + messageString);
    }

    public String receiveTextMessage() throws Exception {
        ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.48.128:61616");
        Connection connection = factory.createConnection();
        // 消費者必須啟動連接配接
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("test-topic-MQ");
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();
        String resultCode = ((TextMessage) message).getText();
        consumer.close();
        session.close();
        connection.close();
        return resultCode;
    }
}
           

2.3、測試

1)隻執行生産者

建立了

test-topic-MQ

主題,并發送了1條消息,但是因為沒有訂閱者,是以消息未被消費

ActiveMQ Pub/Sub模型

2)先執行消費者,再執行生産者

此時可以看到生産者釋出的1條消息,被訂閱者消費了

ActiveMQ Pub/Sub模型

訂閱者收到的消息

09:49:34.031 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@13deb50e[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
消息内容是: 這是一條測試消息
           

三、PTP 和 PUB/SUB 對比

對比項 Topic Queue
概要 Publish Subscribe messaging 釋出 訂閱消息 Point-to-Point 點對點
有無狀态 topic 資料預設不存儲,是無狀态 的 Queue 資料預設會在 mq 服 務器上以檔案形式儲存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面,也可以配置成 DB 存儲
完整性保障 并不保證 publisher 釋出的每條數 據,Subscriber 都能接受到 Queue 保證每條資料都能 被 receiver 接收,消息不逾時
消息是否會 丢失 一般來說 publisher 釋出消息到某 一個 topic 時,隻有正在監聽該 topic 位址的 sub 能夠接收到消息;如果沒 有 sub 在監聽,該 topic 就丢失了 Sender 發 送 消 息 到 目 标 Queue,receiver 可以異步接收 這個 Queue 上的消息。Queue 上的消息如果暫時沒有 receiver 來取,也不會丢失,前提是消息 不逾時
消息釋出接 收政策 一對多的消息釋出接收政策,監 聽同一個 topic 位址的多個 sub 都能收 到 publisher 發送的消息。Sub 接收完 通知 mq 伺服器 一對一的消息釋出接收策 略,一個 sender 發送的消息, 隻 能 有 一 個 receiver 接 收 。 receiver 接收完後,通知 mq 服 務器已接收,mq 伺服器對queue 裡的消息采取删除或其他操作