天天看點

rabbitmq系列(四)死信隊列

一、什麼是死信隊列

當消息在一個隊列中變成一個死信之後,它将被重新publish到另一個交換機上,這個交換機我們就叫做死信交換機,死信交換機将死信投遞到一個隊列上就是死信隊列。具體原理如下圖:

rabbitmq系列(四)死信隊列

消息變成死信的三種情況:

  • 消息被拒絕(basic.reject / basic.nack),并且requeue = false
  • 消息TTL過期
  • 隊列達到最大長度

二、手動簽收應答模式

應答模式分為兩種,手動簽收和自動簽收,自動應答就是消費者消費了一條消息就自動告訴隊列删除消息。這樣的弊端就是不管消費邏輯有沒有成功,都會将消息删除,這樣就會造成消息丢失。而使用手動簽收後,就是在消費邏輯處理成功後,手動告訴隊列消費成功,然後隊列再去删除這條消息。

  1. 再消費者配置檔案中開啟手動簽收模式
spring.rabbitmq.listener.simple.acknowledge-mode = manual           
  1. 在消費邏輯處理成功後手動簽收,修改消費者代碼
@RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(Message message,@Headers Map<String,Object> headers, Channel channel) throws Exception {

        Jedis jedis = new Jedis("localhost", 6379);

        String messageId = message.getMessageProperties().getMessageId();
        String msg = new String(message.getBody(),"UTF-8");
        System.out.println("接收導的消息為:"+msg+"==消息id為:"+messageId);

        String messageIdRedis = jedis.get("messageId");

        if(messageId == messageIdRedis){
            return;
        }
        JSONObject jsonObject = JSONObject.parseObject(msg);
        String email = jsonObject.getString("email");
        String content = jsonObject.getString("timestamp");

        String httpUrl = "http://127.0.0.1:8080/email?email"+email+"&content="+content;
        // 如果發生異常則傳回null
        String body = HttpUtils.httpGet(httpUrl, "utf-8");
        //
        if(body == null){
            throw new Exception();
        }
        jedis.set("messageId",messageId);
        Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
        // 手動簽收
        channel.basicAck(deliveryTag,false);
    }           

三、産生死信的三種情況示範

  • 消息被拒絕

我們繼續修改一下消費者代碼,嘗試讓消費者消費的時候發生異常。然後在catch塊中拒絕消息。

// 拒絕消息,給死信隊列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);           

我們運作程式後發現,當消息消費異常後在隊列”zb-byte1“中的消息被消費了,同時發現在死信隊列”dead-byte-zb“中有一條未被消費的消息。消息到死信隊列後,然後我們在建立一個消費者去消費消息就可以了。當然死信隊列也需要去手動簽收消息。

這種模式我們也叫做延遲消費,有一種特别經典的案例就是使用者在一個商品搶購系統中,使用者搶到商品後需要在30分鐘時間内支付,不然訂單無效。這時候我們就可以通過消息TTL過期來實作,設定隊列消息過期時間為30分鐘,30分鐘後publish到死信隊列,我們在死信隊列中消費訂單狀态是否支付成功來判斷該訂單是否有效。

非常簡單,我們隻需要在配置死信交換機的時候設定有效時間就可以了

@Bean
    public Queue queue(){

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
        map.put("x-message-ttl",7200); // 隊列過期時間
        Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
        return queue;
    }           

設定隊列長度即可:

@Bean
    public Queue queue(){

        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
        map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
        map.put("x-max-length",3);
//        map.put("x-message-ttl",7200); // 隊列過期時間
        Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
        return queue;
    }           

設定好之後,我們先不要啟動消費者,然後調用生成者往隊列中發送消息,當消息長度大于3時,我們發現消息進入了死信隊列。

注意:前文中也提到過,隊列不能被修改,也就是說已經建立好的隊列設定了過期時常為7200s,然後我們注釋掉,增加隊列長度是3的代碼,這樣運作會報錯,必須在rabbitmq中将該隊列删除,然後重新生成隊列才可以。