天天看點

ActiveMQ的queue以及topic兩種消息處理機制分析

        上一期介紹了我們項目要用到activemq來作為jms總線,并且給大家介紹了activemq的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的項目需求,更好地使用activemq的兩種消息處理模式。

     對比項

topic

queue

概要

publish subscribe messaging 釋出訂閱消息

point-to-point 點對點

有無狀态

topic資料預設不落地,是無狀态的。

queue資料預設會在mq伺服器上以檔案形式儲存,比如active mq一般儲存在$amq_home\data\kr-store\data下面。也可以配置成db存儲。

完整性保障

并不保證publisher釋出的每條資料,subscriber都能接受到。

queue保證每條資料都能被receiver接收。

消息是否會丢失

一般來說publisher釋出消息到某一個topic時,隻有正在監聽該topic位址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丢失了。

sender發送消息到目标queue,receiver可以異步接收這個queue上的消息。queue上的消息如果暫時沒有receiver來取,也不會丢失。

消息釋出接收政策

一對多的消息釋出接收政策,監聽同一個topic位址的多個sub都能收到publisher發送的消息。sub接收完通知mq伺服器

一對一的消息釋出接收政策,一個sender發送的消息,隻能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的消息采取删除或其他操作。

          topic和queue的最大差別在于topic是以廣播的形式,通知所有線上監聽的用戶端有新的消息,沒有監聽的用戶端将收不到消息;而queue則是以點對點的形式通知多個處于監聽狀态的用戶端中的一個。

        通過增加監聽用戶端的并發數來驗證,topic的消息推送,是否會因為監聽用戶端的并發上升而出現明顯的下降,測試環境的伺服器為ci環境的activemq,用戶端為我的本機。

        從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)并發的前提下,效率差異很明顯(由于500線程并發的情況下,我本機的cpu占用率已高達70-90%,是以無法确認是我本機測試造成的性能瓶頸還是topic消息發送方式存在性能瓶頸,造成效率下降如此明顯)。

        topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者并發的前提下,topic方式的效率明顯低于queue。

        queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。

topic實測資料:

 

發送者發送的消息總數

所有訂閱者接收到消息的總數

消息發送和接收平均耗時

單訂閱者

100

101ms

100訂閱者

10000

103ms

500訂閱者

50000

14162ms

queue實測資料:

96ms

100ms

3.1     通過用戶端代碼調用來發送一個topic的消息:

import javax.jms.connection;

import javax.jms.connectionfactory;

import javax.jms.deliverymode;

import javax.jms.destination;

import javax.jms.messageproducer;

import javax.jms.session;

import javax.jms.textmessage;

import org.apache.activemq.activemqconnection;

import org.apache.activemq.activemqconnectionfactory;

publicclass sendtopic {

    privatestaticfinalintsend_number = 5;

    publicstaticvoid sendmessage(session session, messageproducer producer)

            throws exception {

        for (int i = 1; i <=send_number; i++) {

            textmessage message = session

                    .createtextmessage("activemq發送的消息" + i);

            //發送消息到目的地方

            system.out.println("發送消息:" + "activemq 發送的消息" + i);

            producer.send(message);

        }

    }

    publicstaticvoid main(string[] args) {

        // connectionfactory:連接配接工廠,jms用它建立連接配接

        connectionfactory connectionfactory;

        // connection:jms用戶端到jms provider的連接配接

        connection connection = null;

        // session:一個發送或接收消息的線程

        session session;

        // destination:消息的目的地;消息發送給誰.

        destination destination;

        // messageproducer:消息發送者

        messageproducer producer;

        // textmessage message;

        //構造connectionfactory執行個體對象,此處采用activemq的實作jar

        connectionfactory = new activemqconnectionfactory(

                activemqconnection.default_user,

                activemqconnection.default_password,

                "tcp://10.20.8.198:61616");

        try {

            //構造從工廠得到連接配接對象

            connection = connectionfactory.createconnection();

            //啟動

            connection.start();

            //擷取操作連接配接

            session = connection.createsession(true, session.auto_acknowledge);

            //擷取session注意參數值firsttopic是一個伺服器的topic(與queue消息的發送相比,這裡是唯一的不同)

            destination = session.createtopic("firsttopic");

            //得到消息生成者【發送者】

            producer = session.createproducer(destination);

            //設定不持久化,此處學習,實際根據項目決定

            producer.setdeliverymode(deliverymode.persistent);

            //構造消息,此處寫死,項目就是參數,或者方法擷取

            sendmessage(session, producer);

            session.commit();

        } catch (exception e) {

            e.printstacktrace();

        } finally {

            try {

                if (null != connection)

                    connection.close();

            } catch (throwable ignore) {

            }

}

3.2     啟動多個用戶端監聽來接收topic的消息:

publicclass receivetopicimplements runnable {

      private stringthreadname;

      receivetopic(string threadname) {

           this.threadname = threadname;

      }

      publicvoid run() {

           // connectionfactory:連接配接工廠,jms用它建立連接配接

           connectionfactory connectionfactory;

           // connection:jms用戶端到jms provider的連接配接

           connection connection =null;

           // session:一個發送或接收消息的線程

           session session;

           // destination:消息的目的地;消息發送給誰.

           destination destination;

           //消費者,消息接收者

           messageconsumer consumer;

           connectionfactory = new activemqconnectionfactory(

                      activemqconnection.default_user,

                      activemqconnection.default_password,"tcp://10.20.8.198:61616");

           try {

                 //構造從工廠得到連接配接對象

                 connection = connectionfactory.createconnection();

                 //啟動

                 connection.start();

                 //擷取操作連接配接,預設自動向伺服器發送接收成功的響應

                 session = connection.createsession(false, session.auto_acknowledge);

                 //擷取session注意參數值firsttopic是一個伺服器的topic

                 destination = session.createtopic("firsttopic");

                 consumer = session.createconsumer(destination);

                 while (true) {

                      //設定接收者接收消息的時間,為了便于測試,這裡設定為100s

                      textmessage message = (textmessage) consumer

                                  .receive(100 * 1000);

                      if (null != message) {

                            system.out.println("線程"+threadname+"收到消息:" + message.gettext());

                      } else {

                            continue;

                      }

                 }

           } catch (exception e) {

                 e.printstacktrace();

           } finally {

                 try {

                      if (null != connection)

                            connection.close();

                 } catch (throwable ignore) {

           }

      publicstaticvoid main(string[] args) {

            //這裡啟動3個線程來監聽firsttopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息

           receivetopic receive1=new receivetopic("thread1");

           receivetopic receive2=new receivetopic("thread2");

           receivetopic receive3=new receivetopic("thread3");

           thread thread1=new thread(receive1);

           thread thread2=new thread(receive2);

           thread thread3=new thread(receive3);

           thread1.start();

           thread2.start();

           thread3.start();

         參考上一期文章:開源jms服務activemq的負載均衡+高可用部署方案探索。