天天看點

RabbitMQ(三):Java - 釋出/訂閱模式

Java - 釋出/訂閱模式

    • 釋出/訂閱模式(Publish/Subscribe)
    • 引入pom
    • 生産者
    • 消費者·接收短信
    • 消費者·接收郵件
    • 補充:結合工作隊列模式

釋出/訂閱模式(Publish/Subscribe)

一個消息 , 多個接收
RabbitMQ(三):Java - 釋出/訂閱模式

特點:

  • 一個生産者将消息發送給交換機
  • 與交換機綁定的有多個隊列,每個消費者監聽自己的隊列
  • 生産者将消息發送給交換機,交換機将消息轉發到綁定此交換機的每個隊列,每個綁定交換機的隊列都将接收到消息。
  • 如果消息發送給沒有綁定隊列的交換機上,消息将丢失。
  • 他比工作隊列模式(work queue)更為強大,可以看成多個工作隊列。

引入pom

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
</dependency>
           

生産者

// 這裡模拟注冊完成後,發送短信和郵件。
public class Producer {
    // 定義兩個隊列和一個交換機
    private final static String QUEUE_EMAIL = "queue_email";
    private final static String QUEUE_SMS = "queue_sms";
    private final static String EXCHANGE_NAME = "exchange_fanout_1";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        // 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            // 建立内部會話通道,生産者和mq服務通信都在channel中完成
            channel = connection.createChannel();
            // 聲明交換機
            /**
             * 1. 交換機名稱
             * 2、交換機類型:
             *  FANOUT:對應的模式就是 釋出/訂閱模式
             *  DIRECT:對應 路由(Routing) 的工作模式
             *  TOPIC:對應 Topics 工作模式
             *  HEADERS: 對應 HEADERS 工作模式
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 聲明建立隊列,如果沒有,則建立
            channel.queueDeclare(QUEUE_EMAIL,false,false,false, null);
            channel.queueDeclare(QUEUE_SMS,false,false,false,null);

            // 交換機隊列綁定
            /**
             * 1 queue 隊列名稱
             * 2 exchange 交換機名稱
             * 3 routingKey 路由Key 釋出訂閱模式中 設定為""
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,"");

            String message = "接好了,SMS和EMAIL~~";
            // 發送消息
            // 參數:String exchange, String routingKey, BasicProperties props, byte[] body
            /**
             * exchange:交換機 如果不指定(""),就預設交換機
             * routingKey:路由key;交換機根據路由key将消息轉發到指定的隊列,如果使用預設交換機,routingKey為隊列名稱
             * props:額外屬性
             * body:消息内容
             */
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
            System.out.println("send message: "+message);
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }
}
           

消費者·接收短信

/**
 * 短信 消費者
 */
public class Consumer_SMS {

    private final static String QUEUE_EMAIL = "queue_email";
    private final static String QUEUE_SMS = "queue_sms";
    private final static String EXCHANGE_NAME = "exchange_fanout_1";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        // 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            // 建立内部會話通道,生産者和mq服務通信都在channel中完成
            channel = connection.createChannel();
            // 聲明交換機
            /**
             * 1. 交換機名稱
             * 2、交換機類型:
             *  FANOUT:對應的模式就是 釋出/訂閱模式
             *  DIRECT:對應路由的工作模式
             *  TOPIC:對應 Topics 工作模式
             *  HEADERS: 對應 HEADERS 工作模式
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 聲明隊列,如果沒有,則建立
//            channel.queueDeclare(QUEUE_EMAIL,false,false,false, null);
            channel.queueDeclare(QUEUE_SMS,false,false,false,null);

            // 交換機隊列綁定
            /**
             * 1 queue 隊列名稱
             * 2 exchange 交換機名稱
             * 3 routingKey 路由Key 釋出訂閱模式中 設定為""
             */
//            channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,"");

            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("SMS receive:" + message);
                }
            };
			// 接收消息 監聽隊列
            // 參數:String queue, boolean autoAck, Consumer callback
            /**
             * queue:隊列
             * autoAck:自動回複:當消費者接收到消息後,告訴mq消息已經接收。TRUE:自動回複,false:程式設計回複
             * callback:消費方法,當消費者接收消息執行的方法。
             */
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}
           

