天天看點

消息中間件QPID

簡介: Qpid 是 Apache 開發的一款面向對象的消息中間件,它是一個 AMQP 的實作,可以和其他符合 AMQP 協定的系統進行通信。Qpid 提供了 C++/Python/Java/C# 等主流程式設計語言的用戶端庫,安裝使用非常友善。相對于其他的 AMQP 實作,Qpid 社群十分活躍,有望成為标準 AMQP 中間件産品。除了符合 AMQP 基本要求之外,Qpid 提供了很多額外的 HA 特性,非常适于叢集環境下的消息通信。

引子,Qpid 使用場景

通信是一個基本的需求,正如人與人之間需要交流一樣,比較大型的軟體系統也往往需要内部或者外部通信。

在系統間通信最基礎的通信方式是 socket,但 socket 比較底層。使用起來非常不易。如果需要一些進階特性,需要很多的程式設計負擔。

與 socket 的原始對應,企業級的應用軟體往往有着各種各樣從簡單到複雜的通信需求,表現為不同的通信模型。常見的有:

•點對點:A 發消息給 B。

•廣播:A 發給所有其他人的消息

•多點傳播:A 發給多個但不是所有其他人的消息。

•Requester/response:類似通路網頁的通信方式,用戶端發請求并等待,服務端回複該請求

•Pub-sub:類似雜志發行,出版雜志的人并不知道誰在看這本雜志,訂閱的人并不關心誰在發表這本雜志。出版的人隻管将資訊釋出出去,訂閱的人也隻在需要的時候收到該資訊。

•Store-and-forward:存儲轉發模型類似信件投遞,寫信的人将消息寫給某人,但在将信件發出的時候,收信的人并不一定在家等待,也并不知道有消息給他。但這個消息不會丢失,會放在收信者的信箱中。這種模型允許資訊的異步交換。

•其他通信模型。。。

除了各類不同的通信模型之外,系統間的通信還有其他一些需要考慮的問題。比如企業級應用往往有巨量的資料需要交換,對可靠性的要求也比較高。比如一個分布式的财務處理軟體,每時每刻都有成千上萬的使用者在使用,需要産生難以想象的海量消息,每個消息可能都關乎某人的銀行賬戶等關鍵資訊,如果丢失将帶來巨大損失。編寫這樣一個通信中間件不是一件容易的事情,即使編寫出來,假如需要和其他的軟體系統互動資訊,又需要大量的格式轉換,接口遷移等工作。

為了解決以上這些問題,人們開發出了很多的軟體産品和協定。從早期的 RPC,到複雜的面向消息的中間件 (MOM),再到 JMS,人們取得了很多的進步,但是這些技術還是存在各自的問題。

RPC,Corba 等技術是同步的,即調用者必須等待對方的回複,這意味着調用者必須了解接收者,是一種緊耦合的模型。緊耦合意味着不靈活,而在軟體行業唯一不變的就是變化,當需求和環境發生變化時,緊耦合的應用修改代價非常高。

為此衆多的消息中間件産品應運而生,打破了消息生産者和消費者之間的緊耦合關系。但中間件産品是由各個廠商自行定義和實作的,在整合企業級應用的時候,人們發現各種應用往往采用了不同的技術和中間件産品,要想讓這些産品互通消息,又是一件非常困難的事情。

JMS 是标準化的一種努力,但其缺點在于 JMS 是 J2EE 的标準,假如不是采用 Java 技術實作的産品,想使用 JMS 還是比較麻煩的。

是以即便到了今天人們還是希望有一款功能強大,平台 / 語言無關,标準化的面向消息的中間件産品。

假如這正是您時時刻刻所想的問題,那麼 Qpid 便是您值得了解的一款開源軟體。它實作了可靠複雜的通信中間件,支援多種通信模型,效率高,平台語言無關,而且實作了業界的通信标準 AMQP。

AMQP 和 Qpid

AMQP 是 Advanced Message Queuing Protocol,即進階消息隊列協定。和前面羅列的技術不同,AMQP 是一個标準化的消息中間件協定。她的理想是讓不同語言,不同系統的應用互相通信,并提供一個簡單統一的模型和程式設計接口。這樣,人們就可以采用各種語言和平台來實作自己的應用,當需要和其他系統通信時,隻要承認 AMQP 協定即可。

舉個不太自然的例子吧。。。

世界各地的人們由于地理和曆史的原因,使用着各種不同的語言,互相交流十分不易。AMQP 類似一架自動翻譯機,當我用中文對它說了什麼之後,假如一個英語世界的人想聽的話,可以聽到 英文版的 精确的一字不差的翻譯。

