天天看點

RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列

文章目錄

  • 死信隊列
  • 延遲隊列
  • 優先級隊列

死信隊列

  1. 死信隊列(

    DLQ(Dead-Letter-Queue)

    )用來儲存處理失敗或者過期的消息。當一個消息變為死信消息(

    Dead Message

    )之後,消息會被投遞到死信交換器(

    DLX(Dead-Letter-Exchange)

    )中,

    DLX

    會将消息投遞到死信隊列(

    DLQ

    )中
  2. 消息變為死信消息的情況
    • 消息沒有被正确消費————被取消确認(

      Basic.Nack

      或者

      Basic.Reject

      ),并且設定

      requeue

      參數為false;
    • 消息TTL過期
    • 隊列達到最大長度
  3. 死信交換器(

    DLX(Dead-Letter-Exchange)

    )就是普通的

    Exchange

    .RabbitMQ 不會去驗證死信交換器(

    DLX(Dead-Letter-Exchange)

    )設定是否有效,當死信消息找不到指定的交換器時,死信消息會被RabbitMQ安靜的丢棄,而不是抛出異常。
  4. 當在某一個隊列中設定

    x-dead-letter-exchange

    屬性時,如果這個隊列中有消息變為死信消息,則

    Broker

    會自動的将這個消息重新釋出到設定的

    Exchange

    上(

    x-dead-letter-exchange

    對應的那個交換器)。
    RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列
  5. 代碼示例
    /**
         * 死信隊列
         */
        @Test
        public void testDLX() {
            ConnectionFactory factory = new ConnectionFactory();
    
            String userName = "jannal";
            String password = "jannal";
            String virtualHost = "jannal-vhost";
            String dlxQueueName = "jannal.direct.queue.dlx";
            String queueNameNormal = "jannal.direct.queue.normal";
            String dlxExchange = "jannal.direct.exchange.dlx";
            String exchangeNormal = "jannal.direct.exchange.normal";
            String routingKey = "DeadMessage";
            String bindingKey = "DeadMessage";
            String hostName = "jannal.mac.com";
            int portNumber = 5672;
    
            factory.setUsername(userName);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            factory.setHost(hostName);
            factory.setPort(portNumber);
            factory.setAutomaticRecoveryEnabled(false);
    
            Connection conn = null;
            try {
                conn = factory.newConnection();
                Channel channel = conn.createChannel();
                boolean durable = true;
                boolean exclusive = false;
                boolean autoDelete = false;
    
    
                //普通隊列添加DLX
                HashMap<String, Object> queueArgs = new HashMap<>();
                queueArgs.put("x-dead-letter-exchange", dlxExchange);
                //如果不為DLX執行routingKey,則使用原來隊列的
                //args.put("x-dead-letter-routing-key","dlx");
                queueArgs.put("x-message-ttl", 10000); // 設定隊列中消息存活時間為10秒
                queueArgs.put("x-max-length", 5); // 設定隊列最大消息數量為5
                channel.queueDeclare(queueNameNormal, durable, exclusive, autoDelete, queueArgs);
                channel.exchangeDeclare(exchangeNormal, "direct", true);
                channel.queueBind(queueNameNormal, exchangeNormal, bindingKey);
    
                //死信
                channel.queueDeclare(dlxQueueName, durable, exclusive, autoDelete,null);
                channel.exchangeDeclare(dlxExchange, "direct", true);
                channel.queueBind(dlxQueueName, dlxExchange, bindingKey);
    
                boolean mandatory = false;
                boolean immediate = false;
                String msg = "Hello, world ";
                for (int i = 0; i < 12; i++) {
                    channel.basicPublish(exchangeNormal,
                            routingKey,
                            mandatory,
                            immediate,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            msg.getBytes("UTF-8"));
                }
    
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            } catch (TimeoutException e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
                    
  6. 檢視web控制台,

    Lim

    表示

    x-max-length=5

    ,

    D

    表示

    durable=true

    ,

    TTL

    表示

    x-message-ttl=10000

    ,

    DLX

    表示

    x-dead-letter-exchange=jannal.direct.exchange.dlx

    。 因為隊列最大長度為5,是以發送12條消息其中5條是進入

    jannal.direct.queue.normal

    ,其中7條被放入到

    jannal.direct.queue.dlx

    死信隊列中。當普通隊列中的5條消息10s後過期時,此時12條消息都被投遞到死信隊列中
    RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列
    RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列

延遲隊列

  1. 延遲隊列存儲的是延遲的消息,即

    consumer(消費者)

    隻有在等待特定的時間後才能去消費消息或者看到消息。
  2. 延遲消息應用場景
    • 一個A任務執行之後,等待特定的時間T,B任務才能去執行。比如使用者買火車票,必須在30分鐘以内支付,否則席位會被取消。此時【取消席位】這個任務就可以使用延遲隊列來處理。再比如滴滴打車之後,使用者一直不評價,24小時之後自動評價
  3. x-message-ttl

    設定隊列中消息的存活時間,當消息過期變為死信隊列之後,此時

    Consumer(消息者)

    就可以從死信隊列中消費消息,這樣就達到了延遲特定時間的目的了。
  4. 為不同的延遲時間設定不同的隊列,代碼示例
    /**
         * 延遲隊列
         */
        @Test
        public void testDelayQueue() {
            ConnectionFactory factory = new ConnectionFactory();
    
            String userName = "jannal";
            String password = "jannal";
            String virtualHost = "jannal-vhost";
            String dlxQueueName_5 = "jannal.direct.queue.dlx.5";
            String dlxQueueName_10 = "jannal.direct.queue.dlx.10";
            String queueNameNormal_5 = "jannal.direct.queue.normal_5";
            String queueNameNormal_10 = "jannal.direct.queue.normal_10";
            String dlxExchange_5 = "jannal.direct.exchange.dlx_5";
            String dlxExchange_10 = "jannal.direct.exchange.dlx_10";
            String exchangeNormal = "jannal.direct.exchange.normal";
    
            String hostName = "jannal.mac.com";
            int portNumber = 5672;
    
            factory.setUsername(userName);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            factory.setHost(hostName);
            factory.setPort(portNumber);
            factory.setAutomaticRecoveryEnabled(false);
    
            Connection conn = null;
            try {
                conn = factory.newConnection();
                Channel channel = conn.createChannel();
                boolean durable = true;
                boolean exclusive = false;
                boolean autoDelete = false;
    
                HashMap<String, Object> queueArgs = new HashMap<>();
                queueArgs.put("x-dead-letter-exchange", dlxExchange_5);
    
                //設定5s
                queueArgs.put("x-message-ttl", 5);
                channel.queueDeclare(queueNameNormal_5, durable, exclusive, autoDelete, queueArgs);
                channel.exchangeDeclare(exchangeNormal, "direct", true);
                channel.queueBind(queueNameNormal_5, exchangeNormal, "5s");
                //設定10s
                queueArgs.put("x-dead-letter-exchange", dlxExchange_10);
                queueArgs.put("x-message-ttl", 10);
                channel.queueDeclare(queueNameNormal_10, durable, exclusive, autoDelete, queueArgs);
                channel.exchangeDeclare(exchangeNormal, "direct", true);
                channel.queueBind(queueNameNormal_10, exchangeNormal, "10s");
    
                //死信
                channel.queueDeclare(dlxQueueName_5, durable, exclusive, autoDelete,null);
                channel.exchangeDeclare(dlxExchange_5, "direct", true);
                channel.queueBind(dlxQueueName_5, dlxExchange_5, "5s");
    
                channel.queueDeclare(dlxQueueName_10, durable, exclusive, autoDelete,null);
                channel.exchangeDeclare(dlxExchange_10, "direct", true);
                channel.queueBind(dlxQueueName_10, dlxExchange_10, "10s");
    
    
                boolean mandatory = false;
                boolean immediate = false;
                String msg = "Hello, world ";
                //過期時間5s的12條
                for (int i = 0; i < 12; i++) {
                    channel.basicPublish(exchangeNormal,
                            "5s",
                            mandatory,
                            immediate,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            msg.getBytes("UTF-8"));
                }
    
                //過期時間10s的10條
                for (int i = 0; i < 10; i++) {
                    channel.basicPublish(exchangeNormal,
                            "10s",
                            mandatory,
                            immediate,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            msg.getBytes("UTF-8"));
                }
    
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
            } catch (TimeoutException e) {
                logger.error(e.getMessage(), e);
            } finally {
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
    
    
                    
  5. web控制台檢視
    RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列

優先級隊列

  1. 對于消息在

    Broker

    中堆積的情況下,優先級高的消息具有優先被消費的特權。
  2. 代碼示例
    /**
             * 優先級隊列
             */
            @Test
            public void testPriority(){
                ConnectionFactory factory = new ConnectionFactory();
        
                String userName = "jannal";
                String password = "jannal";
                String virtualHost = "jannal-vhost";
                String queueName = "jannal.direct.queue.priority";
                String exchange = "jannal.direct.exchange.priority";
                String routingKey = "priority";
                String bindingKey = "priority";
                String hostName = "jannal.mac.com";
                int portNumber = 5672;
        
                factory.setUsername(userName);
                factory.setPassword(password);
                factory.setVirtualHost(virtualHost);
                factory.setHost(hostName);
                factory.setPort(portNumber);
                factory.setAutomaticRecoveryEnabled(false);
        
                Connection conn = null;
                try {
                    conn = factory.newConnection();
                    Channel channel = conn.createChannel();
                    boolean durable = true;
                    boolean exclusive = false;
                    boolean autoDelete = false;
        
        
                    HashMap<String, Object> args = new HashMap<>();
                    args.put("x-max-priority", 10);//設定一個隊列的最大優先級
                    channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
                    channel.exchangeDeclare(exchange, "direct", true);
                    channel.queueBind(queueName, exchange, bindingKey);
        
                    boolean mandatory = false;
                    boolean immediate = false;
                    String msg = "Hello, world ";
        
                    //消息的優先級預設是0,最高為隊列設定的優先級,優先級高的消息會被先消費
                    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(3).build();
                    channel.basicPublish(exchange,
                            routingKey,
                            mandatory,
                            immediate,
                            properties,
                            msg.getBytes("UTF-8"));
        
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                } catch (TimeoutException e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    if (conn != null) {
                        try {
                            conn.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
                    
  3. web控制台
    RabbitMQ之Producer(三)死信隊列延遲隊列優先級隊列

繼續閱讀