天天看點

RabbitMQ之HelloWorld【譯】

簡介

RabbitMQ是一個消息代理,主要的想法很簡單:它接收并轉發消息。你可以把它當做一個郵局,當你發送郵件到郵筒,你相信郵差先生最終會将郵件投遞給收件人。RabbitMQ在這個比喻裡,是一個郵筒,郵局和一個郵差。

RabbitMQ和郵局最大的不同是,RabbitMQ不處理紙張,而是接收、存儲和轉發資料的二進制形式。

RabbitMQ和普通消息,使用的一些術語。

生産:意味着發送,發送消息的程式是生産者,生産者如下:

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231101981-751146404.png)

隊列:是一個郵箱的名稱。它在RabbitMQ裡面。雖然消息流經RabbitMQ和應用程式,但是他們隻能存儲在隊列裡面。一個隊列不受任何限制的限制,隻要你喜歡你可以存儲盡可能多的東西,它本質上是一個無限的緩沖區。很多生産者可以發送消息到一個隊列,很多消費者可以從一個隊列擷取資料。隊列如下:

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231119246-778373077.png)

消費者:也有類似的含義接收,一個消費者是一個程式,主要是等待接收消息,消費者如下:

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231132746-118059440.png)

注意:生産者、消費者和代理不一定在一台機器上,事實上很多應用中也是如此。

HelloWorld

下面我們将用Java編寫兩個程式,生産者發送一個消息,消費者接收消息并列印出來。

在下圖中,”P“是我們的生産者,”C“是我們的消費者,在中間的盒子是一個隊列——即RabbitMQ維持的一個消息緩沖區。

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231148652-591142369.png)

The Java client library

RabbitMQ支援很多種協定,本教程使用AMQP 0-9-1,這是一個開發的通用的消息協定,RabbitMQ的用戶端支援很多種語言,這裡我們将使用RabbitMQ提供的Java用戶端。

下載下傳用戶端庫包,并解壓到工作目錄:

$ unzip rabbitmq-java-client-bin-*.zip  
$ cp rabbitmq-java-client-bin-*/*.jar ./
           

當然,RabbitMQ也在maven中央倉庫中:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.1</version>
</dependency>
           

現在我們有了Java用戶端和依賴,我們可以寫一點代碼了。

發送(Sending)

我們稱呼我們的消息發送者為send,接受者為recv,發送者将連接配接到RabbitMQ,發送一條消息,然後退出。

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231302574-525128259.png)

在send.java中,我們需要引入一些類:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
           

建立這個類,并命名隊列:

public class Send {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}
           

然後我們建立一個到服務的連接配接:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
           

這個連接配接虛拟化了socket連接配接,這裡我們在本地連接配接上代理,如果我們想連接配接其他機器上的代理,我們可以改變name和IP位址即可。

然後我們建立了一個channel——提供了很多API供我們擷取東西的所在。

為了發送消息,我們定義了一個隊列,然後我們能釋出消息到隊列上。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
           

定義一個幂等的隊列——隻能在未存在的情況下建立。消息體是一個位元組數組。

最後我們關閉通道和連接配接:

channel.close();
connection.close();
           

以上就是Send.java的所有代碼。

tips:

send不管用!在發送消息之後,在RabbitMQ的背景并沒有看到發送消息,你可能認為是程式錯了,也許是代理沒有足夠的空間了(預設是需要1G的),是以拒絕接受消息。檢查代理的logfile确認原因所在。然後去配置檔案設定disk_free_limit即可。

接收(Receiving)

我們的接受者從RabbitMQ拉取消息,是以不像發送一條消息的發送者,我們必須保持監聽,然後列印出來。

![](http://images2015.cnblogs.com/blog/658141/201608/658141-20160809231320793-15208918.png)

recv和send的import差不多:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
           

DefaultConsumer實作了Consumer接口——我們用來緩沖服務推給我們的消息。

實作和sender差不多,我們打開一個連接配接和通道,定義一個我們即将消費的隊列,和send釋出的隊列一樣。

public class Recv {
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}
           

這裡我們定義了一個隊列,因為我們也許會在sender之前啟動receiver,我們需要确認這個隊列在我們消費之前就已經存在了。

我們讓服務從隊列裡面給我們傳遞消息,自從他将要異步的推送消息,我們提供了一個緩存消息的回調方法,知道我們去用他,這就是DefaultConsumer做的事情。

Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
      throws IOException {
    String message = new String(body, "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
  }
};
channel.basicConsume(QUEUE_NAME, true, consumer);
           

以上就是全部的Recv.java。

原文位址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html

代碼位址:https://github.com/aheizi/hi-mq

相關:

1.RabbitMQ之HelloWorld

2.RabbitMQ之任務隊列

3.RabbitMQ之釋出訂閱

4.RabbitMQ之路由(Routing)

5.RabbitMQ之主題(Topic)

6.RabbitMQ之遠端過程調用(RPC)