此外這個翻譯機還提供其他很多好處,比如中國和美國有 12 小時的時差,假如我現在希望和某個美國人通話,他必須半夜爬起來,或者我必須等到他起床,但通過這台機器,我說完就行了,那個美國人起床後就會聽到的。我很放心,這句話絕不會丢掉,也絕不會走樣;

我其實可以不關心有多少人來聽,假如有更多的人都想聽,那麼他們也可以随時聽到。

假如我隻想讓部分人聽到,還可以加密認證;

假如有些人不想聽,有些人想聽,那麼這台翻譯機也能知道誰想聽,而不會将我的聲音發給不想聽到的人。

這種交流方式和體驗,作為一個人類我還不曾享受過,但是 AMQP 已經為 衆多的計算機軟體提供了這種服務。

AMQP 來自 JPMorgon,最初隻是這個财大氣粗的投行内部使用的消息中間件。發起人 John O'Hara 很有氣魄,他說“從 1996 年開始到 2003 我一直在等這樣一個标準,但始終沒有等到,我已經等不下去了”,并且“投行對這類标準的需求最迫切,而銀行又從來不缺乏技術專家” ,是以他自己開發了一個。我想一個人如果想成就什麼事,就需要這樣的英雄氣概吧。

因為他的努力,AMQP 從金融界迅速推廣到整個計算機行業,參與者包括了很多 IT 巨頭。雖然今天 AMQP 依舊是一個草案,但值得我們拭目以待。

AMQP 的基本構架如下:

圖 1. AMQP 系統構架

消息中間件QPID

在 AMQP 模型中,消息的 producer 将 Message 發送給 Exchange,Exchange 負責交換 / 路由,将消息正确地轉發給相應的 Queue。消息的 Consumer 從 Queue 中讀取消息。

這個過程是異步的,Producer 和 Consumer 沒有直接聯系甚至可以不知道彼此的存在。

Exchange 如何進行路由的呢?

這便依靠 Routing Key,每個消息都有一個 routing Key,而每個 Queue 都可以通過一個 Binding 将自己所感興趣的 Routing Key 告訴 Exchange,這樣 Exchange 便可以将消息正确地轉發給相應的 Queue。下表列出了這幾個關鍵概念的定義。

表 1. AMQP 的幾個概念

Producer:

A program that writes messages to an Exchange. To do this, the program creates a message, fills the message with content, gives the message a Routing Key, and sends the message to an Exchange.

Routing Key| A string that the Exchangecan use to determine to which Queuesthe message should be delivered.|

Exchange:

  Accepts messages from Producersand routes them to Queuesif the message meets the criteria expressed in a binding.

Binding :

  Defines the relationship between an Exchangeand a Queue, specifying which messages should be routed to a given Queue

Queue:

   Holds messages and delivers them to the Consumersthat subscribe to the Queue.

Consumer A program that reads messages from a Queue. A Consumercan create, subscribe to, share, use, or destroyQueueand their Bindings(as long as it has have permission to do so).

為了支援各種常見的通信模型,AMQP 定義了不同的 Exchange 類型,如下表所示 :

表 2. AMQP 定義的 Exchange 類型

Exchange 類型 路由行為

Fan-Out Messages are routed to every Queue bound to the Exchange, ignoring the Routing Key

Direct A message is routed only if a Queue's Binding Keyis the same as the message's Routing Key

Topic Similar to a Direct Exchange, but it supports multipart keys that contain multiple words separated by the "." delimiter; for instance, a message Producer can create messages with Routing Keys like usa.news, usa.weather, europe.news, and europe.weather.

AMQP 目前還是一個草案,最新版本是 0.10。

QPID 是 Apache Foundation 的一個開源項目,是一個 AMQP 實作。它提供了 C++ 和 Java 兩個版本的 broker,并支援多種語言的用戶端,它還包括一個配置工具集。

除了完全實作了 AMQP 的基本功能之外,Qpid 還提供了一些額外的特性:

•采用 Corosync 來保證了叢集環境下的 Fault-tolerant 特性

•支援 XML 類型的 Exchange,當消息格式為 XML 時,可以利用 Xquery 進行過濾

•支援 plugin,使用者可以友善地增加新的功能,比如新的 exchange 類型

•提供了安全認證特性,任何 producer/consumer 需要和 broker 通信時,都需要提供身份認證,以便防止惡意的不相幹的程式進入消息體系。QPID 的安全認證使用 SSL 協定。

