天天看點

RabbitMQ:通信模型之釋出訂閱模型

釋出訂閱模型

釋出訂閱模型(Publish/Subscribe):簡單的說就是隊列裡面的消息會被多個消費者同時接受到,消費者接收到的資訊一緻。

釋出訂閱模型适合于做子產品之間的異步通信。

RabbitMQ:通信模型之釋出訂閱模型

适用場景

  1. 發送并記錄日志資訊
  2. springcloud的config元件裡面通知配置自動更新
  3. 緩存同步
  4. 微信訂閱号

示範

生産者

public class Producer {
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 發送消息到交換機
        for (int i = 0; i < 100; i++) {
            channel.basicPublish(EXCHANGE_NAME, "", null, ("釋出訂閱模型的第 " + i + " 條消息").getBytes());
        }
        // 關閉資源
        channel.close();
        connection.close();
    }
}
           

消費者

// 消費者1
public class Consumer {
    private static final String QUEUE_NAME = "queue_publish_1";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将隊列綁定到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊列1接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
           
// 消費者2
public class Consumer2 {
    private static final String QUEUE_NAME = "queue_publish_2";
    private static final String EXCHANGE_NAME = "exchange_publish_1";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // 聲明隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 聲明交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 将隊列綁定到交換機
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("隊列2接收到的消息是:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}
           

測試

先啟動2個消費者,再啟動生産者

RabbitMQ:通信模型之釋出訂閱模型
RabbitMQ:通信模型之釋出訂閱模型

可以看出來消費者1和消費者2接收到的消息是一模一樣的,每個消費者都收到了生産者發送的消息;

釋出訂閱模型,用到了一個新的東西-交換機,這裡也解釋一下相關方法的參數:

// 聲明交換機
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// 該方法的最多參數的重載方法是:
Exchange.DeclareOk exchangeDeclare(String exchange,
                                    BuiltinExchangeType type,
                                    boolean durable,
                                    boolean autoDelete,
                                    boolean internal,
                                    Map<String, Object> arguments) throws IOException;

/**
 *  param1:exchange,交換機名稱
 *  param2:type,交換機類型;直接寫 string效果一緻;内置了4種交換機類型:
 *   direct(路由模式)、fanout(釋出訂閱模式)、
 *   topic(topic模式-模糊比對)、headers(标頭交換,由Headers的參數配置設定,不常用)
 *  param3:durable,是否持久化交換機   false:預設值,不持久化
 *  param4:autoDelete,沒有消費者使用時,是否自動删除交換機   false:預設值,不删除
 *  param5:internal,是否内置,如果設定 為true,則表示是内置的交換器, 用戶端程式無法直接發送消息到這個交換器中, 隻能通過交換器路由到交換器的方式  false:預設值,允許外部直接通路
 *  param6:arguments,交換機的一些其他屬性,預設值為 null
 */
           
// 将隊列綁定到交換機
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
/**
 *  param1:destination,目的地,隊列的名字
 *  param2:source,資源,交換機的名字
 *  param3:routingKey,路由鍵(目前沒有用到routingKey,填 "" 即可)
 */
           

小結

本文到這裡就結束了,介紹了RabbitMQ通信模型中的釋出訂閱模型,适合于做子產品之間的異步通信。