天天看點

RabbitMQ:交換機(fanout exchange)

首先看看​

​AMQP​

​​協定,對​

​RabbitMQ​

​的架構會更了解。

​​深入了解AMQP協定​​

建立一個​

​Maven​

​項目,根據自己伺服器RabbitMQ的版本導入相應的包。

RabbitMQ:交換機(fanout exchange)
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
    </dependency>      

扇型交換機

扇型交換機(funout exchange)将消息路由給綁定到它身上的所有隊列,而不理會綁定的路由鍵。如果 N 個隊列綁定到某個扇型交換機上,當有消息發送給此扇型交換機時,交換機會将消息的拷貝分别發送給這所有的 N 個隊列。

因為扇型交換機投遞消息的拷貝到所有綁定到它的隊列,是以它的應用案例都極其相似:

  • 大規模多使用者線上(MMO)遊戲可以使用它來處理排行榜更新等全局事件。
  • 體育新聞網站可以用它來近乎實時地将比分更新分發給移動用戶端。
  • 分發系統使用它來廣播各種狀态和配置更新。
  • 在群聊的時候,它被用來分發消息給參與群聊的使用者。(AMQP 沒有内置 presence 的概念,是以 XMPP 可能會是個更好的選擇)。

扇型交換機圖例:

RabbitMQ:交換機(fanout exchange)

上圖所示,生産者(P)生産消息 1 ,将消息 1 推送到 Exchange,由于 Exchange Type=fanout ,這時候會遵循 fanout exchange的路由規則,将消息推送到所有與它綁定的 Queue,也就是圖上的兩個 Queue, 最後由監聽對應Queue的消費者消費 。

生産端

​routingKey = ""​

​​,因為fanout exchange的路由規則不關心​

​routingKey​

​的值(但是不能為空)。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";

    // 交換機路由的routingKey
    private static String routingKey = "";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 發送消息
        String msg = "RabbitMQ:Fanout Exchange 發送資料";
        channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());

        // 5 關閉連接配接
        channel.close();
        connection.close();
    }
}      

消費端

​routingKey = "test"​

​​,和生産端的​

​routingKey​

​不一樣,主要為了看看fanout exchange的路由規則是否如上面解釋的一樣,這裡隻建立一個消費者,大家可以試一試多個消費者的情況,其實是一樣的。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {

    // 自己伺服器的IP
    private static String ip = "IP";
    // RabbitMQ啟動的預設端口,也是應用程式進行連接配接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一個 "/" 的虛拟主機
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";
    // 隊列名
    private static String queueName = "queue";
    // 隊列與交換機綁定的routingKey
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 建立一個連接配接工廠
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 建立連接配接
        Connection connection = connectionFactory.newConnection();

        // 3 建立Channel
        Channel channel = connection.createChannel();

        // 4 定義Queue ,将Queue綁定到direct exchange
        channel.queueDeclare(queueName,true , false , false , null);
        channel.queueBind(queueName , exchangeName , routingKey);

        // 5 建立消費者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 6 設定
        channel.basicConsume(queueName , true , consumer);

        // 7 接收消息
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}      

測試

因為這裡使用的是RabbitMQ提供給我們的fanout exchange,是以我們無需自己定義。

因為交換機已經定義好了,是以無論先啟動生産端還是消費端,消費端都可以成功收到消息。

消費端輸出如下:

RabbitMQ:Fanout Exchange 發送資料