目錄
-
- 一、簡介
- 二、級目錄
-
- 2.1、生産者示例
- 2.2、消費者示例
- 2.3、測試
- 三、PTP 和 PUB/SUB 對比
一、簡介
1)消息生産者(釋出)将消息釋出到 topic 中,同時有多個消息消費者(訂閱)消費該消 息;
2)和點對點方式不同,釋出到 topic 的消息會被所有訂閱者消費;
3)當生産者釋出消息,不管是否有消費者,都不會儲存消息;
4)一定要先有消息的消費者,後有消息的生産者;
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9QzVhRTNXFGNsdVWv50MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnL5MzM0ITNxkDM1IzNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
二、級目錄
2.1、生産者示例
package com.example.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 生産者(訂閱釋出)
*/
public class TopicProducer {
public static void main(String[] args) throws Exception {
TopicProducer producer = new TopicProducer();
producer.sendTextMessage("這是一條測試消息");
}
public void sendTextMessage(String datas) throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("guest", "guest", "tcp://192.168.48.128:61616");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createTopic("test-topic-MQ");
MessageProducer producer = session.createProducer(destination);
Message message = session.createTextMessage(datas);
producer.send(message);
System.out.println("消息已發送");
producer.close();
session.close();
connection.close();
}
}
2.2、消費者示例
package com.example.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 消費者(訂閱釋出)
*/
public class TopicConsumer {
public static void main(String[] args) throws Exception {
TopicConsumer consumer = new TopicConsumer();
String messageString = consumer.receiveTextMessage();
System.out.println("消息内容是: " + messageString);
}
public String receiveTextMessage() throws Exception {
ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.48.128:61616");
Connection connection = factory.createConnection();
// 消費者必須啟動連接配接
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic("test-topic-MQ");
MessageConsumer consumer = session.createConsumer(destination);
Message message = consumer.receive();
String resultCode = ((TextMessage) message).getText();
consumer.close();
session.close();
connection.close();
return resultCode;
}
}
2.3、測試
1)隻執行生産者
建立了
test-topic-MQ
主題,并發送了1條消息,但是因為沒有訂閱者,是以消息未被消費
2)先執行消費者,再執行生産者
此時可以看到生産者釋出的1條消息,被訂閱者消費了
訂閱者收到的消息
09:49:34.031 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@13deb50e[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
消息内容是: 這是一條測試消息
三、PTP 和 PUB/SUB 對比
對比項 | Topic | Queue |
---|---|---|
概要 | Publish Subscribe messaging 釋出 訂閱消息 | Point-to-Point 點對點 |
有無狀态 | topic 資料預設不存儲,是無狀态 的 | Queue 資料預設會在 mq 服 務器上以檔案形式儲存,比如 Active MQ 一 般 保 存 在 $AMQ_HOME\data\kahadb 下 面,也可以配置成 DB 存儲 |
完整性保障 | 并不保證 publisher 釋出的每條數 據,Subscriber 都能接受到 | Queue 保證每條資料都能 被 receiver 接收,消息不逾時 |
消息是否會 丢失 | 一般來說 publisher 釋出消息到某 一個 topic 時,隻有正在監聽該 topic 位址的 sub 能夠接收到消息;如果沒 有 sub 在監聽,該 topic 就丢失了 | Sender 發 送 消 息 到 目 标 Queue,receiver 可以異步接收 這個 Queue 上的消息。Queue 上的消息如果暫時沒有 receiver 來取,也不會丢失,前提是消息 不逾時 |
消息釋出接 收政策 | 一對多的消息釋出接收政策,監 聽同一個 topic 位址的多個 sub 都能收 到 publisher 發送的消息。Sub 接收完 通知 mq 伺服器 | 一對一的消息釋出接收策 略,一個 sender 發送的消息, 隻 能 有 一 個 receiver 接 收 。 receiver 接收完後,通知 mq 服 務器已接收,mq 伺服器對queue 裡的消息采取删除或其他操作 |