使用 Qpid 程式設計

目前 Qpid 的最新版本是 0.10。從 0.6 版本開始,Qpid 的程式設計接口有了很大變化,之前的程式設計接口雖然繼續支援但已經過時。是以本文将掠過 0.5 的 API,直接介紹 Qpid Messaging API。

首先需要搭建一個實驗環境。

安裝

第一次看到 Qpid 的下載下傳首頁時,我有點兒不知所措。過去,當我需要試用一個開源軟體時,在它的下載下傳頁面上通常會看到一個 tar 包,最多不過是根據目标作業系統的不同,分成幾個 tar 包,隻管下載下傳相應的便可。但是 Qpid 的下載下傳頁面卻有些讓我困惑,竟有衆多的按程式設計語言分類的 tar 包,一時之間也不知道下載下傳哪個好。。。

如今似乎有些明白了,Qpid 是一個消息中間件,它大體分為兩部分:broker 和 client 庫。

先來看 Client 庫,不同的程式設計語言,比如 Python,Ruby 等都需要其單獨的 client 包。這個很容易了解。

但 broker,Qpid 竟也有兩種實作 :C++ 和 Java。

Client 的選擇比較容易,您熟悉哪種語言便選擇哪個。但選擇 Java 的 Broker 還是 C++ 的 broker 卻會讓新手有些猶豫,相比之下選擇 KFC 還是麥當勞竟然是一件容易的事了。

java 版 broker 和 C++ 版 broker 各有優缺點,選擇也是因人而異。他們之間大部分特性相同,但也有不同,比如 Java 版的支援更多的 AMQP 版本;而 C++ 版本則支援 RDMA。QPID 社群将逐漸消除這些差異,但正如可口可樂和百事可樂共同存在這個世界上一樣,這兩個 broker 也終究會各有個的擁趸。Qpid 社群的資深開發者 Gordon Sim 回答某初學者的文章對此應該很有幫助:http://fossplanet.com/f13/re-why-use-c-broker-versus-java-broker-71322/

本人對 java 基本上沒有什麼了解,對 C++ 和 python 則非常偏愛,是以打算使用 C++ 版本的 broker,client 端則采用 C++ 和 Python。

下載下傳 Qpid broker 的安裝包,解壓,編譯,安裝:

tar xzvf   ./configure   make   make install  

經過以上幾步折騰,我們已經有了 C++ 的 broker 和 C++ 的 client 庫了。如果您想用 Python 編寫應用,那還需要下載下傳 pyhonn 用戶端的 tar 包。

可喜的是 Python 無需編譯,是以所謂安裝隻是設定一些環境變量而已。

這裡要提一下:如果您下載下傳的是那個 full package,恐怕又需要費一點兒周折。該 package 包含了所有的東西 ( 各種語言的 broker 和 client),其中雖然也有 C++ 的 broker,但竟然和單獨下載下傳的 C++ 包有所不同。到 cpp 目錄下,您看不到 configure 可執行檔案。需要首先運作 bootstrap,初始化 autotools。

Bootstrap 之後,autoconfig 所需的條件便準備好了,之後便是正常的幾條安裝指令,總的來說如下:

./bootstrap   ./configure   make   make install  

希望您的安裝一切順利。

啟動 broker

最簡單的啟動方式為

qpidd --port=60302 --no-data-dir --auth=no  

以上啟動方式指定了三個主要的選項

--port 參數指定了 qpidd 監聽的端口号,預設為 5672。

--no-data-dir 說明不需要指定資料持久化的目錄;當然您也可以指定一個目錄來存放持久化消息。

--auth=no 說明 qpidd 對所有的連結請求都不進行安全驗證。

其他的啟動參數在此不做詳細說明,讀者可以自行閱讀 help 資訊。

管理 qpid

預設情況下,Broker 啟動之後,會自動建立一些 Exchange(交換器),對應 AMQP 标準定義的那幾個标準的 exchange 類型。分别叫做

•amp.topic

•amp.direct

•amp.fanout

應用程式可以建立 queue 并綁定到這些預設的 exchange 上進行資訊收發。不過在真實的應用環境下,人們往往需要更多的 exchange,queue 以及 binding 來滿足各種各樣的需求。或者在一些複雜的網絡中還需要配置 broker 的聯邦,即一個互相路由的 broker 網絡。

