天天看点

RabbitMQ:交换机(fanout exchange)

首先看看​

​AMQP​

​​协议,对​

​RabbitMQ​

​的架构会更了解。

​​深入理解AMQP协议​​

创建一个​

​Maven​

​项目,根据自己服务器RabbitMQ的版本导入相应的包。

RabbitMQ:交换机(fanout exchange)
<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.6.5</version>
    </dependency>      

扇型交换机

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。

因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以它的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件。
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端。
  • 分发系统使用它来广播各种状态和配置更新。
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)。

扇型交换机图例:

RabbitMQ:交换机(fanout exchange)

上图所示,生产者(P)生产消息 1 ,将消息 1 推送到 Exchange,由于 Exchange Type=fanout ,这时候会遵循 fanout exchange的路由规则,将消息推送到所有与它绑定的 Queue,也就是图上的两个 Queue, 最后由监听对应Queue的消费者消费 。

生产端

​routingKey = ""​

​​,因为fanout exchange的路由规则不关心​

​routingKey​

​的值(但是不能为空)。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutProducer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";

    // 交换机路由的routingKey
    private static String routingKey = "";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 发送消息
        String msg = "RabbitMQ:Fanout Exchange 发送数据";
        channel.basicPublish(exchangeName ,routingKey ,null, msg.getBytes());

        // 5 关闭连接
        channel.close();
        connection.close();
    }
}      

消费端

​routingKey = "test"​

​​,和生产端的​

​routingKey​

​不一样,主要为了看看fanout exchange的路由规则是否如上面解释的一样,这里只创建一个消费者,大家可以试一试多个消费者的情况,其实是一样的。

package com.kaven.rabbitmq.exchange.fanoutExchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class FanoutConsumer {

    // 自己服务器的IP
    private static String ip = "IP";
    // RabbitMQ启动的默认端口,也是应用程序进行连接RabbitMQ的端口
    private static int port = 5672;
    // RabbitMQ有一个 "/" 的虚拟主机
    private static String virtualHost = "/";

    // fanout exchange ,RabbitMQ提供的fanout exchange
    private static String exchangeName = "amq.fanout";
    // exchange type
    private static String exchangeType= "fanout";
    // 队列名
    private static String queueName = "queue";
    // 队列与交换机绑定的routingKey
    private static String routingKey = "test";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1 创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(ip);
        connectionFactory.setPort(port);
        connectionFactory.setVirtualHost(virtualHost);

        // 2 创建连接
        Connection connection = connectionFactory.newConnection();

        // 3 创建Channel
        Channel channel = connection.createChannel();

        // 4 定义Queue ,将Queue绑定到direct exchange
        channel.queueDeclare(queueName,true , false , false , null);
        channel.queueBind(queueName , exchangeName , routingKey);

        // 5 创建消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 6 设置
        channel.basicConsume(queueName , true , consumer);

        // 7 接收消息
        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println(msg);
        }
    }
}      

测试

因为这里使用的是RabbitMQ提供给我们的fanout exchange,所以我们无需自己定义。

因为交换机已经定义好了,所以无论先启动生产端还是消费端,消费端都可以成功收到消息。

消费端输出如下:

RabbitMQ:Fanout Exchange 发送数据