天天看點

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

在上一篇《ActiveMQ入門系列二:入門代碼執行個體(點對點模式)》中提到了ActiveMQ中的兩種模式:點對點模式(PTP)和釋出/訂閱模式(Pub & Sub),詳細介紹了點對點模式并用代碼執行個體進行說明,今天就介紹下釋出/訂閱模式。

一、理論基礎

釋出/訂閱模式的工作示意圖:

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

消息生産者将消息(釋出)到topic中,可以同時有多個消息消費者(訂閱)消費該消息。

和點對點方式不同,釋出到topic的消息會被所有訂閱者消費。

當生産者釋出消息,不管是否有消費者,都不會儲存消息。

一定要先有消息的消費者,後有消息的生産者。

二、代碼實作

生産者

packagecom.sam.topic;importorg.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;

public classTopicProducer {public static final String QUEUE_NAME = "topic-demo";//隊列名

public void producer(String message) throwsJMSException {

ConnectionFactory factory= null;

Connection connection= null;

Session session= null;

MessageProducer producer= null;try{factory= new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");connection=factory.createConnection();connection.start();session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination=session.createTopic(QUEUE_NAME);

producer=session.createProducer(destination);

TextMessage textMessage=session.createTextMessage(message);producer.send(textMessage);

System.out.println("消息發送成功");

}catch(Exception ex){throwex;

}finally{

if(producer != null){

producer.close();

}if(session != null){

session.close();

}if(connection != null){

connection.close();

}

}

}public static voidmain(String[] args){

TopicProducer producer= newTopicProducer();try{

producer.producer("hello, activemq");

}catch(Exception ex){

ex.printStackTrace();

}

}

}

釋出/訂閱模式的生産者和點對點模式的代碼主要差別就是Destination的建立方式,點對點模式是調用session.createQueue(QUEUE_NAME),而釋出/訂閱模式是調用session.createTopic(QUEUE_NAME)。

消費者

packagecom.sam.topic;importorg.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;importjava.io.IOException;

public classTopicConsumer {public void consumer() throwsJMSException, IOException {

ConnectionFactory factory= null;

Connection connection= null;

Session session= null;

MessageConsumer consumer= null;try{

factory= new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");

connection=factory.createConnection();connection.start();

session= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination destination=session.createTopic(TopicProducer.QUEUE_NAME);

consumer=session.createConsumer(destination);consumer.setMessageListener(newMessageListener() {

@Overridepublic voidonMessage(Message message) {try{

TextMessage om=(TextMessage) message;

String data=om.getText();

System.out.println(data);

}catch(JMSException e) {

e.printStackTrace();

}

}

});

}catch(Exception ex){throwex;

}

}public static voidmain(String[] args){

TopicConsumer consumer= newTopicConsumer();try{

consumer.consumer();

}catch(Exception ex){

ex.printStackTrace();

}

}

}

消費者在點對點監聽消費的基礎上進行變化,主要差別有兩個:1.同生産者一樣,也是Destination的建立方式不同; 2.消息無需手動确認,直接采用自動确認機制

代碼寫完了,接下來進行測試,由于subscribe可以有多個,而且每個都可以消費到相同的消息,是以我們消費者啟動兩個。

先執行生産者

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

在控制台頁面的Topics下出現了我定義的topic并且有1條消息發送成功且未消費

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

然後執行兩個消費者,兩個消費者都沒有消費到任何消息

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式
釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

并且,控制台頁面隻是多了2個消費者,已經消費的消息還是0

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

為什麼呢?還記得前面的理論基礎說的嗎?就是這個原因

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

繼續,我們在兩個消費者啟動好的前提下,再執行生産者, 這個時候會發現兩個消費者都消費了該消息

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式
釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式
釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

再看下控制台頁面

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

已消費消息這裡是2,這個2并不是說之前發的兩個消息都消費了,而是說第二個消息消費了2次, 1 * 2 = 2

不信的話,可以再執行一遍生産者,這個時候就是4,而不是3

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

累計發送過3條消息,消息消費了4次,這裡的4就是後面兩條分别被消費了2次, 2 * 2 = 4

三、兩種模式比較

釋出-訂閱模式 java_ActiveMQ入門系列三:釋出/訂閱模式

好,到這裡,釋出/訂閱模式就介紹完了。

如果有收獲,就點個贊呗