凡此種種都需要對 broker 進行各種配置,比如添加新的 exchange,queue,添加 broker 路由等等,這些便需要使用 Qpid 提供的各種管理工具。除配置管理之外,Qpid 的管理工具還提供了監控功能。常用的工具有三個:

•Qpid-config

•Qpid-route

•Qpid-tool

Qpid-config 用來完成諸如添加 / 删除 Queue,添加 / 删除 Exchange 等關于 broker 内部的配置工作;Qpid-route 用來配置 broker Federation;Qpid-tool 用來對 Qpid Broker 進行實時監控。羅列他們的 help 資訊對讀者沒有意義,我将在後面的例子中示範他們的一些用法。

程式代碼的基本架構

在一個采用了消息中間件的通信體系中有三個基本的角色,一個是發送消息的程序,一個是接受消息的程序,他們彼此之間通過 broker 連接配接起來,傳遞消息。

圖 2. 基本 Qpid 通信系統的幾個元件

消息中間件QPID

Broker 無需編寫,如前所述,Qpid 實作了兩種 Broker,您隻需要根據需要啟動其中之一既可。Sender 和 Receiver 則是需要使用者編寫的應用程式。這兩類程式都有一些基本的架構,在此簡要介紹一下。

首先他們都是 client,需要和 broker 進行連接配接。連結成功後便生成一個會話 Session。基本代碼如下:

清單 1. 基本的 Qpid 程式架構

         Connection connection(broker, connectionOptions);      

try {          

connection.open();         

 Session session = connection.createSession();   。。。                  

connection.close();          return 0;     

 } catch(const std::exception& error)

 {          std::cerr << error.what() << std::endl;         

 connection.close();         

 return 1;        }  

在 0.5 版本之前,QPID 程式設計接口和 AMQP 的基本概念一一對應,比如需要建立 queue,exchange,用 routing key 進行綁定,等等。必須對 AMQP 的模型完全了解才能自如地動手寫程式。

新的 Messaging API 将複雜的 AMQP 的細節全部都隐藏了起來,極大地簡化了程式設計。

應用程式進而可以将注意力專注于如何處理他們接收到或者将要發出的消息本身,将 AMPQ 模型處理的細節交給 Qpid 庫。如圖 2 所示,在一個消息通信系統中隻有 3 個基本角色,除了 broker 之外,就隻有一個 Sender 一個 Receiver。應用程式看不到 Exchange 或者 Queue 這些細節。與此相應,在 Qpid 的 Messaging API 程式設計接口中,隻有兩個基本對象:Sender 和 Receiver。Sender 類,或者叫 Producer 生産者。即消息的發送方。Receiver 類自然就是資訊的接收者,也叫做 Consumer。

這種抽象帶來更好的功能可擴充性:各種各樣的通信模型都經由修改 Sender 和 Receiver 的位址來實作,當需要修改通信模型時,也隻需要修改 Address 和 broker 的配置,而無需修改應用的代碼。

下面将詳細介紹 Address 類。

Address 位址

寫信的時候,人們需要位址。類似地在 Qpid 中,表示消息的目的地,或者源。

Qpid Address 表示一個節點,有兩種節點:一種是 queue,另外一種是 topic。Queue 節點能夠緩存消息,直到被讀取走為止;而 topic 節點則即時進行轉發,比如假如有 4 個 consumer 對某消息感興趣,當消息到達節點時,有 3 個 consumer 正在運作,那麼 topic 節點會将消息轉發給這 3 個 consumer,然後就将該消息丢棄。剩下的那個 consumer 再運作時,則收不到這個消息。

Qpid 的位址 Address 是一個帶格式的字元串,其文法如下:

address_string ::= <address> [ / <subject> ] [ ; <options> ]   options ::= { <key> : <value>, ... }  

其中 address,subject 和 key 都是字元串。

Subject 類似 email 的主題。每個消息都可以有一個主題,接收者可以通過主題對消息進行過濾。

Option 的具體含義有點兒複雜,可以參考 Qpid 的程式設計手冊擷取完整的描述。

了解了以上這些概念,就可以開始具體的程式設計了。和學習其他技術一樣,我們從研究例子程式開始。Qpid 源代碼包的 example 目錄下有大量的例子程式,Messaging 目錄下面是新的 Message API。我們主要研究 Message API 提供的 Spout 和 Drain 這兩個例子程式。

Spout 和 Drain 的代碼

将 Spout 的主要代碼精簡一下如下:

