天天看點

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

個人部落格網:https://wushaopei.github.io/    (你想要這裡多有)

一、部署操作

1. 部署在linux 上的acvtiveMQ 要可以通過前台windows 的頁面通路,必須把linux 的IP和 windows的 IP 位址配置到同一個網關下 。這種情況一般都是修改 linux 的IP 位址,修改網卡檔案對應的IP 位址

修改linux 的ip 位址:

cd   /etc/sysconfig/network-scripts

vi  ifcfg-eth0            
ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

這是修改之後的網卡檔案配置,IP 位址為:192.168.17.3

配置成功後 ,可以用 windows ping linux , linux ping windows ,當全部ping 通後,可以使用圖形化界面通路activeMQ

// ActiveMQ 的前台端口為 8161 , 提供控制台服務 背景端口為61616 ,提供 JMS 服務

            ​

// 192.168.17.3 為 linux 的IP 位址, 使用 IP+端口 通路了ActiveMQ , 登陸之後的樣子如上。(能通路成功首先得在linux 上啟動activeMQ 的服務),首次登入的預設賬戶密碼為 賬号:admin 密碼:admin  ,預設端口号:8161

2、JMS

Java 消息中間件的服務接口規範,activemq 之上是 mq , 而 mq 之上是JMS 定義的消息規範 。 activemq 是mq 技術的一種理論實作(與之相類似的實作還有 Kafka RabbitMQ RockitMQ ),而 JMS 是更上一級的規範。

      ​

JMS 的兩種模式:

在點對點的消息傳遞時,目的地稱為 隊列 queue

在釋出訂閱消息傳遞中,目的地稱為 主題 topic           
ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

類比JDBC編碼套路:

第一步:注冊驅動(僅僅隻做一次)
Class.forName("com.mysql.jdbc.com");
第二步:建立連接配接(Connection)
DriverManager.getConnection(url,user,password);
第三步:建立運作SQL語句(Statement)
connection.createStatement();
第四步:運作語句
rs.executeQuery(sql);
第五步:處理結果集(ResultSet)
第六步:釋放資源
           

3、工程建立與配置

  • IDEA建立Maven工程
  • 配置POM.xml檔案

pom.xml 依賴:

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>4.15</version>
        </dependency>           

4、隊列模式與案例講解

在點對點的消息傳遞域中,目的地被稱為隊列(queue)           

點對點消息傳遞域的特點如下:

  • 每個消息隻能有一個消費者,類似于1對1的關系。好比個人快遞自己領自己的。
  • 消息的生産者和消費者之間沒有時間上的相關性。無論消費者在生産者發送消息的時候是否處于運作狀态,消費者都可以提取消息。好比我們的發送短信,發送者發送後不見得接收者會即收即看。
  • 消息被消費後隊列中不會再存儲,是以消費者不會消費到已經被消費掉的消息。

         ​

(1)demo 隊列的消費生産者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連接配接工廠,獲得connection并啟動通路
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.建立會話session
        //兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是隊列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立消息的生産者
        MessageProducer messageProducer = session.createProducer(queue);
        //6.通過使用消息生産者,生産三條消息,發送到MQ的隊列裡面
        for (int i = 0; i < 3; i++) {
            //7.建立消息
            TextMessage textMessage = session.createTextMessage("msg---hello" + i);//了解為一個字元串
            //8.通過messageProducer發送給MQ隊列
            messageProducer.send(textMessage);
        }
        //9.關閉資源
        messageProducer.close();
        session.close();           
ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

以及在頁面上的顯示:

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

控制說明:

Number Of Pending Messages=等待消費的消息,這個是未出隊列的數量,公式=總接收數-總出隊列數。
Number Of Consumers=消費者數量,消費者端的消費者數量。
Messages Enqueued=進隊消息數,進隊列的總消息量,包括出隊列的。這個數隻增不減。
Messages Dequeued=出隊消息數,可以了解為是消費者消費掉的數量。
總結:
當有一個消息進入這個隊列時,等待消費的消息是1,進入隊列的消息是1。
當消息消費後,等待消費的消息是0,進入隊列的消息是1,出隊列的消息是1。           

(2)與之相對應的消息消費者(處理消息的系統)代碼及運作

① 阻塞式消費者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * 簡單消息消費者
 */
