天天看點

Rabbit官方文檔翻譯之Publish Subscribe(三)

 Publish/Subscribe 訂閱模式 (using the Java Client)

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".

在上一個教程中,我們建立了一個工作隊列。 工作隊列的意圖是每個任務都傳遞給一個從業人員。 在這部分中,我們會做一些完全不同的事情 - 我們将傳遞同一個消息給所有的Consumer。 這種模式被稱為“釋出/訂閱”。

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.

我們要建立一個簡單的日志記錄系統來說明這個模式。 日志記錄系統它分為兩個子產品,第一個将發出日志消息,第二個将接收并列印它們

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.

在我們的記錄系統中,每個receiver都會收到同一個消息。 這樣我們就可以運作一個receiver用于将receive的log message寫入磁盤; 同時我們可以運作另一個receiver将log message輸出到螢幕上

Essentially, published log messages are going to be broadcast to all the receivers.

本質上說,訂閱模式會将已釋出的日志消息将被廣播到所有接收者。

Exchanges

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.

在之前的教程中,我們是以隊列作為中介來實作消息的傳遞。 現在,我們将在Rabbit完整的消息傳遞模式。

Let's quickly Go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.
讓我們快速回顧一下我們在以前的教程中介紹的内容:
  • 生産者是發送消息的使用者應用程式。
  • 隊列是存儲消息的緩沖區。
  • 消費者是接收消息的使用者應用程式。

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

RabbitMQ中的消息傳遞模型的核心思想是,生産者不會把消息直接發送到隊列,而是發送到交換機上,消息傳送到隊列的過程有交換機完成,這部分生産者是不知道的。 實際上,生産者通常甚至不知道是否将消息傳遞到任何隊列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

相反,producer隻能将message發送到exchange。exchange的功能也很簡單。 一方面,它收到來自生産者的消息,另一方将它們推送到隊列。 exchange必然知道接收到的消息如何處理。 應該把它追加到特定隊列上? 還是追加到多個隊列上? 或者丢棄它。 這個rule是通過exchange的類型定義。
Rabbit官方文檔翻譯之Publish Subscribe(三)

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:

exchange的類型為:direct,topic,headers和fanout。 
這裡我們将重點關注最後一個 - fanout。 如下我們建立一個fanout類型的exchange,并将其命名為logs:
channel.exchangeDeclare("logs", "fanout");//create a fanout exchange,and named logs;
      

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

fanout exchange的工作模式非常簡單。 它隻是将所有收到的消息廣播給它知道的所有的隊列。 這正是我們需要的logger
Listing exchangesTo list the exchanges on the server you can run the ever useful rabbitmqctl:sudo rabbitmqctl list_exchanges In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.Nameless exchangeIn previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").Recall how we published a message before:channel.basicPublish("", "hello", null, message.getBytes()); The first parameter is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.

可以在rabbitmqctl中使用sudo rabbitmqctl list_exchanges指令,它會列舉出所有的exchanges。

在這個清單中有許多名為amq.*的exchange和還有一個預設的exchange (AMQP default) 。 這些是預設建立的

Listing exchanges ...
amq.headers     headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.match       headers
amq.fanout      fanout
amq.direct      direct
amq.topic       topic            
(default exchange) direct 
           
但是現在不太可能需要使用它們。我們在之前的教程裡發送消息沒有使用過這些default (unnamed) exchange,但他們仍在我們将消息發送到隊列過程中起到了作用。之前我們通過空字元串("")來辨別exchange,就代表使用default (unnamed)  exchange。
//在之前的教程中,我們是這樣釋出消息的
channel.basicPublish(“”,“hello”,null,message.getBytes());
//第一個參數是exchange的名稱。 空字元串表示default (unnamed) exchange              //第二個參數是routing key。消息将通過指定的exchange的指定routing key傳遞到綁定的隊列(如果routing key存在)。因為我們使用的是預設的exchange,是以routing key就等于隊列名字
           

預設路由,官方的說明

The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to

, or unbind from the default exchange. It also cannot be deleted.

defalut exchange會隐式的綁定到所有隊列上每個隊列上,routing key等于隊列名,任何隊列都不能夠明确的指明綁定到default exchange,也不能從default exchange上解除綁定。default也不能夠被删除

Now, we can publish to our named exchange instead:

現在,我們通過命名的exchange來釋出消息了
channel.basicPublish( "logs", "", null, message.getBytes());
      

Temporary queues 臨時隊列

As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。可見對隊列命名是十分重要的

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

  但是對于我們将要建構的日志系統,并不需要有名字的queue。我們希望每個consumer都能receive到所有的日志message,而不僅僅是它們中間的一部分。 我們也隻對目前流行的消息不感興趣。 要解決我們需要兩件事情。

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

首先,當consumer connect to Rabbit,需要一個新的隊列。 為此,我們可以建立一個具有随機名稱的隊列,或者甚至是更好的 - 這裡讓伺服器為我們選擇一個随機隊列名稱。

Secondly, once we disconnect the consumer the queue should be automatically deleted.

其次,一旦consumer斷開連接配接,隊列應該被自動删除。

In the Java client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:

在Java用戶端中,當我們沒有為queueDeclare()提供參數時,意味着我們建立了一個具有随機名稱的非持久,排他,自動删除的隊列:
String queueName = channel.queueDeclare().getQueue();
      

At that point queueName contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

此時,queueName包含一個随機隊列名稱。 例如,它可能看起來像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings 綁定

Rabbit官方文檔翻譯之Publish Subscribe(三)

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.

我們已經建立了一個fanout exchange和queue。 現在我們需要告訴exchange發送消息給隊列。 exchange和隊列之間的關系稱為綁定。
channel.queueBind(queueName, "logs", "");
      

From now on the logs exchange will append messages to our queue.

從現在開始,log exchange将追加消息到綁定的隊列中。
Listing bindingsYou can list existing bindings using, you guessed it,rabbitmqctl list_bindings
列出綁定您可以使用,您猜測它,rabbitmqctl清單綁定列出現有綁定

Putting it all together 把它們放在一起

Rabbit官方文檔翻譯之Publish Subscribe(三)

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logsexchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for EmitLog.Java program:

發送log的producer與之前的教程提及的并沒有太大的差別。 最重要的改變是我們現在釋出消息的是到logs exchange而不是default exchange。 發送時需要提供一個routingKey,但是對于faount類型的exchange來說,routing key的值是被忽略的,因為fanout是要廣播所有從producer接受到的消息給所有綁定的隊列。 以下是EmitLog.Java程式的代碼:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv)
                  throws java.io.IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
    //...
}
      

(EmitLog.java source)

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

如你所見,建立連接配接後,我們申明了一個fanout類型的exchange。 這個步驟是必須的,因為publish 消息到不存在的exchange是禁止的

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.

如果沒有任何隊列綁定到交換機,消息将丢失。

The code for ReceiveLogs.java:

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
  private static final String EXCHANGE_NAME = "logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    Consumer consumer = new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope,
                                 AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      }
    };
    channel.basicConsume(queueName, true, consumer);
  }
}
      

(ReceiveLogs.java source)

Compile as before and we're done.

javac -cp $CP EmitLog.java ReceiveLogs.java
      

If you want to save logs to a file, just open a console and type:

java -cp $CP ReceiveLogs > logs_from_rabbit.log
      

If you wish to see the logs on your screen, spawn a new terminal and run:

java -cp $CP ReceiveLogs
      

And of course, to emit logs type:

java -cp $CP EmitLog
      

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.java programs running you should see something like:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.
      

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.

To find out how to listen for a subset of messages, let's move on to tutorial 4