清單 2. Spout 代碼

      int main(int argc, char** argv)   {      

Connection connection(options.url, options.connectionOptions);     

 connection.open();      

Session session = connection.createSession();      

Sender sender = session.createSender(options.address);      

sender.send(message);      

session.sync();      

connection.close();     

 return 0;   

}  

可以看到 spout 首先用指令行參數 Address 初始化一個 Sender 對象,然後用 Sender 的 send 方法發送消息。

對 Drain 做一些類似的事情:

清單 3. Drain 代碼

      int main(int argc, char** argv)   {         

 Connection connection(options.url, options.connectionOptions);          

connection.open();          

Session session = connection.createSession();          

Receiver receiver = session.createReceiver(options.address);         

 receiver.fetch(message, timeout))          

session.acknowledge();         

 receiver.close();          

session.close();                    

 connection.close();          

return 0;   }  

Drain 接收消息,用指令行參數中的 Address 初始化一個 Receiver 對象,然後調用 Receiver 的 fetch() 方法接收消息。收到消息後需要調用 session 的 acknowledge() 方法确認。

點對點通信小例子

PTP 通信類似寫信。

其一,這種通信是異步的,人們把信發出去之後并不清楚何時能送到收信人的手中。在 Qpid 中,Sender 将消息發給 Broker,并不要求 Receiver 在消息發送的時候也有一個和 Broker 的連結并準備接受該消息。Sender 隻管将消息發給 Broker,就可以放手去做其他的事情了;

其二,信是唯一的,您寫給朋友的信一定不希望其他人也收到吧。在 Qpid 的 PTP 通信中,一個 Receiver 收到消息後,該消息就被消除,其他 Receiver 不能再收到。

下面用例子來說明這種通信模型。首先要建立一個 Queue 節點。如之前在 Address 一節所講,Qpid 目前有兩種 Address,一種叫做 Queue,一種叫做 Topic,我們這裡就要用 Queue 這種節點。Queue 節點滿足前面所說的兩個重要的 PTP 通信的特征,存儲轉發和隻接收一次。

建立一個 queue:

qpid-config add queue hello-world  

現在我們建立了一個叫做 hello-world 的 queue。

用 spout 發送消息給位址 hello-world:

./spout hello-world  

這就相當于将信發給了 hello-world。您已經看到,此時接收者 drain 還沒有啟動,但 Queue 的存儲轉發特性保證 drain 還是可以收到這條消息:

