天天看點

RabbitMq死信隊列(六)

    當隊列中的消息過期,或者達到最大長度而被删除,或者達到最大空間時而被删除時,可以将這些被删除的資訊推送到其他交換機中,讓其他消費者訂閱這些被删除的消息,處理這些消息。

    準備要素:一個生産者 一個正常消費者(帶一個正常交換機,一個路由鍵,一個正常隊列) 一個處理過期消息的消費者(帶一個死亡交換機,死亡路由鍵,死亡隊列)

生産者MsgProducer :

@Component
public class MsgProducer {
    //交換機名稱
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交換機和交換機綁定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //對列名稱
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名稱
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由鍵
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交換機
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //處理死亡消息的對列名稱
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";

     public static void main(String[] args) throws Exception{
        String message;
        //1.擷取連接配接
        Connection connection = RabbitConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.聲明通道
        Channel channel = connection.createChannel();
        //交換機
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 15000);
        //通過這兩個參數把兩交換機連在一塊!!
        arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
        //3.聲明(建立)隊列  b:durable是否持久化, 隊列的聲明預設是存放到記憶體中的,如果true :rabbitmq重新開機隊列不會丢失但是裡面消息沒了 false:隊列和消息全沒了
        //b1:exclusive:是否排外的,有兩個作用,一:當連接配接關閉時connection.close()該隊列是否會自動删除
        //b2:autoDelete:是否自動删除,當最後一個消費者斷開連接配接之後隊列裡面消息是否自動被删除,當consumers = 0時隊列裡面消息就會自動删除
        channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
        //将交換器與隊列通過路由鍵綁定 EXCHANGE_NAME 為與隊列連接配接的交換機
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //死亡交換機
        channel.exchangeDeclare(DEADEXCHANGE_NAME, "direct", false, false, null);
        //死亡資訊的隊列
        channel.queueDeclare(DEADQUEUE_NAME,true,false,false,null);
        //将死亡交換器與死信隊列通過死亡路由鍵綁定 DEADEXCHANGE_NAME 為與死信隊列連接配接的死亡交換機
        channel.queueBind(DEADQUEUE_NAME,DEADEXCHANGE_NAME,DEADROUTING_KEY);
        for (int i=0;i<20;i++){
            //4.定義消息内容
            message = "hello rabbitmq-";
            //5.釋出消息  routingkey 是應用于多個消費者的時候明确如何給多個消費者配置設定資訊
            //BasicProperties props -- 消息的基本屬性,例如路由頭等。byte[] body -- 消息體
            //void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
            //注意此處EXCHANGE_NAME 為與生産者相連的交換機!!   MessageProperties.PERSISTENT_TEXT_PLAIN 代表消息持久化(用别的方法還有單獨為幾條消息設定持久化的)
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null,(message+i).getBytes());
            System.out.println("[x] send'" + (message+i).toString() + "'");
        };
        //6.關閉通道和連接配接
        channel.close();
        connection.close();
      }
   }
 }
           

關鍵代碼如下:

死亡交換機:DEADEXCHANGE_NAME

死亡路由鍵:DEADROUTING_KEY

RabbitMq死信隊列(六)

上圖顯示TTL DLX DLK都是map中的參數

Map<String, Object> arguments = new HashMap<String, Object>();
 //簡稱TTL
 arguments.put("x-message-ttl", 15000);
 //通過這兩個參數把兩交換機連在一塊!!
 //簡稱 DLX
 arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
 //簡稱 DLK 其實這個路由鍵用正常交換機的也可以
 arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
 //死亡交換機,死亡路由鍵放入正常的queueDeclare中
 channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
           

處理正常消息的消費者MsgReceiver1:

@Component
public class MsgReceiver1 {
 //交換機名稱
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交換機和交換機綁定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //對列名稱
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名稱
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由鍵
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交換機
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //處理死亡消息的對列名稱
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";
   
