1. 寫在前面
昨天簡單學習了一個消息隊列項目——RabbitMQ,今天趁熱打鐵,将學到的東西記錄下來。
學習的資料主要是官網給出的6個基本的消息發送/接收模型,或者稱為6種不同的使用場景,本文便是對這6種模型加以叙述。
2. Tutorials
在學習6種模型之前,我們首先需要安裝RabbitMQ。RabbitMQ支援多種系統平台,各平台的安裝方法可以點此檢視。安裝好之後,我們使用如下指令啟用Web端的管理插件:
rabbitmq-plugins enable rabbitmq_management
,然後啟動RabbitMQ。接着用浏覽器通路
http://localhost:15672/
,若能看到RabbitMQ相關Web頁面,說明啟動成功。
2.1 Hello World
正所謂萬事開頭難,我們先從最簡單的Hello World開始。首先當然是建立一個項目,導入RabiitMQ相關jar。我采用Maven來建構項目,是以隻需要在pom檔案中添加如下依賴:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
接下來學習最簡單的消息隊列模型,如下圖:

在圖中,
P
代表
producer
,它是消息的生産者;
C
consumer
,它是消息的消費者;而紅色的矩形正是我們所謂的消息隊列,它位于
RabbitMQ
中(
RabbitMQ
中可以有很多這樣的隊列,并且每個隊列都有一個唯一的名字)。生産者(們)可以将消息發送到消息隊列中,消費者(們)可以從消息隊列中取出消息。
這種模型是不是很簡單呢?下面我們使用Java,借助于RabbitMQ來實作這種模型的消息通信。
首先我們介紹如何
send
消息到消息隊列。
send
之前,當然是和RabbitMQ伺服器建立連接配接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
接下來我們建立一個
channel
,大多數API都是通過這個對象來調用的:
Channel channel = connection.createChannel();
之後,我們便可以調用
channel
的如下方法去聲明一個隊列:
channel.queueDeclare("hello", false, false, false, null);
該方法的第一個參數是隊列的名稱,其餘的參數先不管,之後會介紹。我們可以嘗試着去執行以上的5行代碼,然後打開Web端,可以看到建立了一個叫作
hello
的隊列:
有了隊列,我們便可以向其中發送消息了,同樣還是調用
channel
對象的API:
channel.basicPublish("", "hello", null, "Hello World".getBytes());
以上代碼所做的事情就是發送了一條字元串消息“Hello World”(第4個參數)到消息隊列。你可能注意到我們調用了String對象的
getBytes
方法,沒錯,我們發送的實際上二進制資料。是以,理論上你能夠發送任何資料到消息隊列中,而不僅僅是文本資訊。
第2個參數叫做路由鍵(routingKey),在該模型下必須與隊列名相同,至于為什麼,和其他參數一樣,之後會了解到。
我們可以修改發送的文本,再次執行上述代碼,然後打開Web端檢視,便可以檢視到我們發送的消息:
點選上圖的name字段下的hello,可以檢視
hello
隊列中的具體資訊:
接下來,我們去嘗試着去擷取生産者發送的消息,和
send
方法一樣,我們同樣需要連接配接伺服器,建立
channel
,聲明隊列:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
之後我們可以調用
channel
的相關方法去監聽隊列,接收消息:
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
以上
basicConsume
方法中,第一個參數是隊列的名字;第二個參數表示是否自動确認消息的接收情況,我們使用true,自動确認;第三個參數需要傳入一個實作了
Consumer
接口的對象,我們簡單的
new
一個預設的
Consumer
的實作類
DefaultConsumer
,然後在
handleDelivery
方法中去處理接收到的消息(
handleDelivery
方法會在接收到消息時被回調)。
運作以上代碼,我們可以列印出之前向隊列中
send
的資料:
Hello World
Hello World2
下面是Hello World的完整代碼:
public class App {
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicPublish("", "hello", null, "Hello World2".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this){
// 因為以上接收消息的方法是異步的(非阻塞),當采用單元測試方式執行該方法時,程式會在列印消息前結束,是以使用wait來防止程式提前終止。若使用main方法執行,則不需要擔心該問題。
wait();
}
}
}
2.2 Work queues
接下來我們學習第二種模型——Work Queues。顧名思義,這種模型描述的是一個生産者(Boss)向隊列發消息(任務),多個消費者(worker)從隊列接受消息(任務),如下圖所示:
下面我們用代碼去實作。先是生産者
send
消息到隊列,這次我們多發送些資料:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
for (int i = 0; i < 9; i++) {
channel.basicPublish("", "hello", null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}
然後是消費者接收資料:
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("hello", false, false, false, null);
channel.basicConsume("hello", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
try {
// Thread.sleep(1000);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
synchronized (this) {
wait();
}
}
代碼基本上和
Hello World
的代碼一樣,隻是加上句
sleep
來模拟消費者(worker)處理消息所花的時間。
我們可以先執行三次
receive
方法(修改
sleep
的時間,其中消費者1 sleep 10s,消費者2,3 sleep 1s),讓三個消費者(worker)一起等待消息的到來,然後執行
send
方法發送9條消息,觀察三個消費者收到的消息情況。
若不出意外,你會看到如下的列印結果:
// --------消費者1--------
0
// 10s 後
3
// 10s 後
6
// --------消費者2--------
1
// 1s 後
4
// 1s 後
7
// --------消費者3--------
2
// 1s 後
5
// 1s 後
8
通過列印結果,我們可以總結出Work queues的幾個特點:
- 一條消息隻會被一個消費者接收;
- 消息是平均配置設定給消費者的;
- 消費者隻有在處理完某條消息後,才會收到下一條消息。
事實上,RabbitMQ會循環地(一個接一個地)發送消息給消費者,這種配置設定消息的方式被稱為round-robin(輪詢)。
2.2.1 消息确認
看到這裡,不知你是否會擔心:由于worker(消費者)執行任務需要一定的時間(以上用sleep模拟),要是某個worker在運作過程中挂掉,那配置設定給它的任務豈不是丢失了(永遠不可能被執行了)。為解決這個問題,RabbitMQ提供了消息确認機制,即worker需要主動的去确認消息已經接收了,RabbitMQ才認為消息被“真正地接收了”,實作代碼如下:
// send的代碼不用變,隻需改變basicConsume的第二個參數為false,表示不要自動确認
channel.basicConsume("hello", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
try {
// 這裡把時間加長了一點便于測試
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 這裡手動地确定
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
});
下面做測試。首先執行
send
方法,發送9條消息到隊列,檢視web端情況如下:
此時隊列中有9條未被分發的消息。接着運作改變後的
receive
方法,然後快速地去Web端檢視隊列中的消息情況(記得重新整理):
發現隊列中沒有待分發(Ready字段)的消息了,而有9條未被确認(Unacked字段)的消息。但控制台列印出數字
6
時,關閉程式,再次去web端檢視:
此時隊列中又有3條待分發的消息了。原因正是由于我們提前終止了
receive
方法的執行,導緻最後3條消息沒有被确認而被重新歸還到Ready中。
2.2.2 消息持久化
如果你不是一次性跟着本文運作代碼到這裡,而是第二天接着昨天的工作繼續進行,你可能會發現昨天你建立的隊列和添加到隊列裡的消息沒有了。很可能的原因就是消息沒有持久化,即按照以上代碼運作生成的隊列和添加到隊列中的消息都是儲存在記憶體中的,RabbitMQ一旦關閉它們就沒有了。如果你想将下次啟動時還能看到關閉前的消息,你應該将其持久化:
// 将第二個參數設為true,表示聲明一個需要持久化的隊列。
// 需要注意的是,若你已經定義了一個非持久的,同名字的隊列,要麼将其先删除(不然會報錯),要麼換一個名字。
channel.queueDeclare("hello", true, false, false, null);
// 修改了第三個參數,這是表明消息需要持久化
channel.basicPublish("", "hello",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
總的來說,Work queues(Task Queuess)的概念在一些Web場景的應用中是很有用的,比如我們能夠用它來建構一個master-slave結構的分布式爬蟲系統:系統中有一個master節點和多個slave節點,master節點負責向各個slave節點配置設定爬取任務。
2.3 Publish/Subscribe
但有些時候,我們可能希望一條消息能夠被多個消費者接受到,比如一些公告資訊等,這時候用Work Queue模型顯然不合适,而Publish/Subscribe模型正是對應這種使用場景的。
在介紹Publish/Subscribe之前,我們快速回顧之前的兩個模型,它們好像都是生産者将消息直接發送到消息隊列,但其實不是這樣的,甚至有可能生産者根本就不知道消息發送到了哪一個消息隊列。
先别着急,下面我們完整地介紹RabbitMQ消息發送/接受的方式。
事實上,生産者是把消息發送到了交換機(exchange)中,然後交換機負責(決定)将消息發送到(哪一個)消息隊列中。其模型如下圖:
這時候你可能會疑惑:既然消息是被發送到了交換機中,那我們之前發送的消息是被發送到了哪一個交換機中了?它有沒有機制能夠讓特定的消息發送到指定的隊列?
先回答第一個問題。還記得我們在Hello World中寫的發送消息的代碼嗎?
channel.basicPublish("", "hello", null, message.getBytes());
事實上第一個參數便是指定交換機的名字,即指定消息被發送到哪一個交換機。空字元串表示預設交換機(Default Exchange),即我們之前發送的消息都是先發送到預設交換機,然後它再路由到相應的隊列中。其實我們可以通過Web頁面去檢視所有存在的交換機:
接着回答第二個問題。路由的依據便是通過第二個參數——路由鍵(routing key)指定的,之前已經提到過。在之前代碼中,我們指定第二個參數為"hello",便是指定消息應該被交換機路由到路由鍵為hello的隊列中。而預設交換機(Default Exchange)有一個非常有用的性質:
每一個被建立的隊列都會被自動的綁定到預設交換機上,并且路由鍵就是隊列的名字。
交換機還有4種不同的類型,分别是
direct
,
fanout
topic
headers
,每種類型路由的政策不同。
direct
類型的交換機要求和它綁定的隊列帶有一個路由鍵K,若有一個帶有路由鍵R的消息到達了交換機,交換機會将此消息路由到路由鍵K = R的隊列。預設交換機便是該類型。是以,在下圖中,消息會沿着綠色箭頭路由:
fanout
類型的交換機會路由每一條消息到所有和它綁定的隊列,忽略路由鍵。
剩下的兩種類型之後再做介紹。
在以上概念基礎上,我們來看第3種消息模型:Publish/Subscribe。如下圖:
該模型是要讓所有的消費者都能夠接收到每一條消息。顯然,
fanout
類型的交換機更符合我們目前的需求。為此,先建立一個
fanout
類型的交換機。
channel.exchangeDeclare("notice", "fanout");
其中,第一個參數是交換機的名稱;第二個參數是交換機的類型。
然後我們可以
send
消息了:
channel.basicPublish( "notice", "", null, message.getBytes());
對于消費者,我們需要為每一個消費者建立一個獨立的隊列,然後将隊列綁定到剛才指定的交換機上即可:
// 該方法會建立一個名稱随機的臨時隊列
String queueName = channel.queueDeclare().getQueue();
// 将隊列綁定到指定的交換機("notice")上
channel.queueBind(queueName, "notice", "");
以下完整的代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
channel.basicPublish( "notice", "", null, "Hello China".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice", "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
首先運作兩次
receive
方法,讓兩個消費者等待接收消息,然後可以在Web端檢視此時的隊列情況,如下圖所示:
可以看到圖中有兩個名稱随機的隊列。接着運作
send
方法發送一條消息,最終我們會看到兩個消費者都列印出了
Hello China
。然後停止虛拟機讓消費者斷開連接配接,再次在Web端檢視隊列情況,會發現剛才的兩個隊列被自動删除了。
2.4 Routing
以上是Publish/Subscribe模式,它已經能讓我們的通知(notice)系統正常運轉了。現在再考慮這樣一個新需求:對于一些機密通知,我們隻想讓部分人看到。這就要求交換機對綁定在其上的隊列進行篩選,于是引出了又一個新的模型:Routing。
之前我們說過,對于
direct
類型的交換機,它會根據routing key進行路由,是以我們可以借助它來實作我們的需求,模型結構如下圖:
下面用代碼來實作。先看生産者。
首先要聲明一個
direct
類型的交換機:
// 這裡名稱改為notice2
channel.exchangeDeclare("notice2", "direct");
需要注意的是,因為我們之前聲明了一個
fanout
類型的名叫
notice
的交換機,是以不能再聲明一個同名的類型卻不一樣的交換機。
然後可以發送消息了,我們發送10條消息,其中偶數條消息是秘密消息,隻能被routing key 為s的隊列接受,其餘的消息所有的隊列都能接受。
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
接下來是消費者:
// 聲明一個名稱随機的臨時的隊列
String queueName = channel.queueDeclare().getQueue();
// 綁定交換機,同時帶上routing key
channel.queueBind(queueName, "notice2", "n");
// 消費者2号運作時,打開以下注釋
// channel.queueBind(queueName, "notice2", "s");
注意,我們可以多次調用隊列綁定方法,調用時,隊列名和交換機名都相同,而routing key不同,這樣可以使一個隊列帶有多個routing key。
以下是完整代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
for (int i = 0; i < 10; i++) {
String routingKey = "n"; // normal
if (i % 2 == 0) {
routingKey = "s"; // secret
}
channel.basicPublish("notice2", routingKey, null, String.valueOf(i).getBytes());
}
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("notice2", "direct");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "notice2", "n");
// channel.queueBind(queueName, "notice2", "s");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
測試時,我們可以先運作一個
receive
,然後打開
channel.queueBind(queueName, "notice2", "s")
注釋,再運作一次
receive
,這樣就有兩個消費者綁定到notice2交換機上,其中消費者1隻能收到normal類型的消息,而消費者2既能收到normal類型的消息,又能收到secret類型的消息。接着可以運作send方法。如不出意外,可以看到如下列印結果:
// 消費者1
1
3
5
7
9
// 消費者2
0
1
2
3
4
5
6
7
8
9
2.5 Topic
有了以上的改進,我們的
notice
系統基本ok了。但有些時候,我們還需要更加靈活的消息刷選方式。比如我們對于電影資訊,我們可能需要對它的地區,類型,限制級進行篩選。這時候就要借助Topics模型了。
在Topics模型中,我們“更新”了routing key,它可以由多個關鍵詞組成,詞與詞之間由點号(
.
)隔開。特别地,規定
*
表示任意的一個詞;
#
号表示任意的0個或多個詞。
假設我們現在需要接收電影資訊,每條電影消息附帶的routingKey有地區、類型、限制級3個關鍵字,即:
district.type.age
。現在想實作的功能如下圖:
如上圖所示,隊列Q1隻關心美國适合13歲以上的電影資訊,隊列Q2對動作片感興趣,而隊列Q3喜歡中國電影。
下面用Java代碼去實作上述功能,相較于之前基本上沒有什麼改變,下面直接給出代碼:
@Test
public void send() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
channel.basicPublish("movie", "American.action.13", null, "The Bourne Ultimatum".getBytes());
channel.basicPublish("movie", "American.comedy.R", null, "The Big Lebowski".getBytes());
channel.basicPublish("movie", "American.romantic.13", null, "Titanic".getBytes());
channel.basicPublish("movie", "Chinese.action.13", null, "卧虎藏龍".getBytes());
channel.basicPublish("movie", "Chinese.comedy.13", null, "大話西遊".getBytes());
channel.basicPublish("movie", "Chinese.romantic.13", null, "梁山伯與祝英台".getBytes());
channel.close();
connection.close();
}
@Test
public void receive() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("movie", "topic");
// 隊列1
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "movie", "American.*.13");
// 隊列2
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", "*.action.*");
// 隊列3
// String queueName = channel.queueDeclare().getQueue();
// channel.queueBind(queueName, "movie", "Chinese.#");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body, "UTF-8"));
}
});
synchronized (this) {
wait();
}
}
運作3次
receive
方法,注意打開或關閉相應的注釋;再運作
send
方法,可以看到控制台輸出如下内容:
// 消費者1
The Bourne Ultimatum
Titanic
// 消費者2
The Bourne Ultimatum
卧虎藏龍
// 消費者3
卧虎藏龍
大話西遊
梁山伯與祝英台
2.6 RPC
第6種模型是用來做RPC(Remote procedure call, 遠端程式調用)的。這裡直接貼上代碼,就不做解釋了,想要了解更多細節,可參考這裡。代碼示範的是,用戶端調用服務端的
fib
方法,得到傳回結果。
RPCServer.java
import com.rabbitmq.client.*;
import com.rabbitmq.client.AMQP.BasicProperties;
/**
* Description:
*
* @author derker
* @Time 2016-10-26 18:24
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
String response = null;
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
AMQP.BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
try {
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
response = "" + fib(n);
} catch (Exception e) {
System.out.println(" [.] " + e.toString());
response = "";
} finally {
channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception ignore) {
}
}
}
}
}
RPCClient.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import java.util.UUID;
/**
* Description:
*
* @author derker
* @Time 2016-10-26 18:36
*/
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody(), "UTF-8");
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
public static void main(String[] argv) {
RPCClient fibonacciRpc = null;
String response = null;
try {
fibonacciRpc = new RPCClient();
System.out.println(" [x] Requesting fib(10)");
response = fibonacciRpc.call("10");
System.out.println(" [.] Got '" + response + "'");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (fibonacciRpc != null) {
try {
fibonacciRpc.close();
} catch (Exception ignore) {
}
}
}
}
}