Topic主題釋出和訂閱消息
前面講的案例都是點對點的消息,即一個生産者發送的一條消息隻能被一個消費者消費,然後就移除了
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLicmbw5iY1QjZlRGZ5QzY4ETOwQGOlRGN0gTY5IDOxkTYzQDZz8CX5d2bs92Yl1iclB3bsVmdlR2LcNWaw9CXt92Yu4GZjlGbh5yYjV3Lc9CX6MHc0RHaiojIsJye.png)
而topic模式一條消息可以被多個消費者訂閱,關系如下:
定義生産者
package com.dpb.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的生産者(Producer)
* @author dengp
*
*/
public class MyProducer {
public void sendhello2ActiveMq(String messageText) {
TopicSession session = null;
TopicConnection conn = null;
try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
conn = factory.createTopicConnection();
conn.start();
session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立消息隊列
Topic topic = session.createTopic("test-topic");
// 建立消息發送者
TopicPublisher publisher = session.createPublisher(topic);
// 設定持久化模式 NON_PERSISTENT不開啟 PERSISTENT 開啟 預設是開啟
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "波波烤鴨");
mapMessage.setString("address", "深圳");
publisher.send(mapMessage);
// 送出會話
session.commit();
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
定義消費者
package com.dpb.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* ActiveMQ中的消費者(Consumer)
* @author dengp
*
*/
public class MyConsumer {
public void reciveHelloFormActiveMq() {
TopicSession session = null;
TopicConnection conn = null;
try {
TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
conn = factory.createTopicConnection();
conn.start();
session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
// 建立消息隊列
Topic topic = session.createTopic("test-topic");
// 建立消息接受者
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getString("name") + "接收#" + map.getString("address"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 休眠100s再關閉
Thread.sleep(1000 * 100);
// 送出會話
session.commit();
} catch (Exception e) {
e.printStackTrace();
System.out.println("通路ActiveMQ服務發生錯誤!!");
} finally {
try {
// 回收會話資源
if (null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收連結資源
if (null != conn)
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
測試
先啟動消費者,可以開啟多個
public static void main(String[] args) {
MyConsumer con = new MyConsumer();
con.reciveHelloFormActiveMq();
}
啟動生産者
public static void main(String[] args) {
MyProducer pro = new MyProducer();
pro.sendhello2ActiveMq("你好啊...topic");
}
好了本文介紹到此,下篇介紹ActiveMQ和Spring的整合