認識MQ(Message Queue)
什麼是消息隊列
消息隊列
首先我們先從以下幾個次元來認識一下消息隊列:
- 消息隊列:一般我們會簡稱它為MQ(MessageQueue)
- 消息(Message):傳輸的資料。
- 隊列(Queue):隊列是一種先進先出的資料結構。
- 消息隊列從字面的含義來看就是一個存放消息的容器。
- 消息隊列可以簡單了解為:把要傳輸的資料放在隊列中。
- 把資料放到消息隊列叫做生産者。
- 從消息隊列裡邊取資料叫做消費者。
為什麼需要消息隊列
使用消息隊列主要是基于以下三個主要場景:
- 解耦
- 異步
- 削峰/限流
下面我們分場景來描述下使用消息隊列帶來的好處
假設我們有一個使用者系統A,使用者系統A可以産生一個userId。
然後,現在有系統B和系統C都需要這個userId去做相關的操作。
解耦前架構
僞碼大緻如下:
java
public class SystemA {
// 系統B和系統C的依賴
SystemB systemB = new SystemB();
SystemC systemC = new SystemC();
// 系統A獨有的資料userId
private String userId = "activeMq-1234567890";
public void doSomething() {
// 系統B和系統C都需要拿着系統A的userId去操作其他的事
systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
}
「這樣類似的業務場景大家是不是很熟悉,大家是不是這樣寫很合情合理,也很簡單。」
某一天,系統B的負責人告訴系統A的負責人,現在系統B的SystemBNeed2do(String userId)這個接口不再使用了,讓系統A别去調它了。
于是,系統A的負責人說"好的,那我就不調用你了。",于是就把調用系統B接口的代碼給删掉了。代碼變成這樣了:
java
public void doSomething() {
// 系統A不再調用系統B的接口了
//systemB.SystemBNeed2do(userId);
systemC.SystemCNeed2do(userId);
}
由于業務需要,系統D說也需要用到系統A的userId,于是代碼改成了這樣:
java
public void doSomething() {
// 已經不再需要系統B的依賴了
//systemB.SystemBNeed2do(userId);
// 系統C和系統D都需要拿着系統A的userId去操作其他的事
systemC.SystemCNeed2do(userId);
systemD.SystemDNeed2do(userId);
}
目前系統A、B、C、D系統的互動是這樣子的。
系統互動
随着業務需求的變化,代碼也要一遍一遍的修改。
還會存在另外一個問題,調用系統C的時候,如果系統C挂了,系統A還要想辦法處理。如果調用系統D時,由于網絡延遲,請求逾時了,那系統A是回報fail還是重試?
那麼怎麼去解決這樣的現狀呢,如何從頻繁的修改代碼中解脫呢?
這時候我們就引入一層消息隊列中間件,互動圖如下:
将系統A産生的userId寫到消息隊列中,系統C和系統D從消息隊列中拿資料。
這樣有什麼好處?
- 系統A隻負責把資料寫到隊列中,誰想要或不想要這個資料(消息),系統A一點都不關心。
- 即便現在系統D不想要userId這個資料了,系統B又突然想要userId這個資料了,都跟系統A無關,系統A一點代碼都不用改。
- 系統D拿userId不再經過系統A,而是從消息隊列裡邊拿。系統D即便挂了或者請求逾時,都跟系統A無關,
隻跟消息隊列有關。這樣一來,系統A與系統B、C、D都解耦了。
系統A做的是主要的業務,而系統B、C、D是非主要的業務。比如系統A處理的是訂單下單,而系統B是訂單下單成功了,那發送一條短信告訴具體的使用者此訂單已成功,而系統C和系統D也是處理一些小事而已。
那麼此時,為了提高使用者體驗和吞吐量,其實可以異步地調用系統B、C、D的接口。
我們再來一個場景,現在我們每個月要搞一次大促,大促期間的并發可能會很高的,比如每秒3000個請求。假設我們現在有兩台機器處理請求,并且每台機器隻能每次處理1000個請求。
削峰前
系統B和系統C根據自己的能夠處理的請求數去消息隊列中拿資料,這樣即便有每秒有8000個請求,那隻是把請求放在消息隊列中,去拿消息隊列的消息由系統自己去控制,這樣就不會把整個系統給搞崩。
什麼是JMS MQ
全稱:Java MessageService 中文:Java 消息服務。
JMS 是 Java 的一套 API 标準,最初的目的是為了使應用程式能夠通路現有的MOM 系 統(MOM 是 MessageOriented Middleware 的英文縮寫,指的是利用高效可靠的消息傳遞機制進行平台無關的資料交流,并基于資料通信來進行分布式系統的內建。) 後來被許多現有的 MOM 供應商采用,并實作為MOM 系統。
常見 MOM 系統包括 Apache的 ActiveMQ、阿裡巴巴的 RocketMQ、IBM 的 MQSeries、Microsoft 的 MSMQ、BEA 的 RabbitMQ 等。(并非全部的 MOM 系統都遵循JMS 規範)】
基于 JMS 實作的 MOM,又被稱為JMSProvider。
JMS中的一些概念
「Broker」
消息伺服器,作為server提供消息核心服務
「Provider 生産者」
消息生産者是由會話建立的一個對象,用于把消息發動到一個目的地
「Consumer 消費者」
消息消費者是由會話建立的一個對象,它用于接收發送到目的地的消息。消息的消費可以采用以下兩種方法:
同步消費。通過調用消費者的receive方法從目的地中顯式提取消息。receive方法可以一直阻塞到消息到達。
異步消費。客戶可以為消費者注冊一個消息監聽器,以定義在消息到達時所采取的動作。
「P2P 點對點消息模型」
消息生産者生産消息發送到queue 中,然後消息消費者從queue 中取出并且消費消息。消息被消費以後,queue 中不再有存儲,是以消息消費者不可能消費到已經被消費的消息。Queue支援存在多個消費者,但是對一個消息而言,隻會有一個消費者可以消費、其它的則不能消費此消息了。當消費者不存在時,消息會一直儲存,直到有消費消費。
「Pub/Sub 釋出訂閱消息模型」
消息生産者(釋出)将消息釋出到topic 中,同時有多個消息消費者(訂閱)消費該消息。和點對點方式不同,釋出到 topic 的消息會被所有訂閱者消費。當生産者釋出消息,不管是否有消費者。都不會儲存消息一定要先有消息的消費者,後有消息的生産者。
「P2P vs Pub/Sub」
P2P vs Pub/Sub
「Queue」
隊列存儲,常用于點對點消息模型
預設隻能由唯一的一個消費者處理。一旦處理消息删除。
「Topic」
主題存儲,用于訂閱/釋出消息模型
主題中的消息,會發送給所有的消費者同時處理。隻有在消息可以重複處理的業務場景中可使用。
「ConnectionFactory」
連接配接工廠,jms中用它建立連接配接
連接配接工廠是客戶用來建立連接配接的對象,例如ActiveMQ提供的ActiveMQConnectionFactory。
「Connection」
JMS Connection封裝了客戶與JMS提供者之間的一個虛拟的連接配接。
「Destination 消息的目的地」
目的地是客戶用來指定它生産的消息的目标和它消費的消息的來源的對象。
訂閱一個主題的消費者隻能消費自它訂閱之後釋出的消息。JMS規範允許客戶建立持久訂閱,這在一定程度上放松了時間上的相關性要求。持久訂閱允許消費者消費它在未處于激活狀态時發送的消息。在點對點消息傳遞域中,目的地被成為隊列(queue);在釋出/訂閱消息傳遞域中,目的地被成為主題(topic)。
「Session」
JMS Session是生産和消費消息的一個單線程上下文。會話用于建立消息生産者(producer)、消息消費者(consumer)和消息(message)等。會話提供了一個事務性的上下文,在這個上下文中,一組發送和接收被組合到了一個原子操作中。
消息可靠性機制
「确認 JMS消息」
隻有在被确認之後,才認為已經被成功地消費了。消息的成功消費通常包含三個階段:客戶接收消息、客戶處理消息和消息被确認。
在事務性會話中,當一個事務被送出的時候,确認自動發生。
在非事務性會話中,消息何時被确認取決于建立會話時的應答模式(acknowledgement mode)。該參數有以下三個可選值:
「Session.AUTO_ACKNOWLEDGE」。當客戶成功的從receive方法傳回的時候,或者從MessageListener.onMessage方法成功傳回的時候,會話自動确認客戶收到的消息。
「Session.CLIENT_ACKNOWLEDGE」。客戶通過消息的acknowledge方法确認消息。需要注意的是,在這種模式中,确認是在會話層上進行:确認一個被消費的消息将自動确認所有已被會話消費的消息。例如,如果一個消息消費者消費了10個消息,然後确認第5個消息,那麼所有10個消息都被确認。
「Session.DUPS_ACKNOWLEDGE」。該選擇隻是會話遲鈍的确認消息的送出。如果JMS Provider失敗,那麼可能會導緻一些重複的消息。如果是重複的消息,那麼JMS Provider必須把消息頭的JMSRedelivered字段設定為true。
「持久性」
JMS 支援以下兩種消息送出模式:
「PERSISTENT」。訓示JMSProvider持久儲存消息,以保證消息不會因為JMS Provider的失敗而丢失。
「NON_PERSISTENT」。不要求JMS Provider持久儲存消息。
「優先級」
可以使用消息優先級來訓示JMS Provider首先送出緊急的消息。優先級分10個級别,從0(最低)到9(最高)。如果不指定優先級,預設級别是4。「需要注意的是,JMSProvider并不一定保證按照優先級的順序送出消息。」
「消息過期」
可以設定消息在一定時間後過期,預設是永不過期
「臨時目的地」
可以通過會話上的createTemporaryQueue方法和createTemporaryTopic方法來建立臨時目的地。它們的存在時間隻限于建立它們的連接配接所保持的時間。隻有建立該臨時目的地的連接配接上的消息消費者才能夠從臨時目的地中提取消息。
「持久訂閱」
首先消息生産者必須使用PERSISTENT送出消息。客戶可以通過會話上的createDurableSubscriber方法來建立一個持久訂閱,該方法的第一個參數必須是一個topic,第二個參數是訂閱的名稱。
JMS Provider會存儲釋出到持久訂閱對應的topic上的消息。如果最初建立持久訂閱的客戶或者任何其它客戶使用相同的連接配接工廠和連接配接的客戶ID、相同的主題和相同的訂閱名再次調用會話上的createDurableSubscriber方法,那麼該持久訂閱就會被激活。
JMS Provider會向客戶發送客戶處于非激活狀态時所釋出的消息。
持久訂閱在某個時刻隻能有一個激活的訂閱者。持久訂閱在建立之後會一直保留,直到應用程式調用會話上的unsubscribe方法。
「本地事務」
在一個JMS用戶端,可以使用本地事務來組合消息的發送和接收。JMS Session接口提供了commit和rollback方法。事務送出意味着生産的所有消息被發送,消費的所有消息被确認;事務復原意味着生産的所有消息被銷毀,消費的所有消息被恢複并重新送出,除非它們已經過期。
事務性的會話總是牽涉到事務進行中,commit或rollback方法一旦被調用,一個事務就結束了,而另一個事務被開始。關閉事務性會話将復原其中的事務。
需要注意的是,如果使用請求/回複機制,即發送一個消息,同時希望在同一個事務中等待接收該消息的回複,那麼程式将被挂起,因為知道事務送出,發送操作才會真正執行。需要注意的還有一個,消息的生産和消費不能包含在同一個事務中。
ActiveMQ
存儲
ActiveMQ支援很多種存儲方式,常見的有 KahaDB存儲,AMQ存儲,JDBC存儲,LevelDB存儲,Memory 消息存儲。我們重點介紹一下KahaDB和JDBC存儲方式。
KahaDB存儲
KahaDB是預設的持久化政策,所有消息順序添加到一個日志檔案中,同時另外有一個索引檔案記錄指向這些日志的存儲位址,還有一個事務日志用于消息回複操作。是一個專門針對消息持久化的解決方案,它對典型的消息使用模式進行了優化。
在data/kahadb這個目錄下,會生成四個檔案,來完成消息持久化 db.data 它是消息的索引檔案,本質上是B-Tree(B樹),使用B-Tree作為索引指向db-*.log裡面存儲的消息 db.redo 用來進行消息恢複 *db-.log 存儲消息内容。
kahadb檔案結構
新的資料以APPEND的方式追加到日志檔案末尾。屬于順序寫入,是以消息存儲是比較 快的。預設
是32M,達到閥值會自動遞增 lock檔案 鎖,寫入目前獲得kahadb讀寫權限的broker ,用于在叢集環境下的競争處理。
KahaDB有如下幾個特性:
- 日志形式存儲消息;
- 消息索引以 B-Tree 結構存儲,可以快速更新;
- 完全支援 JMS 事務;
- 支援多種恢複機制kahadb 可以限制每個資料檔案的大小。不代表總計資料容量。
配置方式如下:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
JDBC 存儲
支援通過 JDBC 将消息存儲到關系資料庫,性能上不如檔案存儲,能通過關系型資料庫查詢到消息的資訊。
MQ 支援的資料庫:Apache Derby、MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB。使用JDBC存儲需要用到下面三張資料表。
「activemq_acks」:用于存儲訂閱關系。如果是持久化Topic,訂閱者和伺服器的訂閱關系在這個表儲存。主要的資料庫字段如下:
- container:消息的destination
- sub_dest:如果是使用static叢集,這個字段會有叢集其他系統的資訊
- client_id:每個訂閱者都必須有一個唯一的用戶端id用以區分
- sub_name:訂閱者名稱
- selector:選擇器,可以選擇隻消費滿足條件的消息。條件可以用自定義屬性實作,可支援多屬性and和or操作
- last_acked_id:記錄消費過的消息的id。
「activemq_lock」:在叢集環境中才有用,隻有一個Broker可以獲得消息,稱為Master Broker,其他的隻能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用于記錄哪個Broker是目前的Master Broker。
「activemq_msgs」:用于存儲消息,Queue和Topic都存儲在這個表中。主要的資料庫字段如下
- id:自增的資料庫主鍵
- msgid_prod:消息發送者用戶端的主鍵
- msg_seq:是發送消息的順序,msgid_prod+msg_seq可以組成jms的messageid
- expiration:消息的過期時間,存儲的是從1970-01-01到現在的毫秒數
- msg:消息本體的java序列化對象的二進制資料
- priority:優先級,從0-9,數值越大優先級越高
- xid:topic
- 配置資料源 conf/acticvemq.xml 檔案:
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="111111"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
- 配置 broke 中的 persistenceAdapter dataSource 指定持久化資料庫的 bean,createTablesOnStartup 是否在啟動的時候建立資料表,預設值是 true,這樣每次啟動都會去建立資料表了,一般是第一次啟動的時候設定為 true,之後改成 false。
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>
協定
ActiveMQ支援的client-broker通訊協定有:TCP、NIO、UDP、SSL、Http(s)、VM。
Transmission Control Protocol (TCP)
這是預設的Broker配置,TCP的Client監聽端口是61616。
在網絡傳輸資料前,必須要序列化資料,消息是通過一個叫wire protocol的來序列化成位元組流。預設情況下,ActiveMQ把wire protocol叫做OpenWire,它的目的是促使網絡上的效率和資料快速互動。
TCP連接配接的URI形式:
tcp://hostname:port?key=value&key=value
TCP傳輸的優點:(1)TCP協定傳輸可靠性高,穩定性強 (2)高效性:位元組流方式傳遞,效率很高 (3)有效性、可用性:應用廣泛,支援任何平台
New I/O API Protocol(NIO)
NIO協定和TCP協定類似,但NIO更側重于底層的通路操作。它允許開發人員對同一資源可有更多的client調用和服務端有更多的負載。
适合使用NIO協定的場景:(1)可能有大量的Client去連結到Broker上一般情況下,大量的Client去連結Broker是被作業系統的線程數所限制的。是以,NIO的實作比TCP需要更少的線程去運作,是以建議使用NIO協定 (2)可能對于Broker有一個很遲鈍的網絡傳輸NIO比TCP提供更好的性能
NIO連接配接的URI形式:
nio://hostname:port?key=value
Transport Connector配置示例:
<transportConnectors>
<transportConnector
name="nio"
uri="nio://localhost:61618?trace=true" />
</transportConnectors>
User Datagram Protocol(UDP)
UDP和TCP的差別 (1)TCP是一個原始流的傳遞協定,意味着資料包是有保證的,換句話說,資料包是不會被複制和丢失的。
UDP,另一方面,它是不會保證資料包的傳遞的 (2)TCP也是一個穩定可靠的資料包傳遞協定,意味着資料在傳遞的過程中不會被丢失。這樣確定了在發送和接收之間能夠可靠的傳遞。相反,UDP僅僅是一個連結協定,是以它沒有可靠性之說
從上面可以得出:TCP是被用在穩定可靠的場景中使用的;UDP通常用在快速資料傳遞和不怕資料丢失的場景中,還有ActiveMQ通過防火牆時,隻能用UDP
UDP連接配接的URI形式:
udp://hostname:port?key=value
<transportConnectors>
<transportConnector
name="udp"
uri="udp://localhost:61618?trace=true" />
</transportConnectors>
Active MQ的安全機制
「web控制台安全」
修改jetty-realm.properties
# username: password [,rolename ...](使用者名:密碼 角色)
注意:配置需重新開機ActiveMQ才會生效
「消息安全機制」
修改activemq.xml 在中添加如下代碼:
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
<authenticationUser username="publisher" password="publisher" groups="publishers,consumers"/>
<authenticationUser username="consumer" password="consumer" groups="consumers"/>
<authenticationUser username="guest" password="guest" groups="guests"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
ActiveMQ使用
在java中使用ActiveMQ隻需要引入相關依賴
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
編寫生産者
public class Sender {
public static void main(String[] args) throws JMSException {
// 1. 建立工廠對象,
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
//2 從工廠裡拿一個連接配接
Connection connection = acf.createConnection();
connection.start();
//3 從連接配接中擷取Session(會話)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 從會話中擷取目的地(Destination)消費者會從這個目的地取消息
Queue queue = session.createQueue("mq.test");
//5 從會話中建立消息提供者
MessageProducer producer = session.createProducer(queue);
//6 從會話中建立文本消息(也可以建立其它類型的消息體)
TextMessage message = session.createTextMessage("msg: hello world");
//7 通過消息提供者發送消息到ActiveMQ
producer.send(message);
//8 關閉連接配接
connection.close();
}
}
編寫消費者
public class Receiver {
public static void main(String[] args) throws JMSException {
// 1. 建立工廠對象,
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61618");
//2 從工廠裡拿一個連接配接
Connection connection = acf.createConnection();
connection.start();
//3 從連接配接中擷取Session(會話)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 從會話中擷取目的地(Destination)消費者會從這個目的地取消息
Queue queue = session.createQueue("mq.test");
//5 從會話中建立消息消費者
MessageConsumer consumer = session.createConsumer(queue);
while (true){
//6 消費者接收消息
Message msg = consumer.receive();
TextMessage textMessage = (TextMessage) msg;
System.out.println("text:"+textMessage.getText());
}
}
}
常用API及特性
- 事務消息
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
送出事務:session.commit();
復原事務:session.rollback();
開啟事務後,隻有事務commit成功,消息才會發送到MQ中
-
持久化
預設持久化是開啟的;
開啟非持久化示例代碼:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
- 設定消息優先級
producer.setPriority();
- 設定消息逾時/過期時間
設定了消息逾時的消息,消費端在逾時後無法在消費到此消息。producer.setTimeToLive
-
死信
此類消息會進入到ActiveMQ.DLQ隊列且不會自動清除,稱為死信,有消息堆積的風險。
-
簽收模式
簽收代表接收端的session已收到消息的一次确認,回報給broker
如果session帶有事務,并且事務成功送出,則消息被自動簽收。如果事務復原,則消息會被再次傳送。
消息事務是在生産者producer到broker或broker到consumer過程中同一個session中發生的,保證幾條消息在發送過程中的原子性。在支援事務的session中,producer發送message時在message中帶有transactionID。broker收到message後判斷是否有transactionID,如果有就把message儲存在transaction store中,等待commit或者rollback消息。
ActiveMQ支援自動簽收與手動簽收
「Session.AUTO_ACKNOWLEDGE」
當用戶端從receiver或onMessage成功傳回時,Session自動簽收用戶端的這條消息的收條。
「Session.CLIENT_ACKNOWLEDGE」
用戶端通過調用消息(Message)的acknowledge方法簽收消息。在這種情況下,簽收發生在Session層面:簽收一個已經消費的消息會自動地簽收這個Session所有已消費的收條。
「Session.DUPS_OK_ACKNOWLEDGE」
Session不必確定對傳送消息的簽收,這個模式可能會引起消息的重複,但是降低了Session的開銷,是以隻有用戶端能容忍重複的消息,才可使用。
- 獨占消費者
Queue queue = session.createQueue("xxoo?consumer.exclusive=true");
- 發送異步消息
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616"
);
// 2.擷取一個向ActiveMQ的連接配接
connectionFactory.setUseAsyncSend(true);
ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setUseAsyncSend(true);
-
消息堆積
producer每發送一個消息,統計一下發送的位元組數,當位元組數達到ProducerWindowSize值時,需要等待broker的确認,才能繼續發送。
brokerUrl中設定:
destinationUri中設定:tcp://localhost:61616?jms.producerWindowSize=1048576
myQueue?producer.windowSize=1048576
-
延遲消息投遞
首先在配置檔案中開啟延遲和排程
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
延遲發送示例代碼:
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,10*1000);
- 建立監聽器
ActiveMQConnectionFactory acf = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,
"tcp://localhost:61618");
//2 從工廠裡拿一個連接配接
Connection connection = acf.createConnection();
connection.start();
//3 從連接配接中擷取Session(會話)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 從會話中擷取目的地(Destination)消費者會從這個目的地取消息
Queue queue = session.createQueue("mq.test");
//5 從會話中建立消息消費者
MessageConsumer consumer = session.createConsumer(queue);
MyListener myListener = new MyListener();
MessageListener listener = myListener::receiveMessage;
consumer.setMessageListener(listener);
SpringBoot整合ActiveMQ
- 添加依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- 配置檔案
server:
port: 80
spring:
activemq:
broker-url: tcp://localhost:61618
user: admin
password: admin
pool:
enabled: true
#連接配接池最大連接配接數
max-connections: 5
#空閑的連接配接過期時間,預設為30秒
idle-timeout: 0
packages:
trust-all: true
jms:
pub-sub-domain: true
- 配置類
@Configuration
@EnableJms
public class ActiveMqConfig {
// topic模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true);
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
// queue模式的ListenerContainer
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
@Service
public class MqProducerService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void sendStringQueue(String destination, String msg) {
System.out.println("send...");
ActiveMQQueue queue = new ActiveMQQueue(destination);
jmsMessagingTemplate.afterPropertiesSet();
ConnectionFactory factory = jmsMessagingTemplate.getConnectionFactory();
try {
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue2 = session.createQueue(destination);
MessageProducer producer = session.createProducer(queue2);
TextMessage message = session.createTextMessage("hahaha");
producer.send(message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
jmsMessagingTemplate.convertAndSend(queue, msg);
}
public void sendStringQueueList(String destination, String msg) {
System.out.println("xxooq");
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(destination), list);
}
}
@JmsListener(destination = "user",containerFactory = "jmsListenerContainerQueue")
public void receiveStringQueue(String msg) {
System.out.println("接收到消息...." + msg);
}
@JmsListener(destination = "ooo",containerFactory = "jmsListenerContainerTopic")
public void receiveStringTopic(String msg) {
System.out.println("接收到消息...." + msg);
}
小結
本文詳細介紹了為什麼需要引入消息隊列,JMS、ActiveMQ的基礎概念以及常用API,與原生JAVA整合及SpringBoot整合等知識點,可以讓大家更好的了解ActiveMQ的使用場景及使用方式。