天天看點

AMQP-RabbitMQ/3/釋出訂閱模式

3. 釋出訂閱模式 Publish/Subscribe - 全集監聽

fanout

一次向多個消費者發送消息
  • 圖示

個人了解

  • 生産者定義Exchange,同時将Exchange的類型定義為

    fanout

    ,并向該Exchange發送消息。
  • 消費者定義隊列Queue,并将隊列與該交換機進行綁定。之後交換機付負責将消息全量推送給每一個與之綁定的Queue

**RabbitMQ中消息傳遞模型的核心思想是生産者永遠不會将任何消息直接發送到隊列。實際上,生産者通常甚至不知道消息是否會被傳遞到任何隊列。

相反,生産者隻能向Exchange發送消息。Exchange所做的工作非常簡單。一方面,它接收來自生産者的消息,另一方面将它們推送到隊列。Exchange必須确切知道如何處理它收到的消息。它應該附加到特定隊列嗎?它應該附加到多個隊列嗎?或者它應該被丢棄。其規則由交換類型定義 。**

有幾種交換類型可供選擇:

direct

,

topic

headers

and

fanout

  • fanout: 将它接收到的消息廣播到所有綁定到它的消息隊列上。(忽略路由鍵routingKey)
  • 生産者 - 釋出者
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 釋出訂閱-釋出者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Publisher {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //定義交換器類型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish(ExchangeTypeEnum.FANOUT.getExchangeName(), "", null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}           
  • 消費者1 - 訂閱者1
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 釋出訂閱-訂閱者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //開啟持久化隊列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), durable, false, false, null);
        //定義交換器類型
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息隊列與Exchange交換器綁定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告訴rabbitmq一次隻發送一條消息,并且在前一個消息未被處理或者消費之前,不繼續發送下一個消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment應答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        });
        //關閉自動應答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_ONE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}           
  • 消費者2 - 訂閱者2
package com.futao.springmvcdemo.mq.rabbit.ps;

import com.futao.springmvcdemo.mq.rabbit.ExchangeTypeEnum;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 釋出訂閱-訂閱者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class SubscriberTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //開啟持久化隊列
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), durable, false, false, null);
        //定義交換器類型為fanout
        channel.exchangeDeclare(ExchangeTypeEnum.FANOUT.getExchangeName(), BuiltinExchangeType.FANOUT);
        //将消息隊列與Exchange交換器進行綁定
        channel.queueBind(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), ExchangeTypeEnum.FANOUT.getExchangeName(), "");
        //告訴rabbitmq一次隻發送一條消息,并且在前一個消息未被處理或者消費之前,不繼續發送下一個消息
        channel.basicQos(1);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment應答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {

            }
        });
        //關閉自動應答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.EXCHANGE_QUEUE_FANOUT_TWO.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}           
  • 注意
    • 沒有在釋出者定義隊列,而是定義了交換器Exchange。釋出者将消息發送到Exchange,而不是Queue
    • 在訂閱者端,每個訂閱者定義了自己的消息隊列,并且将自己的消息隊列與Exchange進行綁定。則在每次釋出者向相應的Exchange發送消息的時候,Exchange會将消息發送至訂閱了該Exchange的隊列。(即:每個訂閱者收到的消息都是一樣的)
  • 測試結果
>>> 訂閱者1
[main] INFO mq.rabbit.ps.SubscriberOne - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberOne - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-Fc_B_CoCYUBoBhEcOlC7vw]


>>> 訂閱者2
[main] INFO mq.rabbit.ps.SubscriberTwo - Waiting for message...
[pool-1-thread-4] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!0],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-5] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!1],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-6] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!2],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-7] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!3],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-8] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!4],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-9] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!5],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-10] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!17],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-22] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!18],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]
[pool-1-thread-23] INFO mq.rabbit.ps.SubscriberTwo - 收到消息:[Hello RabbitMq!19],tag:[amq.ctag-ip59jtcKJBQFC2KU9DperQ]