天天看點

ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息

Topic主題釋出和訂閱消息

前面講的案例都是點對點的消息,即一個生産者發送的一條消息隻能被一個消費者消費,然後就移除了

ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息

而topic模式一條消息可以被多個消費者訂閱,關系如下:

ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息

定義生産者

package com.dpb.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的生産者(Producer)
 * @author dengp
 *
 */
public class MyProducer {

    public void sendhello2ActiveMq(String messageText) {
        TopicSession session = null;
        TopicConnection conn = null;
        try {
            TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
            conn = factory.createTopicConnection();
            conn.start();
            session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            // 建立消息隊列
            Topic topic = session.createTopic("test-topic");
            // 建立消息發送者
            TopicPublisher publisher = session.createPublisher(topic);
            // 設定持久化模式 NON_PERSISTENT不開啟  PERSISTENT 開啟 預設是開啟
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("name", "波波烤鴨");
            mapMessage.setString("address", "深圳");
            publisher.send(mapMessage);
            
            // 送出會話
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("通路ActiveMQ服務發生錯誤!!");
        } finally {
            
            try {
                // 回收會話資源
                if (null != session)
                    session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
            try {
                // 回收連結資源
                if (null != conn)
                    conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}      

定義消費者

package com.dpb.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ中的消費者(Consumer)
 * @author dengp
 *
 */
public class MyConsumer {

    public void reciveHelloFormActiveMq() {
            TopicSession session = null;
            TopicConnection conn = null;
            try {
                TopicConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.88.121:61616");
                conn = factory.createTopicConnection();
                conn.start();
                session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
                // 建立消息隊列
                Topic topic = session.createTopic("test-topic");
                // 建立消息接受者
                MessageConsumer consumer = session.createConsumer(topic);
                consumer.setMessageListener(new MessageListener() {
                    
                    @Override
                    public void onMessage(Message msg) {
                        if (msg != null) {
                            MapMessage map = (MapMessage) msg;
                            try {
                               System.out.println(map.getString("name") + "接收#" + map.getString("address"));
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                });
                 // 休眠100s再關閉
                Thread.sleep(1000 * 100); 
                // 送出會話
                session.commit();
            } catch (Exception e) {
                e.printStackTrace();
                System.out.println("通路ActiveMQ服務發生錯誤!!");
            } finally {
                try {
                    // 回收會話資源
                    if (null != session)
                        session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                try {
                    // 回收連結資源
                    if (null != conn)
                        conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    }
}      

測試

先啟動消費者,可以開啟多個

public static void main(String[] args) {
    MyConsumer con = new MyConsumer();
    con.reciveHelloFormActiveMq();
}      
ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息
ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息

啟動生産者

public static void main(String[] args) {
    MyProducer pro = new MyProducer();
    pro.sendhello2ActiveMq("你好啊...topic");
}      
ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息
ActiveMQ釋出訂閱模式Topic主題釋出和訂閱消息

好了本文介紹到此,下篇介紹ActiveMQ和Spring的整合