深入淺出JMS(一)--JMS基本概念
摘要:The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.
JMS(JAVA Message Service,java消息服務)API是一個消息服務的标準或者說是規範,允許應用程式元件基于JavaEE平台建立、發送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
這篇博文我們主要介紹J2EE中的一個重要規範JMS,因為這個規範在企業中的應用十分的廣泛,也比較重要,我們主要介紹JMS的基本概念和它的模式,消息的消費以及JMS程式設計步驟。
-
基本概念
JMS是java的消息服務,JMS的用戶端之間可以通過JMS服務進行異步的消息傳輸。
- 消息模型
即點對點和釋出訂閱模型○ Point-to-Point(P2P) ○ Publish/Subscribe(Pub/Sub)
- P2P
- P2P模式圖
深入淺出JMS ActiveMQ - 涉及到的概念
- 消息隊列(Queue)
- 發送者(Sender)
- 接收者(Receiver)
- 每個消息都被發送到一個特定的隊列,接收者從隊列中擷取消息。隊列保留着消息,直到他們被消費或逾時。
- P2P的特點
- 每個消息隻有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中)
- 發送者和接收者之間在時間上沒有依賴性,也就是說當發送者發送了消息之後,不管接收者有沒有正在運作,它不會影響到消息被發送到隊列
- 接收者在成功接收消息之後需向隊列應答成功
- P2P模式圖
- Pub/Sub
- Pub/Sub模式圖
深入淺出JMS ActiveMQ - 涉及到的概念
- 主題(Topic)
- 釋出者(Publisher)
-
訂閱者(Subscriber)
用戶端将消息發送到主題。多個釋出者将消息發送到Topic,系統将這些消息傳遞給多個訂閱者。
- Pub/Sub的特點
- 每個消息可以有多個消費者
- 釋出者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須建立一個訂閱者之後,才能消費釋出者的消息,而且為了消費消息,訂閱者必須保持運作的狀态。
- 為了緩和這樣嚴格的時間相關性,JMS允許訂閱者建立一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運作),它也能接收到釋出者的消息。
- Pub/Sub模式圖
-
消息的消費
在JMS中,消息的産生和消息是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。
○ 同步
訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或逾時之前)将一直阻塞
○ 異步
訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之後,系統自動調用監聽器的onMessage方法。
-
JMS程式設計模型
(1) ConnectionFactory
建立Connection對象的工廠,針對兩種不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。
(2) Destination
Destination的意思是消息生産者的消息發送目标或者說消息消費者的消息來源。對于消息生産者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。
是以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
(3) Connection
Connection表示在用戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以産生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
(4) Session
Session是我們操作消息的接口。可以通過session建立生産者、消費者、消息等。Session提供了事務的功能。當我們需要使用session發送/接收多個消息時,可以将這些發送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 消息的生産者
消息生産者由Session建立,并用于将消息發送到Destination。同樣,消息生産者分兩種類型:QueueSender和TopicPublisher。可以調用消息生産者的方法(send或publish方法)發送消息。
(6) 消息消費者
消息消費者由Session建立,用于接收被發送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber。可分别通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
(7) MessageListener
消息監聽器。如果注冊了消息監聽器,一旦消息到達,将自動調用監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
- 企業消息系統的好處
我們先來看看下圖,應用程式A将Message發送到伺服器上,然後應用程式B從伺服器中接收A發來的消息,通過這個圖我們一起來分析一下JMS的好處:
- 提供消息靈活性
- 松散耦合
- 異步性
對于JMS的基本概念我們就介紹這麼多,下篇博文介紹一種JMS的實作。
深入淺出JMS(二)--ActiveMQ簡單介紹以及安裝
現實的企業中,對于消息通信的應用一直都非常的火熱,而且在J2EE的企業應用中扮演着特殊的角色,是以對于它研究是非常有必要的。
上篇博文深入淺出JMS(一)–JMS基本概念,我們介紹了消息通信的規範JMS,我們這篇博文介紹一款開源的JMS具體實作——ActiveMQ。ActiveMQ是一個易于使用的消息中間件。
消息中間件
我們簡單的介紹一下消息中間件,對它有一個基本認識就好,消息中間件(MOM:Message Orient middleware)。
消息中間件有很多的用途和優點:
1. 将資料從一個應用程式傳送到另一個應用程式,或者從軟體的一個子產品傳送到另外一個子產品;
2. 負責建立網絡通信的通道,進行資料的可靠傳送。
3. 保證資料不重發,不丢失
4. 能夠實作跨平台操作,能夠為不同作業系統上的軟體內建技工資料傳送服務
MQ
首先簡單的介紹一下MQ,MQ英文名MessageQueue,中文名也就是大家用的消息隊列,幹嘛用的呢,說白了就是一個消息的接受和轉發的容器,可用于消息推送。
下面進入我們今天的主題,為大家介紹ActiveMQ:
ActiveMQ
簡要概述ActiveMQ
Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server.
Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.
ActiveMQ是由Apache出品的,一款最流行的,能力強勁的開源消息總線。ActiveMQ是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實作,它非常快速,支援多種語言的用戶端和協定,而且可以非常容易的嵌入到企業的應用環境中,并有許多進階功能。
下面我們下載下傳一個版本,玩一玩。
下載下傳ActiveMQ
官方網站:http://activemq.apache.org/
現在ActiveMQ最新的版本是5.11.1,下載下傳挺簡單的,就不再截圖了。
運作ActiveMQ服務
-
下載下傳,解壓縮
大家現在好之後,将apache-activemq-5.11.1-bin.zip解壓縮,我們可以看到它的整體目錄結構:
從它的目錄來說,還是很簡單的:深入淺出JMS ActiveMQ - bin存放的是腳本檔案
- conf存放的是基本配置檔案
- data存放的是日志檔案
- docs存放的是說明文檔
- examples存放的是簡單的執行個體
- lib存放的是activemq所需jar包
- webapps用于存放項目的目錄
-
啟動ActiveMQ
我們了解activemq的基本目錄,下面我們運作一下activemq服務,輕按兩下bin目錄下的activemq.bat腳本檔案或運作自己電腦版本下的activemq.bat,就可以看下圖的效果。
深入淺出JMS ActiveMQ
從上圖我們可以看到activemq的存放位址,以及浏覽器要通路的位址.
3. 測試
ActiveMQ預設使用的TCP連接配接端口是61616, 通過檢視該端口的資訊可以測試ActiveMQ是否成功啟動 netstat -an|find “61616”
C:\Documents and Settings\Administrator>netstat -an|find "61616"
TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING
4. 監控
ActiveMQ預設啟動時,啟動了内置的jetty伺服器,提供一個用于監控ActiveMQ的admin應用。
admin:http://127.0.0.1:8161/admin/
使用者名和密碼都是admin
5. 至此,服務端啟動完畢
停止伺服器,隻需要按着Ctrl+Shift+C,之後輸入y即可。
我們簡單說說ActiveMQ特性,網上很多,隻是為了保證博文的完整。
ActiveMQ特性清單
- 多種語言和協定編寫用戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協定: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA消息,事務)
- 對Spring的支援,ActiveMQ可以很容易内嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
- 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
- 支援多種傳送協定:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支援通過JDBC和journal提供高速的消息持久化
- 從設計上保證了高性能的叢集,用戶端-伺服器,點對點
- 支援Ajax
- 支援與Axis的整合
- 可以很容易得調用内嵌JMS provider,進行測試
什麼情況下使用ActiveMQ?
-
多個項目之間內建
(1) 跨平台
(2) 多語言
(3) 多項目
-
降低系統間子產品的耦合度,解耦
(1) 軟體擴充性
-
系統前後端隔離
(1) 前後端隔離,屏蔽高安全區
其實ActiveMQ的應用還有很多,大家可以上網查查,不再一一舉例。
總結
ActiveMQ并不難,具有很多的優勢。
深入淺出JMS(三)--ActiveMQ簡單的HelloWorld執行個體
我們使用ActiveMQ為大家實作一種點對點的消息模型。如果你對點對點模型的認識較淺,可以看一下第一篇博文的介紹。
JMS其實并沒有想象的那麼高大上,看完這篇博文之後,你就知道什麼叫簡單,下面直接進入主題。
開發環境
我們使用的是ActiveMQ 5.11.1 Release的Windows版,官網最新版是ActiveMQ 5.12.0 Release,大家可以自行下載下傳,下載下傳位址。
需要注意的是,開發時候,要将apache-activemq-5.11.1-bin.zip解壓縮後裡面的activemq-all-5.11.1.jar包加入到classpath下面,這個包包含了所有jms接口api的實作。
搭建開發環境
-
建立項目
我們隻需要建立一個java項目就可以了,導入jar包,項目截圖:
深入淺出JMS ActiveMQ
點對點的消息模型,隻需要一個消息生成者和消息消費者,下面我們編寫代碼。
- 編寫生産者
package com.tgb.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息的生産者(發送者)
* @author liang
*
*/
public class JMSProducer {
//預設連接配接使用者名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//預設連接配接密碼
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//預設連接配接位址
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
//發送的消息數量
private static final int SENDNUM = ;
public static void main(String[] args) {
//連接配接工廠
ConnectionFactory connectionFactory;
//連接配接
Connection connection = null;
//會話 接受或者發送消息的線程
Session session;
//消息的目的地
Destination destination;
//消息生産者
MessageProducer messageProducer;
//執行個體化連接配接工廠
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try {
//通過連接配接工廠擷取連接配接
connection = connectionFactory.createConnection();
//啟動連接配接
connection.start();
//建立session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//建立一個名稱為HelloWorld的消息隊列
destination = session.createQueue("HelloWorld");
//建立消息生産者
messageProducer = session.createProducer(destination);
//發送消息
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(connection != null){
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
/**
* 發送消息
* @param session
* @param messageProducer 消息生産者
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
for (int i = ; i < JMSProducer.SENDNUM; i++) {
//建立一條文本消息
TextMessage message = session.createTextMessage("ActiveMQ 發送消息" +i);
System.out.println("發送消息:Activemq 發送消息" + i);
//通過消息生産者發出消息
messageProducer.send(message);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 編寫消費者
package com.tgb.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息的消費者(接受者)
* @author liang
*
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//預設連接配接使用者名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//預設連接配接密碼
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//預設連接配接位址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//連接配接工廠
Connection connection = null;//連接配接
Session session;//會話 接受或者發送消息的線程
Destination destination;//消息的目的地
MessageConsumer messageConsumer;//消息的消費者
//執行個體化連接配接工廠
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
try {
//通過連接配接工廠擷取連接配接
connection = connectionFactory.createConnection();
//啟動連接配接
connection.start();
//建立session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立一個連接配接HelloWorld的消息隊列
destination = session.createQueue("HelloWorld");
//建立消息消費者
messageConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive();
if(textMessage != null){
System.out.println("收到的消息:" + textMessage.getText());
}else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
運作
- 首先,啟動ActiveMQ,如何啟動ActiveMQ如何啟動,請看第二篇博文。在浏覽器中輸入:http://localhost:8161/admin/,然後開始執行:
- 運作發送者,eclipse控制台輸出,如下圖: 此時,我們先看一下ActiveMQ伺服器,Queues内容如下:
深入淺出JMS ActiveMQ 我們可以看到建立了一個名稱為HelloWorld的消息隊列,隊列中有10條消息未被消費,我們也可以通過Browse檢視是哪些消息,如下圖:深入淺出JMS ActiveMQ 如果這些隊列中的消息,被删除,消費者則無法消費。深入淺出JMS ActiveMQ - 我們繼續運作一下消費者,eclipse控制台列印消息,如下: 此時,我們先看一下ActiveMQ伺服器,Queues内容如下:
深入淺出JMS ActiveMQ 深入淺出JMS ActiveMQ 我們可以看到HelloWorld的消息隊列發生變化,多一個消息者,隊列中的10條消息被消費了,點選Browse檢視,已經為空了。
點選Active Consumers,我們可以看到這個消費者的詳細資訊:
深入淺出JMS ActiveMQ
我們的執行個體到此就結束了,大家可以自己多點ActiveMQ伺服器的内容,進一步熟悉ActiveMQ。
總結
實作了點對點的消息模型以及發送的一個同步消息,是不是非常的簡單?
深入淺出JMS(四)--Spring和ActiveMQ整合的完整執行個體
我們基于spring+JMS+ActiveMQ+Tomcat,做一個Spring4.1.0和ActiveMQ5.11.1整合執行個體,實作了Point-To-Point的異步隊列消息和PUB/SUB(釋出/訂閱)模型,簡單執行個體,不包含任何業務。
環境準備
工具
- JDK1.6或1.7
- Spring4.1.0
- ActiveMQ5.11.1
- Tomcat7.x
目錄結構
所需jar包
項目的配置
配置ConnectionFactory
connectionFactory是Spring用于建立到JMS伺服器連結的,Spring提供了多種connectionFactory,我們介紹兩個SingleConnectionFactory和CachingConnectionFactory。
SingleConnectionFactory:對于建立JMS伺服器連結的請求會一直傳回同一個連結,并且會忽略Connection的close方法調用。
CachingConnectionFactory:繼承了SingleConnectionFactory,是以它擁有SingleConnectionFactory的所有功能,同時它還新增了緩存功能,它可以緩存Session、MessageProducer和MessageConsumer。我們使用CachingConnectionFactory來作為示例。
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
</bean>
- 1
- 2
- 1
- 2
Spring提供的ConnectionFactory隻是Spring用于管理ConnectionFactory的,真正産生到JMS伺服器連結的ConnectionFactory還得是由JMS服務廠商提供,并且需要把它注入到Spring提供的ConnectionFactory中。我們這裡使用的是ActiveMQ實作的JMS,是以在我們這裡真正的可以産生Connection的就應該是由ActiveMQ提供的ConnectionFactory。是以定義一個ConnectionFactory的完整代碼應該如下所示:
<!-- ActiveMQ 連接配接工廠 -->
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!-- 如果連接配接網絡:tcp://ip:61616;未連接配接網絡:tcp://localhost:61616 以及使用者名,密碼-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin" />
<!-- Spring Caching連接配接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session緩存數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
配置生産者
配置好ConnectionFactory之後我們就需要配置生産者。生産者負責産生消息并發送到JMS伺服器。但是我們要怎麼進行消息發送呢?通常是利用Spring為我們提供的JmsTemplate類來實作的,是以配置生産者其實最核心的就是配置消息發送的JmsTemplate。對于消息發送者而言,它在發送消息的時候要知道自己該往哪裡發,為此,我們在定義JmsTemplate的時候需要注入一個Spring提供的ConnectionFactory對象。
在利用JmsTemplate進行消息發送的時候,我們需要知道發送哪種消息類型:一個是點對點的ActiveMQQueue,另一個就是支援訂閱/釋出模式的ActiveMQTopic。如下所示:
<!-- Spring JmsTemplate 的消息生産者 start-->
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(釋出/訂閱),即隊列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定義JmsTemplate的Topic類型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(釋出/訂閱) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生産者 end-->
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
生産者如何指定目的地和發送消息?大家看源碼即可,就不再這提供了。
配置消費者
生産者往指定目的地Destination發送消息後,接下來就是消費者對指定目的地的消息進行消費了。那麼消費者是如何知道有生産者發送消息到指定目的地Destination了呢?每個消費者對應每個目的地都需要有對應的MessageListenerContainer。對于消息監聽容器而言,除了要知道監聽哪個目的地之外,還需要知道到哪裡去監聽,也就是說它還需要知道去監聽哪個JMS伺服器,通過配置MessageListenerContainer的時候往裡面注入一個ConnectionFactory來實作的。是以我們在配置一個MessageListenerContainer的時候有三個屬性必須指定:一個是表示從哪裡監聽的ConnectionFactory;一個是表示監聽什麼的Destination;一個是接收到消息以後進行消息處理的MessageListener。
<!-- 消息消費者 start-->
<!-- 定義Queue監聽器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"/>
<jms:listener destination="test.queue" ref="queueReceiver2"/>
</jms:listener-container>
<!-- 定義Topic監聽器 -->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container>
<!-- 消息消費者 end -->
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
ActiveMQ.xml
此時,Spring和JMS,ActiveMQ整合的ActiveMQ.xml已經完成,下面展示所有的xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.0.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">
<!-- ActiveMQ 連接配接工廠 -->
<!-- 真正可以産生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<!-- 如果連接配接網絡:tcp://ip:61616;未連接配接網絡:tcp://localhost:61616 以及使用者名,密碼-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://192.168.3.3:61616" userName="admin" password="admin" />
<!-- Spring Caching連接配接工廠 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory對應真實的可以産生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session緩存數量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- Spring JmsTemplate 的消息生産者 start-->
<!-- 定義JmsTemplate的Queue類型 -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(釋出/訂閱),即隊列模式 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定義JmsTemplate的Topic類型 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 -->
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(釋出/訂閱) -->
<property name="pubSubDomain" value="true" />
</bean>
<!--Spring JmsTemplate 的消息生産者 end-->
<!-- 消息消費者 start-->
<!-- 定義Queue監聽器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"/>
<jms:listener destination="test.queue" ref="queueReceiver2"/>
</jms:listener-container>
<!-- 定義Topic監聽器 -->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container>
<!-- 消息消費者 end -->
</beans>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
鑒于博文内容較多,我們隻是在粘貼web.xml的配置,就不在博文中提供Spring和SpringMVC的XML配置,其他内容,大家檢視源碼即可。
web.xml
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
version="3.0">
<display-name>ActiveMQSpringDemo</display-name>
<!-- Log4J Start -->
<context-param>
<param-name>log4jConfigLocation</param-name>
<param-value>classpath:log4j.properties</param-value>
</context-param>
<context-param>
<param-name>log4jRefreshInterval</param-name>
<param-value>6000</param-value>
</context-param>
<!-- Spring Log4J config -->
<listener>
<listener-class>org.springframework.web.util.Log4jConfigListener</listener-class>
</listener>
<!-- Log4J End -->
<!-- Spring 編碼過濾器 start -->
<filter>
<filter-name>characterEncoding</filter-name>
<filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
<init-param>
<param-name>encoding</param-name>
<param-value>UTF-8</param-value>
</init-param>
<init-param>
<param-name>forceEncoding</param-name>
<param-value>true</param-value>
</init-param>
</filter>
<filter-mapping>
<filter-name>characterEncoding</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- Spring 編碼過濾器 End -->
<!-- Spring Application Context Listener Start -->
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath*:applicationContext.xml,classpath*:ActiveMQ.xml</param-value>
</context-param>
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!-- Spring Application Context Listener End -->
<!-- Spring MVC Config Start -->
<servlet>
<servlet-name>SpringMVC</servlet-name>
<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
<init-param>
<param-name>contextConfigLocation</param-name>
<param-value>classpath:spring-mvc.xml</param-value>
</init-param>
<load-on-startup>1</load-on-startup>
</servlet>
<servlet-mapping>
<servlet-name>SpringMVC</servlet-name>
<!-- Filter all resources -->
<url-pattern>/</url-pattern>
</servlet-mapping>
<!-- Spring MVC Config End -->
</web-app>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
運作效果
從上圖可以看出隊列模型和PUB/SUB模型的差別,Queue隻能由一個消費者接收,其他Queue中的成員無法接受到被已消費的資訊,而Topic則可以,隻要是訂閱了Topic的消費者,全部可以擷取到生産者釋出的資訊。
總結
Spring提供了對JMS的支援,ActiveMQ提供了很好的實作,而此時我們已經将兩者完美的結合在了一起。
下篇博文我們實作Spring和ActiveMQ消息的持久化。
源碼下載下傳