在上一篇《ActiveMQ入門系列二:入門代碼執行個體(點對點模式)》中提到了ActiveMQ中的兩種模式:點對點模式(PTP)和釋出/訂閱模式(Pub & Sub),詳細介紹了點對點模式并用代碼執行個體進行說明,今天就介紹下釋出/訂閱模式。
一、理論基礎
釋出/訂閱模式的工作示意圖:
消息生産者将消息(釋出)到topic中,可以同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不同,釋出到topic的消息會被所有訂閱者消費。
當生産者釋出消息,不管是否有消費者,都不會儲存消息。
一定要先有消息的消費者,後有消息的生産者。
二、代碼實作
生産者
packagecom.sam.topic;importorg.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;
public classTopicProducer {public static final String QUEUE_NAME = "topic-demo";//隊列名
public void producer(String message) throwsJMSException {
ConnectionFactory factory= null;
Connection connection= null;
Session session= null;
MessageProducer producer= null;try{factory= new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");connection=factory.createConnection();connection.start();session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination=session.createTopic(QUEUE_NAME);
producer=session.createProducer(destination);
TextMessage textMessage=session.createTextMessage(message);producer.send(textMessage);
System.out.println("消息發送成功");
}catch(Exception ex){throwex;
}finally{
if(producer != null){
producer.close();
}if(session != null){
session.close();
}if(connection != null){
connection.close();
}
}
}public static voidmain(String[] args){
TopicProducer producer= newTopicProducer();try{
producer.producer("hello, activemq");
}catch(Exception ex){
ex.printStackTrace();
}
}
}
釋出/訂閱模式的生産者和點對點模式的代碼主要差別就是Destination的建立方式,點對點模式是調用session.createQueue(QUEUE_NAME),而釋出/訂閱模式是調用session.createTopic(QUEUE_NAME)。
消費者
packagecom.sam.topic;importorg.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;importjava.io.IOException;
public classTopicConsumer {public void consumer() throwsJMSException, IOException {
ConnectionFactory factory= null;
Connection connection= null;
Session session= null;
MessageConsumer consumer= null;try{
factory= new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");
connection=factory.createConnection();connection.start();
session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination=session.createTopic(TopicProducer.QUEUE_NAME);
consumer=session.createConsumer(destination);consumer.setMessageListener(newMessageListener() {
@Overridepublic voidonMessage(Message message) {try{
TextMessage om=(TextMessage) message;
String data=om.getText();
System.out.println(data);
}catch(JMSException e) {
e.printStackTrace();
}
}
});
}catch(Exception ex){throwex;
}
}public static voidmain(String[] args){
TopicConsumer consumer= newTopicConsumer();try{
consumer.consumer();
}catch(Exception ex){
ex.printStackTrace();
}
}
}
消費者在點對點監聽消費的基礎上進行變化,主要差別有兩個:1.同生産者一樣,也是Destination的建立方式不同; 2.消息無需手動确認,直接采用自動确認機制
代碼寫完了,接下來進行測試,由于subscribe可以有多個,而且每個都可以消費到相同的消息,是以我們消費者啟動兩個。
先執行生産者
在控制台頁面的Topics下出現了我定義的topic并且有1條消息發送成功且未消費
然後執行兩個消費者,兩個消費者都沒有消費到任何消息
并且,控制台頁面隻是多了2個消費者,已經消費的消息還是0
為什麼呢?還記得前面的理論基礎說的嗎?就是這個原因
繼續,我們在兩個消費者啟動好的前提下,再執行生産者, 這個時候會發現兩個消費者都消費了該消息
再看下控制台頁面
已消費消息這裡是2,這個2并不是說之前發的兩個消息都消費了,而是說第二個消息消費了2次, 1 * 2 = 2
不信的話,可以再執行一遍生産者,這個時候就是4,而不是3
累計發送過3條消息,消息消費了4次,這裡的4就是後面兩條分别被消費了2次, 2 * 2 = 4
三、兩種模式比較
好,到這裡,釋出/訂閱模式就介紹完了。
如果有收獲,就點個贊呗