上一期介紹了我們項目要用到activemq來作為jms總線,并且給大家介紹了activemq的叢集和高可用部署方案,本期給大家再介紹下,如何根據自己的項目需求,更好地使用activemq的兩種消息處理模式。
對比項
topic
queue
概要
publish subscribe messaging 釋出訂閱消息
point-to-point 點對點
有無狀态
topic資料預設不落地,是無狀态的。
queue資料預設會在mq伺服器上以檔案形式儲存,比如active mq一般儲存在$amq_home\data\kr-store\data下面。也可以配置成db存儲。
完整性保障
并不保證publisher釋出的每條資料,subscriber都能接受到。
queue保證每條資料都能被receiver接收。
消息是否會丢失
一般來說publisher釋出消息到某一個topic時,隻有正在監聽該topic位址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丢失了。
sender發送消息到目标queue,receiver可以異步接收這個queue上的消息。queue上的消息如果暫時沒有receiver來取,也不會丢失。
消息釋出接收政策
一對多的消息釋出接收政策,監聽同一個topic位址的多個sub都能收到publisher發送的消息。sub接收完通知mq伺服器
一對一的消息釋出接收政策,一個sender發送的消息,隻能有一個receiver接收。receiver接收完後,通知mq伺服器已接收,mq伺服器對queue裡的消息采取删除或其他操作。
topic和queue的最大差別在于topic是以廣播的形式,通知所有線上監聽的用戶端有新的消息,沒有監聽的用戶端将收不到消息;而queue則是以點對點的形式通知多個處于監聽狀态的用戶端中的一個。
通過增加監聽用戶端的并發數來驗證,topic的消息推送,是否會因為監聽用戶端的并發上升而出現明顯的下降,測試環境的伺服器為ci環境的activemq,用戶端為我的本機。
從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)并發的前提下,效率差異很明顯(由于500線程并發的情況下,我本機的cpu占用率已高達70-90%,是以無法确認是我本機測試造成的性能瓶頸還是topic消息發送方式存在性能瓶頸,造成效率下降如此明顯)。
topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者并發的前提下,topic方式的效率明顯低于queue。
queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。
topic實測資料:
發送者發送的消息總數
所有訂閱者接收到消息的總數
消息發送和接收平均耗時
單訂閱者
100
101ms
100訂閱者
10000
103ms
500訂閱者
50000
14162ms
queue實測資料:
96ms
100ms
3.1 通過用戶端代碼調用來發送一個topic的消息:
import javax.jms.connection;
import javax.jms.connectionfactory;
import javax.jms.deliverymode;
import javax.jms.destination;
import javax.jms.messageproducer;
import javax.jms.session;
import javax.jms.textmessage;
import org.apache.activemq.activemqconnection;
import org.apache.activemq.activemqconnectionfactory;
publicclass sendtopic {
privatestaticfinalintsend_number = 5;
publicstaticvoid sendmessage(session session, messageproducer producer)
throws exception {
for (int i = 1; i <=send_number; i++) {
textmessage message = session
.createtextmessage("activemq發送的消息" + i);
//發送消息到目的地方
system.out.println("發送消息:" + "activemq 發送的消息" + i);
producer.send(message);
}
}
publicstaticvoid main(string[] args) {
// connectionfactory:連接配接工廠,jms用它建立連接配接
connectionfactory connectionfactory;
// connection:jms用戶端到jms provider的連接配接
connection connection = null;
// session:一個發送或接收消息的線程
session session;
// destination:消息的目的地;消息發送給誰.
destination destination;
// messageproducer:消息發送者
messageproducer producer;
// textmessage message;
//構造connectionfactory執行個體對象,此處采用activemq的實作jar
connectionfactory = new activemqconnectionfactory(
activemqconnection.default_user,
activemqconnection.default_password,
"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連接配接對象
connection = connectionfactory.createconnection();
//啟動
connection.start();
//擷取操作連接配接
session = connection.createsession(true, session.auto_acknowledge);
//擷取session注意參數值firsttopic是一個伺服器的topic(與queue消息的發送相比,這裡是唯一的不同)
destination = session.createtopic("firsttopic");
//得到消息生成者【發送者】
producer = session.createproducer(destination);
//設定不持久化,此處學習,實際根據項目決定
producer.setdeliverymode(deliverymode.persistent);
//構造消息,此處寫死,項目就是參數,或者方法擷取
sendmessage(session, producer);
session.commit();
} catch (exception e) {
e.printstacktrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (throwable ignore) {
}
}
3.2 啟動多個用戶端監聽來接收topic的消息:
publicclass receivetopicimplements runnable {
private stringthreadname;
receivetopic(string threadname) {
this.threadname = threadname;
}
publicvoid run() {
// connectionfactory:連接配接工廠,jms用它建立連接配接
connectionfactory connectionfactory;
// connection:jms用戶端到jms provider的連接配接
connection connection =null;
// session:一個發送或接收消息的線程
session session;
// destination:消息的目的地;消息發送給誰.
destination destination;
//消費者,消息接收者
messageconsumer consumer;
connectionfactory = new activemqconnectionfactory(
activemqconnection.default_user,
activemqconnection.default_password,"tcp://10.20.8.198:61616");
try {
//構造從工廠得到連接配接對象
connection = connectionfactory.createconnection();
//啟動
connection.start();
//擷取操作連接配接,預設自動向伺服器發送接收成功的響應
session = connection.createsession(false, session.auto_acknowledge);
//擷取session注意參數值firsttopic是一個伺服器的topic
destination = session.createtopic("firsttopic");
consumer = session.createconsumer(destination);
while (true) {
//設定接收者接收消息的時間,為了便于測試,這裡設定為100s
textmessage message = (textmessage) consumer
.receive(100 * 1000);
if (null != message) {
system.out.println("線程"+threadname+"收到消息:" + message.gettext());
} else {
continue;
}
}
} catch (exception e) {
e.printstacktrace();
} finally {
try {
if (null != connection)
connection.close();
} catch (throwable ignore) {
}
publicstaticvoid main(string[] args) {
//這裡啟動3個線程來監聽firsttopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息
receivetopic receive1=new receivetopic("thread1");
receivetopic receive2=new receivetopic("thread2");
receivetopic receive3=new receivetopic("thread3");
thread thread1=new thread(receive1);
thread thread2=new thread(receive2);
thread thread3=new thread(receive3);
thread1.start();
thread2.start();
thread3.start();
參考上一期文章:開源jms服務activemq的負載均衡+高可用部署方案探索。