簡介
RabbitMQ是一個消息代理,主要的想法很簡單:它接收并轉發消息。你可以把它當做一個郵局,當你發送郵件到郵筒,你相信郵差先生最終會将郵件投遞給收件人。RabbitMQ在這個比喻裡,是一個郵筒,郵局和一個郵差。
RabbitMQ和郵局最大的不同是,RabbitMQ不處理紙張,而是接收、存儲和轉發資料的二進制形式。
RabbitMQ和普通消息,使用的一些術語。
生産:意味着發送,發送消息的程式是生産者,生産者如下:

隊列:是一個郵箱的名稱。它在RabbitMQ裡面。雖然消息流經RabbitMQ和應用程式,但是他們隻能存儲在隊列裡面。一個隊列不受任何限制的限制,隻要你喜歡你可以存儲盡可能多的東西,它本質上是一個無限的緩沖區。很多生産者可以發送消息到一個隊列,很多消費者可以從一個隊列擷取資料。隊列如下:

消費者:也有類似的含義接收,一個消費者是一個程式,主要是等待接收消息,消費者如下:

注意:生産者、消費者和代理不一定在一台機器上,事實上很多應用中也是如此。
HelloWorld
下面我們将用Java編寫兩個程式,生産者發送一個消息,消費者接收消息并列印出來。
在下圖中,”P“是我們的生産者,”C“是我們的消費者,在中間的盒子是一個隊列——即RabbitMQ維持的一個消息緩沖區。

The Java client library
RabbitMQ支援很多種協定,本教程使用AMQP 0-9-1,這是一個開發的通用的消息協定,RabbitMQ的用戶端支援很多種語言,這裡我們将使用RabbitMQ提供的Java用戶端。
下載下傳用戶端庫包,并解壓到工作目錄:
$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./
當然,RabbitMQ也在maven中央倉庫中:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.1</version>
</dependency>
現在我們有了Java用戶端和依賴,我們可以寫一點代碼了。
發送(Sending)
我們稱呼我們的消息發送者為send,接受者為recv,發送者将連接配接到RabbitMQ,發送一條消息,然後退出。

在send.java中,我們需要引入一些類:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
建立這個類,并命名隊列:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然後我們建立一個到服務的連接配接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
這個連接配接虛拟化了socket連接配接,這裡我們在本地連接配接上代理,如果我們想連接配接其他機器上的代理,我們可以改變name和IP位址即可。
然後我們建立了一個channel——提供了很多API供我們擷取東西的所在。
為了發送消息,我們定義了一個隊列,然後我們能釋出消息到隊列上。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
定義一個幂等的隊列——隻能在未存在的情況下建立。消息體是一個位元組數組。
最後我們關閉通道和連接配接:
channel.close();
connection.close();
以上就是Send.java的所有代碼。
tips:
send不管用!在發送消息之後,在RabbitMQ的背景并沒有看到發送消息,你可能認為是程式錯了,也許是代理沒有足夠的空間了(預設是需要1G的),是以拒絕接受消息。檢查代理的logfile确認原因所在。然後去配置檔案設定disk_free_limit即可。
接收(Receiving)
我們的接受者從RabbitMQ拉取消息,是以不像發送一條消息的發送者,我們必須保持監聽,然後列印出來。

recv和send的import差不多:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer實作了Consumer接口——我們用來緩沖服務推給我們的消息。
實作和sender差不多,我們打開一個連接配接和通道,定義一個我們即将消費的隊列,和send釋出的隊列一樣。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
這裡我們定義了一個隊列,因為我們也許會在sender之前啟動receiver,我們需要确認這個隊列在我們消費之前就已經存在了。
我們讓服務從隊列裡面給我們傳遞消息,自從他将要異步的推送消息,我們提供了一個緩存消息的回調方法,知道我們去用他,這就是DefaultConsumer做的事情。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以上就是全部的Recv.java。
原文位址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html
代碼位址:https://github.com/aheizi/hi-mq
相關:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任務隊列
3.RabbitMQ之釋出訂閱
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主題(Topic)
6.RabbitMQ之遠端過程調用(RPC)