public class JmsConsumer {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        //1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連接配接工廠,獲得connection并啟動通路
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.建立會話session
        //兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是隊列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立消息的消費者,指定消費哪一個隊列裡面的消息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //循環擷取
        while (true) {
            //6.通過消費者調用方法擷取隊列裡面的消息(發送的消息是什麼類型,接收的時候就強轉成什麼類型)
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            if (textMessage != null) {
                System.out.println("****消費者接收到的消息:  " + textMessage.getText());
            }else {
                break;
            }
        }
        //7.關閉資源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
           

②異步監聽式消費者

package com.demo.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
 * 監聽模式下的消費者
 */
public class JmsConsumer2 {
    private static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    private static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        //1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連接配接工廠,獲得connection并啟動通路
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.建立會話session
        //兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是隊列queue還是主題topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.建立消息的消費者,指定消費哪一個隊列裡面的消息
        MessageConsumer messageConsumer = session.createConsumer(queue);
        //6.通過監聽的方式消費消息
        /*
        異步非阻塞式方式監聽器(onMessage)
        訂閱者或消費者通過建立的消費者對象,給消費者注冊消息監聽器setMessageListener,
        當消息有消息的時候,系統會自動調用MessageListener類的onMessage方法
        我們隻需要在onMessage方法内判斷消息類型即可擷取消息
         */
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage) {
                    //7.把message轉換成消息發送前的類型并擷取消息内容
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("****消費者接收到的消息:  " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println("執行了39行");
        //保證控制台不關閉,阻止程式關閉
        System.in.read();
        //關閉資源
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
           

控制台顯示結果:

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

(3)JMS開發的基本步驟:

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

(4)兩種消費方式的比較:

  • 同步阻塞方式(receive):訂閱者或接收者抵用MessageConsumer的receive()方法來接收消息,receive方法在能接收到消息之前(或逾時之前)将一直阻塞。
  •  異步非阻塞方式(監聽器onMessage()):訂閱者或接收者通過MessageConsumer的setMessageListener(MessageListener listener)注冊一個消息監聽器,當消息到達之後,系統會自動調用監聽器MessageListener的onMessage(Message message)方法。

5、主題模式與案例講解

在釋出訂閱消息傳遞域中,目的地被稱為主題(topic)           

釋出/訂閱消息傳遞域的特點如下:

  • 生産者将消息釋出到topic中,每個消息可以有多個消費者,屬于1:N的關系;
  • 生産者和消費者之間有時間上的相關性。訂閱某一個主題的消費者隻能消費自它訂閱之後釋出的消息。
  • 生産者生産時,topic不儲存消息它是無狀态的不落地,假如無人訂閱就去生産,那就是一條廢消息,是以,一般先啟動消費者再啟動生産者。

JMS規範允許客戶建立持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀态時發送的消息。一句話,好比我們的微信公衆号訂閱

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

(1)釋出主題生産者

package com.demo.activemq.topic; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import javax.jms.*; 
public class JmsProducer_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic01";
 
    public static void main(String[] args) throws JMSException {
 
        //1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連接配接工廠,獲得connection并啟動通路
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.建立會話session
        //兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是隊列queue還是主題topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.建立消息的生産者
        MessageProducer messageProducer = session.createProducer(topic);
        //6.通過使用消息生産者,生産三條消息,發送到MQ的隊列裡面
        for (int i = 0; i < 3; i++) {
            //7.通過session建立消息
            TextMessage textMessage = session.createTextMessage("TOPIC_NAME---" + i);
            //8.使用指定好目的地的消息生産者發送消息
            messageProducer.send(textMessage);
        }
        //9.關閉資源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("****TOPIC_NAME消息釋出到MQ完成");
    }
}           

控制台展示結果:

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

(2)訂閱主題消費者

package com.demo.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.10.130:61616";
    public static final String TOPIC_NAME = "topic01";
 
    public static void main(String[] args) throws JMSException, IOException {
        System.out.println("我是1号消費者");
        //1.建立連接配接工廠,按照給定的URL,采用預設的使用者名密碼
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通過連接配接工廠,獲得connection并啟動通路
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.建立會話session
        //兩個參數transacted=事務,acknowledgeMode=确認模式(簽收)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.建立目的地(具體是隊列queue還是主題topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.建立消息的消費者
        MessageConsumer messageConsumer = session.createConsumer(topic);
        //5.建立消息的消費者,指定消費哪一個隊列裡面的消息
        messageConsumer.setMessageListener(message -> {
            if (message instanceof TextMessage){
                try {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
    }
}           
ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

注意:先啟動訂閱者再啟動生産者,不然發送的消息是廢消息

控制台消費結果:

ActiveMQ 筆記(二)部署和DEMO(隊列、主題)

6、小總結

重點注意:activemq 好像自帶負載均衡,當先啟動兩個隊列(Queue)的消費者時,在啟動生産者發出消息,此時的消息平均的被兩個消費者消費。 并且消費者不會消費已經被消費的消息(即為已經出隊的消息)

但是當有多個主題(Topic)訂閱者時,釋出者釋出的消息,每個訂閱者都會接收所有的消息。topic 更像是被廣播的消息,但是缺點是不能接受已經發送過的消息。

先要有訂閱者,生産者才有意義。
ActiveMQ 筆記(二)部署和DEMO(隊列、主題)