天天看點

RabbitMQ:釋出訂閱模式

✨ RabbitMQ:釋出訂閱模式

  • ​​1.訂閱模式基本介紹​​
  • ​​2.交換機​​
  • ​​3.釋出訂閱模式​​
  • ​​3.1基本介紹​​
  • ​​3.2生産者​​
  • ​​3.3消費者​​
  • ​​3.4測試​​

📃個人首頁:​​不斷前進的皮卡丘​​​

🌞部落格描述:夢想也許遙不可及,但重要的是追夢的過程,用部落格記錄自己的成長,記錄自己一步一步向上攀登的印記

🔥個人專欄:​​消息中間件​​

1.訂閱模式基本介紹

RabbitMQ:釋出訂閱模式
RabbitMQ:釋出訂閱模式
  • P:生産者,發送消息給交換機
  • C:消費者,接收消息
  • X:交換機,一方面接收生産者發送的消息,另一方面知道怎麼處理消息,是否應将其附加到特定隊列?是否應将其附加到多個隊列中?或者它應該被丢棄。其規則由交換類型定義。
  • Queue:消息隊列,接收消息,緩存消息
  • 每個消費者都監聽自己的隊列
  • 生産者把消息發送給broker,然後交換機把消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都将接收到消息。

2.交換機

  • RabbitMQ 中消息傳遞模型的核心思想是,生産者從不将任何消息直接發送到隊列。實際上,很多時候,生産者甚至不知道消息是否會傳遞到任何隊列。相反,生産者隻能将消息發送到_交換機_。交換機的工作是一件非常簡單的事情。一方面,它接收來自生産者的消息,另一方面則将它們推送到隊列。交換必須确切地知道如何處理它收到的消息。是否應将其附加到特定隊列?是否應将其附加到多個隊列中?或者它應該被丢棄。其規則由_交換類型_定義。
  • 交換機隻負責轉發消息,并沒有存儲消息的能力,是以如果沒有任何隊列與Exchange綁定,或者沒有符合路由規則的隊列,那麼消息會丢失!

交換機類型

  • Fanout:廣播,将消息交給所有綁定到交換機的隊列
  • Direct:定向,把消息交給符合指定routing key 的隊列
  • Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

3.釋出訂閱模式

3.1基本介紹

RabbitMQ:釋出訂閱模式
要配置一個fanout類型的交換機,不需要指定對應的路由key,同時會把消息路由到每一個消息隊列中,每個消息隊列都可以對相同的消息進行存儲,然被由各自的消息隊列相關聯的消費者消費

3.2生産者

public class Producer {
    public static String FANOUT_EXCHANGE = " fanout_exchange";
    public static String FANOUT_QUEUE_1 = "fanout_queue_1";
    public static String FANOUT_QUEUE_2 = "fanout_queue_2";

    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            //聲明交換機(交換機名稱,交換機類型)
            channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
            //聲明隊列
            channel.queueDeclare(FANOUT_QUEUE_1,true,false,false,null);
            channel.queueDeclare(FANOUT_QUEUE_2,true,false,false,null);
            //把交換機和隊列進行綁定
            channel.queueBind(FANOUT_QUEUE_1,FANOUT_EXCHANGE,"");
            channel.queueBind(FANOUT_QUEUE_2,FANOUT_EXCHANGE,"");
            //發送消息
            for (int i = 1; i <=10 ; i++) {
                String msg="你好,小兔子,釋出訂閱模式 : "+i;
                channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());

            }

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


    }


}      

3.3消費者

消費者1

public class Consumer1 {
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
            channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
            //把隊列和交換機綁定 隊列名稱,交換機名稱,路由key
            channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHANGE, "");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消費回調函數,當收到消息以後,會自動執行這個方法
                 * @param consumerTag 消費者辨別
                 * @param envelope    消息包的内容(比如交換機,路由key,消息id等)
                 * @param properties   屬性資訊
                 * @param body         消息資料
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者1接受到的消息:" + new String(body, "UTF-8"));
                }
            };
            //監聽消息(隊列名稱,是否自動确認消息,消費對象)
            channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}      

消費者2

public class Consumer2 {
    public static void main(String[] args) {
        try {
            Channel channel = ConnectUtil.getChannel();
            channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
            channel.exchangeDeclare(Producer.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);
            //把隊列和交換機綁定 隊列名稱,交換機名稱,路由key
            channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHANGE, "");
            //接受消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 消費回調函數,當收到消息以後,會自動執行這個方法
                 * @param consumerTag 消費者辨別
                 * @param envelope    消息包的内容(比如交換機,路由key,消息id等)
                 * @param properties   屬性資訊
                 * @param body         消息資料
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("消息者2接受到的消息:" + new String(body, "UTF-8"));
                }
            };
            //監聽消息(隊列名稱,是否自動确認消息,消費對象)
            channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}      

3.4測試

RabbitMQ:釋出訂閱模式