類似于rmi、hessian、burlap等遠端方法調用,它們都是同步的,所謂同步調用就是用戶端必須等待操作完成,如果遠端服務沒有傳回任何響應,用戶端會一直等待直到服務完成。

異步調用則不同,用戶端發送完消息無需等待服務處理完成便可立即傳回,就像發送完消息就立刻被處理成功一樣。
在異步處理的世界,我們可以把消息的發送比作一個郵局系統。比如我們要給某個人發送信件,我們隻需準備好信件,把它投入郵局的郵箱即可,我們不必關心郵件如何送出、能否到達,郵局系統會保證信件最終送達到我們希望的接收者手中。和郵局系統類似,當一個應用向另一個應用發送消息,兩個應用之間沒有直接的關聯,而是發送消息的應用把消息交給一個消息系統,由消息系統確定把消息傳遞給接收消息的應用。
在異步消息系統中有兩個重要的角色:消息broker和destination。當一個應用發送一條消息,它會直接把它發送給消息broker,消息broker扮演的就是郵局,它會確定消息被傳遞到特定的destination。當我們郵寄信件時,信件的位址尤為重要,消息系統中的位址就是destination。不過與信件中的位址不同,destination中定義的不是接收者是誰,而是消息被放在消息broker的什麼地方(具體指queue或者topic),destination其實更像郵局系統中的郵筒。
盡管存在各種各樣的消息系統,每個消息系統都有各自的消息路由方式,但總體上有兩種類型的destination:queue和topic,它們也各自關聯着一種特定的消息處理模型:點對點(point-to-point/queue)和釋出/訂閱(publish/subscribe/topic)
在點對點模型中,每個消息隻有一個發送者和一個接收者。如下圖所示:
在點對點模型中, 消息broker會把消息放入一個queue。當一個接收者請求下一個消息時,消息會被從queue中取出并傳遞給接收者。因為消息從queue中取出便會被移除,是以這保證了一個消息隻能有一個接收者。
盡管消息隊列中的每個消息隻有一個接收者,但這并不意味着隻能有一個接收者從隊列擷取消息,可以同時有多個接收者從隊列擷取消息,隻不過它們隻能處理各自接收到的消息。其實這就像在銀行排隊一樣,排隊的人可以看做一個個消息,而銀行工作視窗便是消息的接收者,每個視窗服務完一個客戶之後都會讓隊列中的“下一個”到視窗辦理業務。
還有,如果多個接收者監聽一個隊列,我們是很難确定到底哪個接收者處理哪個消息的。不過這也不一定不好,因為這樣就使得我們很友善的通過增加接收者來拓展應用處理能力了。
在釋出/訂閱模式中,消息是被發送到topic中的。就像queue一樣,很多接收者可以監聽同一個topic,但是與queue每個消息隻傳遞給一個接收者不同,訂閱了同一個topic的所有接收者都會收到消息的拷貝,如下圖所示:
從釋出/訂閱的名字中我們也可看出,釋出者釋出一條消息,所有訂閱者都能收到,這就是釋出訂閱模式最大的特性。對于釋出者來說,它隻知道将消息釋出到了一個特定的topic,它不關心誰監聽這個topic,這也就意味着它并不知道這些消息是被如何處理的。
在具體介紹異步消息系統帶來的好處之前,我們先看看同步系統的局限性:
同步會話意味着等待:當客戶的調用遠端服務的方法時,用戶端必須等待遠端方法結束之後才能繼續,如果用戶端與遠端服務交流頻繁或者遠端服務響應過慢,會影響用戶端的性能
用戶端和服務接口耦合:如果服務接口發生改變,所有客戶的都需要修改
用戶端和服務位置耦合:用戶端要想使用遠端服務就必須配置服務的位址,如果網絡拓撲發生變化,用戶端需要重新配置服務位址
用戶端和服務可用性耦合:如果服務不可用,那麼也會導緻用戶端不可用
下面我們再看一下異步消息系統是如何解決這些問題的。
無需等待
當一個消息被異步發送,用戶端不需要等待它處理完成。用戶端直接把消息扔給broker然後做其它事情,broker負責把消息送到合适的目的地。
因為用戶端不需要等待,是以用戶端的性能會有很大的提升。
面向消息和解耦合
不同于傳統基于方法調用的rpc會話,消息異步發送是以資料為中心的。這就意味着用戶端不需要和某個方法簽名綁定,任何queue或topic的訂閱者都可以處理用戶端發送的消息。用戶端不必再關心服務方任何相關的問題。
位置獨立
同步rpc服務的調用是通過網絡位址定位的,這就意味着用戶端無法擺脫網絡拓撲的變化。如果服務的ip或端口發生改變,用戶端也需要做相應的改變。
相反,異步消息系統中的用戶端并不關心服務所在的位置及其如何處理消息,它隻負責将消息發送到特定的queue或topic。是以,服務位于什麼地方都無所謂,隻要它們能夠從queue或topic中擷取消息即可。
在點對點模式中,可以很友善的利用位置獨立這個特性建立一個服務叢集。用戶端不需要關心服務的位置,叢集中各個服務僅需知道broker的位置,并從同一個queue擷取消息,如果服務壓力過大無法及時處理消息,我們隻需要在叢集中增加一個服務執行個體去監聽同一個queue即可。
在釋出/訂閱模式中,位置獨立同樣有很重要的作用。多個服務可以訂閱同一個topic,他們都能擷取到topic中的每個消息,但是對各個服務的處理可以不同。比如我們有一個服務集合訂閱了一個接收新員工消息的topic,是以這些服務都可以得到每個新員工消息,一個服務可以将新員工添加到薪資系統,另一個服務可以将新員工增加到hr系統,還有服務負責賦予新員工各種系統權限等等,每個訂閱topic的服務都能對各自的消息做出自己的處理。
可靠性保證
當一個用戶端和服務通過同步方式進行互動時,如果服務出現任何問題挂掉,都會影響用戶端正常工作。但是當消息是異步發送時,用戶端與服務之間被broker隔離,用戶端隻負責發送消息,即使當發送消息時服務挂掉,消息也會被broker存儲起來,等到服務可用時再接着進行處理。
java message service是一個java标準,它定義了一套與消息broker互動的通用api。在jms出現之前,每一種消息broker都有自己獨特的一套api,使得應用代碼無法在不同的broker之間适用。但是通過jms,所有與broker互動的代碼就可以适用一套通用的api,就像jdbc一樣。
當然spring對jms也提供了支援,即jmstemplate。通過jmstemplate,我們可以更加友善地向queue和topic發送和接收消息。後面我們會詳細介紹spring對jms的實作,但是在發送和接收消息之前,我們需要現有一個broker。
我們要想發送消息到activemq,就需要先建立到它的連接配接,<code>activemqconnectionfactory</code>就是jms中負責建立到activemq連接配接的工廠類。在spring中配置方式如下:
除此之外,spring為activemq提供了專門的命名空間,我們可以使用spring的activemq命名空間來建立連接配接工廠。首先要在配置檔案中聲明amq命名空間:
然後我們就可以利用<code><amq:connectionfactory></code>元素來聲明一個連接配接工廠:
需要注意,<code><amq:connectionfactory></code>元素是專門針對activemq的。如果我們用到的是其它broker,就需要用另外的标簽元素或注入另外的工廠bean。上面元素中的<code>brokerurl</code>指定了activemq在伺服器中的ip和端口,上面端口值就是activemq預設端口。
除了要有一個連接配接工廠之外,我們還需要知道消息發送到的destination。上面講過了,消息的destination隻有兩類queue或者topic,在spring中,我們需要配置queue或topic對應的bean。
配置一個activemq queue bean:
配置一個activemq topic bean:
上面例子中<code>c:_</code>屬性代表的是構造器參數,它指定了queue或topic的名稱。
像連接配接工廠一樣,spring提供了另外一種配置destination的方式,就是通過spring activemq命名空間進行配置。
使用<code><amq:queue></code>元素配置一個queue:
使用<code><amq:topic></code>元素配置一個topic:
上面元素中<code>physicalname</code>屬性代表消息通道的名稱,也就是queue和topic的名稱。
通過上面兩個元件的配置,我們就可以向activemq發送和接收消息了。發送和接收消息我們使用的是spring提供的jmstempate,它是spring對jms的抽象,下面就詳細介紹jmstemplate的使用。
雖然jms提供了一套與各種broker互動的通用api,但實際使用起來并不是很友善,我們先看一下使用普通jms api與broker互動的代碼。
上面代碼中我們可以看到,為了發送一條 “hello world”的消息卻用了20多行代碼,就像jdbc一樣,我們大部分代碼都是再做一些重複性的準備工作,比如擷取連接配接、建立session、異常處理等等。其實接收消息的代碼也是如此,在jdbc中,spring提供了一個jdbctemplate來簡化jdbc代碼開發,同樣,spring也提供了<code>jmstemplate</code>來簡化jms消息處理的開發。
jmstemplate其實是spring對jms更高一層的抽象,它封裝了大部分建立連接配接、擷取session及發送接收消息相關的代碼,使得我們可以把精力集中在消息的發送和接收上。另外,<code>jmstemplate</code>對異常也做了很好的封裝,其對應的基本的異常為<code>jmsexception</code>。
要使用jmstemplate,就要在spring配置檔案中配置它作為一個bean:
因為jmstemplate需要先和broker進行連接配接,是以它需要依賴一個connectionfactory。
發送消息
假如我們有一個業務需要用到異步消息發送,我們先定義這樣一個業務接口:
上面接口中隻有一個方法,就是發送消息。
我們寫這個接口的實作,在這個接口實作中,我們就是用<code>jmstemplate</code>實作異步消息發送:
我們可以看到,我們業務的實作中注入了一個<code>jmsoperations</code>對象,這個對象就是<code>jmstempate</code>的實作。<code>jmsoperations</code>的<code>send()</code>方法有兩個參數,第一個是消息的<code>destination</code>,第二個便是具體的<code>message</code>,在上面例子中message是通過一個匿名内部類<code>messagecreator</code>的<code>createmessage()</code>方法構造的。
通過上面例子可以發現,通過<code>jmstempate</code>,我們隻需要關心發送消息即可,所有的連接配接和session的維護都由<code>jmstempate</code>負責。
設定預設destination
大部分情況下,一個業務消息的destination是相同的,是以我們不必每次發送都填寫destination,我們可以在配置檔案中對其進行配置:
在上面配置中我們預設destination值為<code>biz1.queue</code>,因為它隻是聲明了一個名稱,并沒有說明是哪種類型的destination,是以,如果存在相同名稱的queue或topic,就會自動與之比對,如果不存在,則會預設建立一個相同名稱的queue。如果我們想指定destination的類型,我們可以通過配置讓其依賴之前配置的destination bean即可:
當我們配置了預設destination,我們就可以在發送消息時省略第一個參數了:
其實上面的<code>send()</code>方法可以變得更簡單,我們可以利用消息轉換器。
使用消息轉換器發送消息
除了<code>send()</code>方法之外,<code>jmstemplate</code>還提供了<code>convertandsend()</code>方法。與<code>send()</code>方法需要依賴一個<code>messagecreator</code>不同,<code>convertandsend()</code>方法隻需要傳入你想發送的消息即可。下面我們用<code>convertandsend()</code>實作接口中的<code>sendmessage()</code>方法:
<code>convertandsend()</code>方法會自動把你發的消息轉換成<code>message</code>,具體如何轉換的由<code>org.springframework.messaging.converter.messageconverter</code>的實作來決定。我們先看一下<code>messageconverter</code>接口:
我們可以看到這個接口中隻有兩個方法而且很容易實作。其實大部分情況下我們不需要自己去實作這個接口,spring已經為我們提供給了很多常用的實作:
預設情況下,當<code>jmstemplate</code>的<code>convertandsend()</code>方法使用的是<code>simplemessageconverter</code>。但是我們也可以通過配置把我們自定義的<code>messageconverter</code>作為屬性注入到<code>jmstemplate</code>中,比如我們有個一<code>messageconverter</code>的實作bean:
我們可以把上面這個bean注入到jmstemplate中:
消費消息
對于消費來說,<code>jmstemplate</code>使用起來比發送更簡單,隻需要調用<code>jmsoperations</code>的<code>receive()</code>方法即可:
當調用 <code>jmsoperations.receive()</code>方法時,它會嘗試從broker擷取消息,若此時沒有消息,<code>receive()</code>方法會一直等待直到有消息産生。前面例子中,當我們發送消息的時候消息被封裝成的是<code>objectmessage</code>,所我們在擷取的時候可以再将其轉換回<code>objectmessage</code>。
這裡有一點需要注意,當調用<code>message.getobject()</code>方法時會抛出<code>jmsexception</code>,這個異常是屬于jms api的。<code>jmsexception</code>是一個檢查異常,在jms操作中會抛出各種各樣的<code>jmsexception</code>,但是前面我們使用<code>jmstemplate</code>時并沒有捕獲任何<code>jmsexception</code>,是因為<code>jmstemplate</code>内部已經将需要檢查的<code>jmsexception</code>轉換成了非檢查的spring自己的<code>jmsexception</code>。在上面代碼中因為調用的是<code>message.getobject()</code>方法而不是<code>jmstemplate</code>的方法,是以我們需要捕獲<code>jmsexception</code>。但是按照spring的設計理念,我們應該盡量減少檢查異常,是以在catch塊裡面我們又通過jmsutils工具把<code>jmsexception</code>轉換成了非檢查的<code>jmsexception</code>。
同樣,就行消息的發送一樣,我們也可以使用jmstemplate的<code>receiveandconvert()</code>方法替換<code>receive()</code>方法:
我們看到,因為使用的是<code>jmstemplate</code>的方法,是以我們不需要再捕獲<code>jmsexception</code>檢查異常。
不管使用<code>mstemplate</code>的<code>receive()</code>還是<code>receiveandconvert()</code>方法消費消息,它們都是同步的。也就是說接收者在消息到達時需要等待。這樣看起來是不是有點奇怪?發送消息時是異步的,接收消息時卻是同步的。
這也就是為什麼會有下面的消息驅動pojo出現的原因,下面我們就看一下如何實作異步的接收消息。
我們上面已經知道,<code>jmstemplate</code>的<code>receive()</code>方法是一個同步方法,在消息到達之前這個方法會挂起一直等待直到消息出現,如果這樣的話,我們的應用可能會出現一直等待消息而不能做其它事情的情況。為何不讓應用先去處理其它業務,當消息出現時再告知應用處理呢?
在ejb中,<code>message driven bean(mdb)</code>就可以實作異步的處理消息。spring在這方面參考了ejb3對mdb的實作,不過在spring中我們把它稱作消息驅動pojo,也就是<code>message-driven pojo(mdp)</code>。
要想在消息出現時得到通知,那麼就需要一個監聽器監聽queue或者topic,之是以稱作消息驅動pojo,意識因為監聽器是消息驅動的,而是因為這個監聽器本身就是一個普通的pojo對象,不需要依賴任何接口:
有了這個pojo對象,下面隻需要做簡單的配置即可。
賦予上面pojo接收消息能力的關鍵在于将其配置成一個spring消息監聽器,spring的jms命名空間提供了所有相關配置。
首先,我們現需要把上面的pojo對象聲明成一個bean:
其次,把messagehandler變成一個消息驅動pojo,即把這個bean聲明成一個listener:
通過上面配置,消息監聽容器裡面就多了一個消息監聽器。消息監聽容器是一個特殊的bean,它能夠監聽jms的destination,監聽消息的到達。一旦消息到達,消息監聽容器會接受這個消息并将其發送給所有相關的listener。下面這幅圖展示了整個内部處理過程:
為了配置監聽容器和監聽者,我們用到了jms命名空間中的兩個元素。<code><jms:listener-container></code>是父元素,<code><jms:listener ></code>是子元素。<code><jms:listener-container></code>依賴一個<code>connectionfactory</code>,這樣它的各個<code><jms:listener ></code>就可以監聽消息了。<code><jms:listener ></code>用來定義具體接收消息的bean及方法。按照上面的配置,當消息到達queue時,<code>mymessagehandler</code>的<code>handlemessage</code>方法便會被調用。
需要注意到是,我們的<code>messagehandler</code>還可以實作一個<code>messagelistener</code>接口,這樣的話就不需要再單獨指定消息處理的方法了,<code>mymessagehandler</code>的<code>onmessage()</code>方法會自動被調用。messagelistener接口定義如下:
我們寫一個簡單的實作類:
然後直接配置listener即可(不用再配置method方法屬性):