消費者·接收郵件

public class Consumer_EMAIL {
    private final static String QUEUE_EMAIL = "queue_email";
    private final static String QUEUE_SMS = "queue_sms";
    private final static String EXCHANGE_NAME = "exchange_fanout_1";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        // 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;

        try{
            connection = factory.newConnection();
            // 建立内部會話通道,生産者和mq服務通信都在channel中完成
            channel = connection.createChannel();
            // 聲明交換機
            /**
             * 1. 交換機名稱
             * 2、交換機類型:
             *  FANOUT:對應的模式就是 釋出/訂閱模式
             *  DIRECT:對應路由的工作模式
             *  TOPIC:對應 Topics 工作模式
             *  HEADERS: 對應 HEADERS 工作模式
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 聲明隊列,如果沒有,則建立
            channel.queueDeclare(QUEUE_EMAIL,false,false,false, null);

            // 交換機隊列綁定
            /**
             * 1 queue 隊列名稱
             * 2 exchange 交換機名稱
             * 3 routingKey 路由Key 釋出訂閱模式中 設定為""
             */
            channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");

            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("Email receive:" + message);
                }
            };

            // 接收消息 監聽隊列
            // 參數:String queue, boolean autoAck, Consumer callback
            /**
             * queue:隊列
             * autoAck:自動回複:當消費者接收到消息後,告訴mq消息已經接收。TRUE:自動回複,false:程式設計回複
             * callback:消費方法,當消費者接收消息執行的方法。
             */
            channel.basicConsume(QUEUE_EMAIL,true,consumer);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}
           

補充:結合工作隊列模式

其實隻要在 消費者【c1】并列加上一個【c3】即可,c1、c3共享同一隊列
// 新增消費者 Consumer_SMS_2
public class Consumer_SMS_2 {
    private final static String QUEUE_EMAIL = "queue_email";
    private final static String QUEUE_SMS = "queue_sms";
    private final static String EXCHANGE_NAME = "exchange_fanout_1";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("106.**.**.82");
        factory.setPort(5672);
        // 設定虛拟主機 一個mq的服務可以設定多個虛拟機,每個虛拟機相當于獨立的mq
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try{
            connection = factory.newConnection();
            // 建立内部會話通道,生産者和mq服務通信都在channel中完成
            channel = connection.createChannel();
            // 聲明交換機
            /**
             * 1. 交換機名稱
             * 2、交換機類型:
             *  FANOUT:對應的模式就是 釋出/訂閱模式
             *  DIRECT:對應路由的工作模式
             *  TOPIC:對應 Topics 工作模式
             *  HEADERS: 對應 HEADERS 工作模式
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 聲明隊列,如果沒有,則建立
//            channel.queueDeclare(QUEUE_EMAIL,false,false,false, null);
            channel.queueDeclare(QUEUE_SMS,false,false,false,null);

            // 交換機隊列綁定
            /**
             * 1 queue 隊列名稱
             * 2 exchange 交換機名稱
             * 3 routingKey 路由Key 釋出訂閱模式中 設定為""
             */
//            channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,"");
            channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,"");

            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String (body, StandardCharsets.UTF_8);
                    System.out.println("SMS_2 receive:" + message);
                }
            };
			// 接收消息 監聽隊列
            // 參數:String queue, boolean autoAck, Consumer callback
            /**
             * queue:隊列
             * autoAck:自動回複:當消費者接收到消息後,告訴mq消息已經接收。TRUE:自動回複,false:程式設計回複
             * callback:消費方法,當消費者接收消息執行的方法。
             */
            channel.basicConsume(QUEUE_SMS,true,consumer);
        } catch (TimeoutException | IOException e) {
            e.printStackTrace();
        }
    }
}