天天看點

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

(接上文《架構設計:系統間通信(19)——MQ:消息協定(上)》)

上篇文章中我們重點讨論了“協定”的重要性,并為各位讀者介紹了Stomp協定和XMPP協定。這兩種協定是消息隊列中兩種不同使用場景下的典型代表。本文主要接續上文的篇幅,繼續讨論消息隊列中另一種典型協定:AMQP協定。

3-3、AMQP協定

AMQP協定的全稱是:Advanced Message Queuing Protocol(進階消息隊列協定)。目前AMQP協定的版本為 Version 1.0,這個協定标準在2014年通過了國際标準組織 (ISO) 和國際電工委員會 (IEC) 的投票,成為了新的 ISO 和 IEC 國際化标準。目前支援AMQP的軟體廠商包括:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

3-3-1、協定概覽

在網絡上講解AQMP協定的文章已經有很多了,您可以在百度、Google、必應上搜尋關鍵字‘AMQP’,就會出現很多相關文章。雖然文章數量比較多,但是卻鮮有品質過硬的文章(當然除了AMQP官網 http://www.amqp.org/ 的協定說明文檔)。本小節的内容我試圖在能力所及的範圍内,為各位讀者将AMQP協定的核心要點講清楚。為了達到這個目的,首先将AMQP協定的原理用下圖進行一個全面呈現,然後在詳細講解圖中的每一個要點:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

從上圖我們可以看到AMQP協定的各個組成部分:

  • AMQP協定中的元素包括:Message(消息體)、Producer(消息生産者)、Consumer(消息消費者)、Virtual Host(虛拟節點)、Exchange(交換機)、Queue(隊列)等
  • 由Producer(消息生産者)和Consumer(消息消費者)構成了AMQP的用戶端,他們是發送消息和接收消息的主體。AMQP服務端稱為Broker,一個Broker中一定包含完整的Virtual Host(虛拟主機)、 Exchange(交換機)、Queue(隊列)定義。
  • 一個Broker可以建立多個Virtual Host(虛拟主機),我們将讨論的Exchange和Queue都是虛拟機中的工作元素(還有User元素)。注意,如果AMQP是由多個Broker構成的叢集提供服務,那麼一個Virtual Host也可以由多個Broker共同構成。
  • Connection是由Producer(消息生産者)和Consumer(消息消費者)建立的連接配接,連接配接到Broker實體節點上。但是有了Connection後用戶端還不能和伺服器通信,在Connection之上用戶端會建立Channel,連接配接到Virtual Host或者Queue上,這樣用戶端才能向Exchange發送消息或者從Queue接受消息。一個Connection上允許存在多個Channel,隻有Channel中能夠發送/接受消息。
  • Exchange元素是AMQP協定中的交換機,Exchange可以綁定多個Queue也可以同時綁定其他Exchange。消息通過Exchange時,會按照Exchange中設定的Routing(路由)規則,将消息發送到符合的Queue或者Exchange中。

那麼AMQP消息在這個結構中是如何通過Producer發出,又經過Broker最後到達Consumer的呢?請看下圖:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹
  1. 在Producer(消息生産者)用戶端建立了Channel後,就建立了到Broker上Virtual Host的連接配接。接下來Producer就可以向這個Virtual Host中的Exchange發送消息了。
  2. Exchange(交換機)能夠處理消息的前提是:它至少已經和某個Queue或者另外的Exchange形成了綁定關系,并設定好了到這些Queue和Excahnge的Routing(路由規則)。Excahnge中的Routing有三種模式,我們随後會講到。在Exchange收到消息後,會根據設定的Routing(路由規則),将消息發送到符合要求的Queue或者Exchange中(路由規則還會和Message中的Routing Key屬性配合使用)。
  3. Queue收到消息後,可能會進行如下的處理:如果目前沒有Consumer的Channel連接配接到這個Queue,那麼Queue将會把這條消息進行存儲直到有Channel被建立(AMQP協定的不同實作産品中,存儲方式又不盡相同);如果已經有Channel連接配接到這個Queue,那麼消息将會按順序被發送給這個Channel。
  4. Consumer收到消息後,就可以進行消息的處理了。但是整個消息傳遞的過程還沒有完成:視設定情況,Consumer在完成某一條消息的處理後,将需要手動的發送一條ACK消息給對應的Queue(當然您可以設定為自動發送,或者無需發送)。Queue在收到這條ACK資訊後,才會認為這條消息處理成功,并将這條消息從Queue中移除;如果在對應的Channel斷開後,Queue都沒有這條消息的ACK資訊,這條消息将會重新被發送給另外的Channel。當然,您還可以發送NACK資訊,這樣這條消息将會立即歸隊,并發送給另外的Channel。

3-3-2、Message(消息體)

通過上一小節的描述,我們可以看到AMQP協定中消息的處理規則和Stomp協定中消息的處理規則有類似之處,比如對ACK、NACK的使用。但明顯不同的地方還是很多,例如AMQP中Exchange元素提供的Routing路由規則,這顯然比Stomp協定中直接發送給Queue要靈活得多。

為了支援AMQP協定中的這些規則,AMQP協定中的消息也必須有特定的格式,實際上AMQP協定要比Stomp協定複雜得多。下面我們就根據ISO/IEC釋出的AMQP Version 1.0标準文檔,來讨論一下AMQP協定中的消息格式。

首先要說明的是目前國内多個技術站點,詳細介紹AMQP消息格式的文章本來就不多(不包括那些聊聊幾筆的轉發),而且基本上都沒有詳細講解格式本身,隻是粗略說明了AMQP消息采用二進制格式(任何應用層協定在網絡上進行傳輸,都是使用二進制流進行的,是以這個說法當然沒錯)。

有的文章還向讀者傳遞了錯誤的資訊。例如說AMQP消息格式包括兩部分:消息頭和消息正文。 這是完全錯誤的,雖然AMQP消息格式确實包括Header和Body部分,但是絕對不止這兩個部分。(如果真是這樣,ISO/IEC組織就不需要使用125頁的文檔篇幅來進行說明了)

首先我們需要說明的是,作為一種網絡通訊協定,AMQP工作在七層/五層網絡模型的應用層,是一個典型的應用層協定;另外,由于AMQP協定存在多種元素定義,且這些元素定義工作在不同的領域。例如Channel的定義是為了基于網絡連接配接記錄會話狀态;Queue等元素幫助AMQP完成路由規則,這些元素在Message消息記錄中都需要有所展現。

是以AMQP協定首先要記錄網絡狀态和會話狀态,格式如下(AMQP幀的定義在《OASIS Advanced Message Queueing Protocol

(AMQP) Version 1.0》文檔的第38頁):

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

其中非PAYLOAD部分,在網絡協定的應用層說明Channel的工作狀态(當然還有說明整個AMQP消息的長度區域:SIZE),我們真正需要的内容存在PAYLOAD區域。PAYLOAD區域(譯文稱為‘傳遞區’)的格式如下(可以在《OASIS Advanced Message Queueing Protocol

(AMQP) Version 1.0》文檔的第3部分:messaging第82頁找到詳細說明):

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

在PAYLAOD區域一共包含7個資料區域:header、delivery-annotations、message-annotations、properties、application-properties、application-data、footer。這些元素的作用如下:

  • header:header部分記錄了AMQP消息的在‘支援AMQP的中間件’中的互動狀态。例如該條消息在節點間被互動的總次數、優先級、TTL(Time To Live)值等資訊。
  • delivery-annotations:在header部分隻能傳遞規範的、标準的、經過ISO/IEC組織定義的屬性。那麼如果需要在header部分傳遞一些非标準資訊怎麼辦呢?這就是delivery-annotations資料區域存在的意義:用來記錄那些’非标’的header資訊。
  • message-annotations:這個資料區域,用于存儲一些自定義的輔助屬性。和delivery-annotations區域的非标準資訊不同,這裡的自定義屬性主要用于消息的轉換。例如AMQP-JMS資訊轉換過程中将依據這個資料區域的“x-opt-jms-type”、“x-opt-to-type”、“x-opt-reply-type”和“name”屬性進行JMS規範中對應的“JMSType”、“Type of the JMSDestination”、“Type of the JMSReplyTo”和“JMS_AMQP_MA_name”屬相的轉換。
  • properties:從整個AMQP消息的properties屬性開始,到AMQP消息的application-data部分結束,才是AMQP消息的正文内容(譯文稱為‘裸消息’)。Properties屬性記錄了AMQP消息正文的那些‘不可變’屬性。在properties部分隻能傳遞規範的、标準的、經過ISO/IEC組織定義的屬性。例如:消息id、分組id、發送者id、内容編碼等。以下是AMQP協定文檔中對Properties部分屬性的描述(隻能包含這些資訊):
<type name="properties" class="composite" source="list" provides="section">
    <descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
    <field name="message-id" type="*" requires="message-id"/>
    <field name="user-id" type="binary"/>
    <field name="to" type="*" requires="address"/>
    <field name="subject" type="string"/>
    <field name="reply-to" type="*" requires="address"/>
    <field name="correlation-id" type="*" requires="message-id"/>
    <field name="content-type" type="symbol"/>
    <field name="content-encoding" type="symbol"/>
    <field name="absolute-expiry-time" type="timestamp"/>
    <field name="creation-time" type="timestamp"/>
    <field name="group-id" type="string"/>
    <field name="group-sequence" type="sequence-no"/>
    <field name="reply-to-group-id" type="string"/>
</type>
           
  • application-properties:‘應用資料’屬性,在這部分資料中主要記錄和應用有關的資料,AMQP的實作産品(例如RabbitMQ)需要用這部分資料決定其處理邏輯。例如:送入哪一個Exchange、消息的Routing值是什麼、是否進行持久化等。
  • application-data:使用二進制格式描述的AMQP消息的使用者部分内容。既是我們發送出去的真實内容。
  • footer:一般在這個資料區域存儲輔助内容,例如消息的哈希值,HMAC,簽名或者加密細節。

以上才是一個AMQP消息的完整結構。當然由于篇幅限制,在某一個資料區域的‘标準’屬性就沒有再細講了,例如Properties資料區域定義的creation-time屬性、Header資料區域定義的durable屬性。有興趣的朋友可以參考《OASIS Advanced Message Queueing Protocol

(AMQP) Version 1.0》,這個文檔我已經上傳到我的下載下傳清單中,供大家免費下載下傳^_^(http://download.csdn.net/detail/yinwenjie/9460653)。

3-3-3、Exchange(交換機)路由規則

Exchange交換機在AMQP協定中主要負責按照一定的規則,将收到的消息轉發到已經和它事先綁定好的Queue或者另外的Exchange中。Excahnge交換機的這個處理過程稱之為Routing(路由)。目前流行的AMQP路由實作一共有三種:分别是Direct Exchange、Fanout Exchange和Topic Exchange。需要特别注意的是:Exhange需要具備怎樣的‘路由’規則,并沒有在AMQP标準協定進行強行規定,目前流行的AMQP轉發規則都是AMQP實作産品自行開發的(這也是為什麼AMQP消息中和路由、過濾規則相關的屬性是存放在application-properties區域的原因)。

A、Direct路由

direct模式從字面上的了解應該是‘引導’、‘直接’的含義。direct模式下Exchange将使用AMQP消息中所攜帶的Routing-Key和Queue中的Routing Key進行比較。如果兩者完全比對,就會将這條消息發送到這個Queue中。如下圖所示:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

B、Fanout路由

Fanout路由模式不需要Routing Key。當設定為Fanout模式的Exchange收到AMQP消息後,将會将這個AMQP消息複制多份,分别發送到和自己綁定的各個Queue中。如下圖所示:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

C、Topic路由

Topic模式是Routing Key的比對模式。Exchange将支援使用‘#’和‘ * ’通配符進行Routing Key的比對查找(‘#’表示0個或若幹個關鍵詞,‘ * ’表示一個關鍵詞,注意是關鍵詞不是字母)。如下圖所示:

架構設計:系統間通信(20)——MQ:消息協定(下)4、不得不提的JMS規範5、後文介紹

為了友善各位讀者的了解,這裡我們再舉幾個通配符比對的示例:

  • “param.#”,可以比對“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能比對諸如“param1.test”、“param2.test”、“param3.test”。以為param這個關鍵詞和param1這個關鍵詞不相同。
  • “param.*.* ”,可以比對“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能比對諸如“param”、“param.test”、“parm.child”等等Routing Key。
  • “param.*.value”,可以比對“param.value.value”、“param.test.value”等Routing Key;但是不能比對諸如“param.value”、“param.value.child”等Routing Key。

注意,以上介紹的Direct 路由模式和Topic 路由模式中,如果Exchange交換機沒有找到任何比對Routing Key的Queue,那麼這條AMQP消息會被丢棄。(隻有Queue有儲存消息的功能,但是Exchange并不負責儲存消息)

4、不得不提的JMS規範

JMS不是消息隊列,更不是某種消息隊列協定。**JMS是Java消息服務接口,是一套規範的JAVA API 接口。這套規範接口由SUN提出,并在2002年釋出JMS規範的Version 1.1版本。**JMS和消息中間件廠商無關,既然是一套接口規範,就代表這它需要各個廠商進行實作。好消息是,大部分消息中間件産品都支援JMS 接口規範。也就是說,您可以使用JMS API來連接配接Stomp協定的産品(例如ActiveMQ)。就像您可以使用JDBC API來連接配接ORACLE或者MYSQL一樣。

部分網絡上的資料都介紹JMS是一個消息隊列,這個說法是錯誤的,會誤導讀者。難道你能說JDBC是資料庫?

當然,這些具體實作JMS規範的JAVA API都是由具體的中間件廠商提供的。下面一段代碼示範了如何使用JMS建立與ActiveMQ的連接配接:

package com.yinwenjie.test.testActivemq.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

/**
 * 測試使用JMS API連接配接ActiveMQ
 * @author yinwenjie
 */
public class JMSProducer {
    /**
     * 由于是測試代碼,這裡忽略了異常處理。
     * 正是代碼可不能這樣做
     * @param args
     * @throws RuntimeException
     */
    public static void main (String[] args) throws Exception {
        // 定義JMS-ActiveMQ連接配接資訊
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616", "username", "password");
        Session session = null;
        Destination sendQueue;
        Connection connection = null;

        //進行連接配接
        connection = connectionFactory.createConnection();
        connection.start();

        //建立會話
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        //建立queue(當然如果有了就不會重複建立)
        sendQueue = session.createQueue("");
        //建立消息發送者對象
        MessageProducer sender = session.createProducer(sendQueue);
        TextMessage outMessage = session.createTextMessage();
        outMessage.setText("這是發送的消息内容");

        //發送(JMS是支援事務的)
        sender.send(outMessage);
        session.commit();

        //關閉
        sender.close();
        connection.close();
        connectionFactory.close();
    }
}
           

這裡,再給各位讀者一個官方文檔。這個官方文檔用于描述ActiveMQ消息中間件中實作的AMQP協定資訊轉換為JMS服務接口能夠識别的資料資訊(請仔細了解這句話黑體字部分的描述)。http://activemq.apache.org/amqp.html

5、後文介紹

通過兩篇文章的篇幅,我們介紹了典型的消息隊列協定。當然還有很多具體的消息隊列協定沒有講到,但是通過介紹XMPP、AMQP、Stomp協定可以起到一個管中窺豹可見一斑的效果。另外我們還說明了JMS規範的具體含義,以便幫助讀者糾正一些不正确的觀點。

下一篇文章開始,我們将講解兩個典型的消息隊列中間件:ActiveMQ和RabbitMQ。最後我們還會列舉一個實際場景,然後通過消息隊列技術一起搭建一個高性能的業務處理方案。

繼續閱讀