天天看點

ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例

一、概念

  • 釋出者/訂閱者模型支援向一個特定的消息主題釋出消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,釋出者和訂閱者彼此不知道對方。這種模式好比是匿名公告闆。這種模式被概括為:多個消費者可以獲得消息
  • 在釋出者和訂閱者之間存在時間依賴性。釋出者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀态以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接配接時釋出的消息将在訂閱者重新連接配接時重新釋出。

二、案例

  2.1  消息生産者-消息釋出者

package com.shyroke.firstActiveMQ2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息生産者
 * @author Administrator
 *
 */
public class JMSProducer {

    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連接配接使用者名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連接配接密碼
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連接配接位址
    private static final int SENDNUM=10; // 發送的消息數量
    
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory; // 連接配接工廠
        Connection connection = null; // 連接配接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageProducer messageProducer; // 消息生産者
        
        // 執行個體化連接配接工廠
        connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
        
        try {
            connection=connectionFactory.createConnection(); // 通過連接配接工廠擷取連接配接
            connection.start(); // 啟動連接配接
            session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立Session
//            destination=session.createQueue("FirstQueue1"); // 建立消息隊列
            destination=session.createTopic("firstTopic");
            messageProducer=session.createProducer(destination); // 建立消息生産者
            sendMessage(session, messageProducer); // 發送消息
            session.commit();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally{
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**
     * 發送消息
     * @param session
     * @param messageProducer
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
        for(int i=0;i<JMSProducer.SENDNUM;i++){
            TextMessage message=session.createTextMessage("ActiveMQ 釋出的消息"+i);
            System.out.println("發送消息:"+"ActiveMQ 釋出的消息"+i);
            messageProducer.send(message);
        }
    }
}
           

  2.2  消息消費者-消息訂閱者一

package com.shyroke.firstActiveMQ2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {
    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連接配接使用者名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連接配接密碼
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連接配接位址
    
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory; // 連接配接工廠
        Connection connection = null; // 連接配接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer consumer; //建立消費者
        
        // 執行個體化連接配接工廠
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
        
        try {
            connection=connectionFactory.createConnection(); // 通過連接配接工廠擷取連接配接
            connection.start(); // 啟動連接配接
            /**
             * 這裡的最好使用Boolean.FALSE,如果是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面才會更新消息隊列的變化情況。
             */
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
//            destination=session.createQueue("FirstQueue1"); // 建立消息隊列
            destination=session.createTopic("firstTopic");
            consumer=session.createConsumer(destination);
            consumer.setMessageListener(new MyListener()); // 注冊消息監聽
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
    

    
}
           
  • 監聽器1

package com.shyroke.firstActiveMQ2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListener implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
           

  2.3  消息消費者-消息訂閱者二

package com.shyroke.firstActiveMQ2;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer2 {
    private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 預設的連接配接使用者名
    private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 預設的連接配接密碼
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 預設的連接配接位址
    
    public static void main(String[] args) {
        
        ConnectionFactory connectionFactory; // 連接配接工廠
        Connection connection = null; // 連接配接
        Session session; // 會話 接受或者發送消息的線程
        Destination destination; // 消息的目的地
        MessageConsumer consumer; //建立消費者
        
        // 執行個體化連接配接工廠
        connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);
        
        try {
            connection=connectionFactory.createConnection(); // 通過連接配接工廠擷取連接配接
            connection.start(); // 啟動連接配接
            /**
             * 這裡的最好使用Boolean.FALSE,如果是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面才會更新消息隊列的變化情況。
             */
            session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 建立Session
//            destination=session.createQueue("FirstQueue1"); // 建立消息隊列
            destination=session.createTopic("firstTopic");
            consumer=session.createConsumer(destination);
            consumer.setMessageListener(new MyListener2()); // 注冊消息監聽
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } 
    }
}
           
  • 監聽器2

package com.shyroke.firstActiveMQ2;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyListener implements MessageListener {

    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一收到的消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
           
  • 測試以及結果

釋出- 訂閱消息模式首先必須訂閱者先訂閱服務,然後釋出者再釋出消息,最後訂閱者受到服務消息。

是以本章我們先執行消息訂閱者一和消息訂閱者二的代碼然後檢視ActiveMQ的管理界面:

ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例

如圖可見,訂閱者已經注冊成功,然後再釋出者釋出消息:

ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例
ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例
ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例
ActiveMQ之釋出- 訂閱消息模式實作一、概念二、案例