     public static void main(String[] args)  throws Exception {
        Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
        Channel channel = connection.createChannel();
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 15000);
        //通過這兩個參數把兩交換機連在一塊!!
        arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
        //3.聲明隊列
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        //需要設定每次從隊列擷取消息的數量 能者多勞的原則不至于做的快的做完了就歇着了!
        channel.basicQos(1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
        //4.定義隊列的消費者
        QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
        /*
        true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息成功消費.
        false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,
                如果消費者一直沒有回報,那麼該消息将一直處于不可用狀态,并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,
                直到該消費者回報.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     */
        //b:ack
        channel.basicConsume(QUEUE_NAME, false,queueingConsumer1);
        // channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
        //6.擷取消息
        while (true) {
            anInt += 1;
            QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
            // QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
            String message1 = new String(delivery1.getBody());
            // String message2 = String.valueOf(delivery2.getBody());
            System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
            //手動确認ack  是否批量處理.true:将一次性ack所有小于deliveryTag的消息
            channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
            // System.out.println("[x] Received '" + message2 + "'");
            Thread.sleep(500);
        }
    }
}
           

處理過期消息的消費者MsgReceiver2

@Component
public class MsgReceiver2 {
    private static int  anInt = 0;
    //交換機名稱
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交換機和交換機綁定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //對列名稱
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名稱
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由鍵
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交換機
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //處理死亡消息的對列名稱
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";
    //routingkey 名稱
   // private static final String ROUTING_KEY1 = "key1-Queue";
    private static final String ROUTING_KEY2 = "key2-Queue";
    public static void main(String[] args)  throws Exception {
        Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
        Channel channel = connection.createChannel();
        //3.聲明隊列
        channel.queueDeclare(DEADQUEUE_NAME, true, false, false, null);
        //輪詢時需要設定每次從隊列擷取消息的數量 能者多勞的原則不至于做的快的做完了就歇着了!輪詢時需要
        //這個不起作用
        channel.basicQos(1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
        //4.定義隊列的消費者
        QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
        /*
    true:表示自動确認,隻要消息從隊列中擷取,無論消費者擷取到消息後是否成功消費,都會認為消息成功消費.
    false:表示手動确認,消費者擷取消息後,伺服器會将該消息标記為不可用狀态,等待消費者的回報,
    如果消費者一直沒有回報,那麼該消息将一直處于不可用狀态,并且伺服器會認為該消費者已經挂掉,不會再給其發送消息,
    直到該消費者回報.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     */
        //b:ack
        channel.basicConsume(DEADQUEUE_NAME, false, queueingConsumer1);
        // channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
        //6.擷取消息
        while (true) {
            anInt += 1;
            QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
            // QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
            String message1 = new String(delivery1.getBody());
            // String message2 = String.valueOf(delivery2.getBody());
            System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
            //手動确認ack  是否批量處理.true:将一次性ack所有小于deliveryTag的消息
            channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
            // System.out.println("[x] Received '" + message2 + "'");
            Thread.sleep(500);
        }
    }
           

    先開啟多個消費者在開啟生産者。最後運作生産者後,發現MsgReceiver2在等待,當MsgReceiver1把正常消息消費完後,出現過期的消息MsgReceiver2才開始消費

運作結果:

生産者50條

[x] send'hello rabbitmq-0'
[x] send'hello rabbitmq-1'
[x] send'hello rabbitmq-2'
[x] send'hello rabbitmq-3'
[x] send'hello rabbitmq-4'
[x] send'hello rabbitmq-5'
[x] send'hello rabbitmq-6'
[x] send'hello rabbitmq-7'
[x] send'hello rabbitmq-8'
[x] send'hello rabbitmq-9'
[x] send'hello rabbitmq-10'
[x] send'hello rabbitmq-11'
[x] send'hello rabbitmq-12'
[x] send'hello rabbitmq-13'
[x] send'hello rabbitmq-14'
[x] send'hello rabbitmq-15'
[x] send'hello rabbitmq-16'
[x] send'hello rabbitmq-17'
[x] send'hello rabbitmq-18'
[x] send'hello rabbitmq-19'
[x] send'hello rabbitmq-20'
[x] send'hello rabbitmq-21'
[x] send'hello rabbitmq-22'
[x] send'hello rabbitmq-23'
[x] send'hello rabbitmq-24'
[x] send'hello rabbitmq-25'
[x] send'hello rabbitmq-26'
[x] send'hello rabbitmq-27'
[x] send'hello rabbitmq-28'
[x] send'hello rabbitmq-29'
[x] send'hello rabbitmq-30'
[x] send'hello rabbitmq-31'
[x] send'hello rabbitmq-32'
[x] send'hello rabbitmq-33'
[x] send'hello rabbitmq-34'
[x] send'hello rabbitmq-35'
[x] send'hello rabbitmq-36'
[x] send'hello rabbitmq-37'
[x] send'hello rabbitmq-38'
[x] send'hello rabbitmq-39'
[x] send'hello rabbitmq-40'
[x] send'hello rabbitmq-41'
[x] send'hello rabbitmq-42'
[x] send'hello rabbitmq-43'
[x] send'hello rabbitmq-44'
[x] send'hello rabbitmq-45'
[x] send'hello rabbitmq-46'
[x] send'hello rabbitmq-47'
[x] send'hello rabbitmq-48'
[x] send'hello rabbitmq-49'
           

MsgReceiver1正常處理31條

[1]:receve msg:hello rabbitmq-0
[2]:receve msg:hello rabbitmq-1
[3]:receve msg:hello rabbitmq-2
[4]:receve msg:hello rabbitmq-3
[5]:receve msg:hello rabbitmq-4
[6]:receve msg:hello rabbitmq-5
[7]:receve msg:hello rabbitmq-6
[8]:receve msg:hello rabbitmq-7
[9]:receve msg:hello rabbitmq-8
[10]:receve msg:hello rabbitmq-9
[11]:receve msg:hello rabbitmq-10
[12]:receve msg:hello rabbitmq-11
[13]:receve msg:hello rabbitmq-12
[14]:receve msg:hello rabbitmq-13
[15]:receve msg:hello rabbitmq-14
[16]:receve msg:hello rabbitmq-15
[17]:receve msg:hello rabbitmq-16
[18]:receve msg:hello rabbitmq-17
[19]:receve msg:hello rabbitmq-18
[20]:receve msg:hello rabbitmq-19
[21]:receve msg:hello rabbitmq-20
[22]:receve msg:hello rabbitmq-21
[23]:receve msg:hello rabbitmq-22
[24]:receve msg:hello rabbitmq-23
[25]:receve msg:hello rabbitmq-24
[26]:receve msg:hello rabbitmq-25
[27]:receve msg:hello rabbitmq-26
[28]:receve msg:hello rabbitmq-27
[29]:receve msg:hello rabbitmq-28
[30]:receve msg:hello rabbitmq-29
[31]:receve msg:hello rabbitmq-30
           

MsgReceiver2處理過期消息19條

[1]:receve msg:hello rabbitmq-31
[2]:receve msg:hello rabbitmq-32
[3]:receve msg:hello rabbitmq-33
[4]:receve msg:hello rabbitmq-34
[5]:receve msg:hello rabbitmq-35
[6]:receve msg:hello rabbitmq-36
[7]:receve msg:hello rabbitmq-37
[8]:receve msg:hello rabbitmq-38
[9]:receve msg:hello rabbitmq-39
[10]:receve msg:hello rabbitmq-40
[11]:receve msg:hello rabbitmq-41
[12]:receve msg:hello rabbitmq-42
[13]:receve msg:hello rabbitmq-43
[14]:receve msg:hello rabbitmq-44
[15]:receve msg:hello rabbitmq-45
[16]:receve msg:hello rabbitmq-46
[17]:receve msg:hello rabbitmq-47
[18]:receve msg:hello rabbitmq-48
[19]:receve msg:hello rabbitmq-49