./drain hello-world   Message(properties={spout-id:fbb93f3 … :0 … )  

當我們打開另外一個 shell 視窗執行 drain,會發現不會再收到這條消息了。

Browse 模式 vs Consume 模式

一個有趣的的例子是如果我們修改一下 Address 的 Option,上面的通信模型就變成另外一種樣子。之前我們看到,第二次執行 ./drain hello-world 将得不到任何資訊,因為資訊已經被第一次執行 ./drain 消費掉了。或者說這個 message 已經從 queue 裡面移除了。這在 Qpid 中被稱為消費模式 (Consume)。

有時候人們可能需要另外一種模式,叫做 Browse,即浏覽。正如我們浏覽網頁上的新聞一樣,一條新聞并不會因為第一個人閱讀了它之後就被消費掉,從網頁中消失了。而是一直在那裡供人浏覽。假如我們希望實作類似這種通信模式,不需要修改 spout 和 Drain 的代碼,隻需要稍微修改 Address 即可。Address 的選項 mode 可以用來設定 Browse 和 Consume 模式。如下例所示:

$ qpid-config add queue my-queue  

建立一個 queue。

$ ./spout my-queue --content one  

 $ ./spout my-queue --content two   

$ ./spout my-queue --content three  

發送了三條消息,接着我們用 drain 來接收這些消息吧。請注意,我們對 Address 字元串進行了小小的修改,在名字之後加了一個分号,後面用花括号添加了一個 mode 選項,并設定該位址為 Browse 模式。

$ ./drain 'my-queue; {mode: browse}'  Message(properties={spout-id:fbb93f3 … :0}, content='one')   Message(properties={spout-id:ab9e7c3 … :0}, content='two')   Message(properties={spout-id:ea75d6e … :0}, content='three')   

再運作一次:   

$ ./drain 'my-queue; {mode: browse}'  Message(properties={spout-id:fbb93f … :0}, content='one')   Message(properties={spout-id:ab9e76 … :0}, content='two')   Message(properties={spout-id:ea75d6 … :0}, content='three')  

僅僅修改了Address的Option,我們就發現用spout和Drain可以實作另外一種通信模型了,這真是非常令人着迷的一個特性啊。

編寫 sub-pub 通信的例子

Pub-sub 是另一種很有用的通信模型。恐怕它的名字就源于出版發行這種現實中的資訊傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。

這種模型的特點在于:其一,消息可以根據訂閱的資訊而轉發給不同的訂閱者;其二,消息并不存儲,broker 收到消息後立即将其轉發給當時正在注冊的訂閱者,假如某個訂閱者當時并沒有連結到 broker,那麼它就不能再收到該消息了。沒有多少人願意購買幾天前的舊報紙吧?這是和 Queue 的一個差別。

建立一個 Topic 節點:

qpid-config add exchange topic hello-world  

還是用 spout 和 drain 來示範,先運作 spout:

./spout hello-world  

再運作 drain

./drain hello-world  

哦,什麼也沒有收到。這說明消息沒有被 broker 緩存。

Pub-sub 的主要優點在于訂閱消息的靈活性,broker 會根據消息的主題分發給不同的 subscriber。比如我們建立一個 news-service 的 exchange:

qpid-config add exchange topic hello-world  

打開兩個 shell 視窗,一個運作 drain 并訂閱 news-service/sport,體育新聞;另一個訂閱 news-service/ent 娛樂新聞:

$ ./drain -t 30 news-service/#.news   

$ ./drain -t 30 news-service/#.ent    

$ ./spout news-service/news   

$ ./spout news-service/sports   

$ ./spout news-service/usa.news   

$ ./spout news-service/usa.sports   

$ ./spout news-service/usa.faux.news   

$ ./spout news-service/usa.faux.sports  

可以看到不同的消息被自動分發給不同的訂閱者。第一個 shell 接收 sport 的 drain 将列印:

Message(properties={qpid.subject:news, spout-id:cbd42b0f...   Message(properties={qpid.subject:usa.news, spout-id:234a78d7...   Message(properties={qpid.subject:usa.faux.news, spout-id:6029...  

可另外一個接收 news 的 drain 程式将列印 :

Message(properties={qpid.subject:sports, spout-id:cbd42b0f...   Message(properties={qpid.subject:usa.sports, spout-id:234a78d7...  

編寫 Request/Response 模型的應用

在很多 P2P 和 Pub-Sub 應用中,Sender 和 Reciever 可以見面也不相識。他們多數情況下根本不關心對方是否存在。然而在現實中還有一種典型的通信模型:Request/Response。這種模型由 client 和 server 兩部分組成,即人們常說的 C/S 模型。

Server 必須知道是誰發送了請求,以便回複給正确的 Requester。這是通過解析 Requester 發過來的消息中的 ReplyTo 字段得到的。

代碼清單 4 展示的是 Server 的例子代碼。

清單 4. Server 代碼

      Sender sender = session.createSender("service_queue");  

 Address responseQueue("#response-queue; {create:always, delete:always}");   

Receiver receiver = session.createReceiver(responseQueue);   

Message request;   

request.setReplyTo(responseQueue);  

 request.setContent("ping");   

sender.send(request);   

Message response = receiver.fetch();   

std::cout << request.getContent() << " -> " << response.getContent() << std::endl;  

代碼清單 5 展示的是 Client 的例子代碼。

清單 5. Client 代碼

      Sender sender = session.createSender("service_queue");   

Address responseQueue("#response-queue; {create:always, delete:always}");   

Receiver receiver = session.createReceiver(responseQueue);  

 Message request;   request.setReplyTo(responseQueue);   

request.setContent("ping");   

sender.send(request);   

Message response = receiver.fetch();   

std::cout << request.getContent() << " -> " << response.getContent() << std::endl;  

在 client 代碼中,我們需要調用 Message 的 setReplyTo 方法,設定回複的位址。

代碼清單展示的是 Client 的例子代碼。

小結

至此,我們看到了 Qpid 最基本的一些使用方法。示範了人們通常所使用的兩類通信模式,傳統的面向消息的中間件就實作了這兩個通信模型。但 Qpid 提供了一種更簡潔靈活的程式設計接口,僅通過修改 Address,無需修改代碼就可以改變應用程式的通信模型。

Qpid 是一個 AMQP 的實作,這意味這它不是一個私有的産品,使用 Qpid,您可以和其他任何符合 AMQP 協定的軟體系統進行通信。

不過假如這就是 Qpid 的全部,相信您一定也不以為然,如果這些還不足以打動您,那麼我力圖在下一部分中向您介紹 Qpid 的一些進階特性。