天天看点

RabbitMq死信队列(六)

    当队列中的消息过期,或者达到最大长度而被删除,或者达到最大空间时而被删除时,可以将这些被删除的信息推送到其他交换机中,让其他消费者订阅这些被删除的消息,处理这些消息。

    准备要素:一个生产者 一个正常消费者(带一个正常交换机,一个路由键,一个正常队列) 一个处理过期消息的消费者(带一个死亡交换机,死亡路由键,死亡队列)

生产者MsgProducer :

@Component
public class MsgProducer {
    //交换机名称
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交换机和交换机绑定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //对列名称
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名称
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由键
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交换机
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //处理死亡消息的对列名称
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";

     public static void main(String[] args) throws Exception{
        String message;
        //1.获取连接
        Connection connection = RabbitConnectionUtil.getConnection("localhost", 5672, "/", "guest", "guest");
        //2.声明通道
        Channel channel = connection.createChannel();
        //交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 15000);
        //通过这两个参数把两交换机连在一块!!
        arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
        //3.声明(创建)队列  b:durable是否持久化, 队列的声明默认是存放到内存中的,如果true :rabbitmq重启队列不会丢失但是里面消息没了 false:队列和消息全没了
        //b1:exclusive:是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除
        //b2:autoDelete:是否自动删除,当最后一个消费者断开连接之后队列里面消息是否自动被删除,当consumers = 0时队列里面消息就会自动删除
        channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
        //将交换器与队列通过路由键绑定 EXCHANGE_NAME 为与队列连接的交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
        //死亡交换机
        channel.exchangeDeclare(DEADEXCHANGE_NAME, "direct", false, false, null);
        //死亡信息的队列
        channel.queueDeclare(DEADQUEUE_NAME,true,false,false,null);
        //将死亡交换器与死信队列通过死亡路由键绑定 DEADEXCHANGE_NAME 为与死信队列连接的死亡交换机
        channel.queueBind(DEADQUEUE_NAME,DEADEXCHANGE_NAME,DEADROUTING_KEY);
        for (int i=0;i<20;i++){
            //4.定义消息内容
            message = "hello rabbitmq-";
            //5.发布消息  routingkey 是应用于多个消费者的时候明确如何给多个消费者分配信息
            //BasicProperties props -- 消息的基本属性,例如路由头等。byte[] body -- 消息体
            //void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
            //注意此处EXCHANGE_NAME 为与生产者相连的交换机!!   MessageProperties.PERSISTENT_TEXT_PLAIN 代表消息持久化(用别的方法还有单独为几条消息设置持久化的)
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,null,(message+i).getBytes());
            System.out.println("[x] send'" + (message+i).toString() + "'");
        };
        //6.关闭通道和连接
        channel.close();
        connection.close();
      }
   }
 }
           

关键代码如下:

死亡交换机:DEADEXCHANGE_NAME

死亡路由键:DEADROUTING_KEY

RabbitMq死信队列(六)

上图显示TTL DLX DLK都是map中的参数

Map<String, Object> arguments = new HashMap<String, Object>();
 //简称TTL
 arguments.put("x-message-ttl", 15000);
 //通过这两个参数把两交换机连在一块!!
 //简称 DLX
 arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
 //简称 DLK 其实这个路由键用正常交换机的也可以
 arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
 //死亡交换机,死亡路由键放入正常的queueDeclare中
 channel.queueDeclare(QUEUE_NAME,true,false,false,arguments);
           

处理正常消息的消费者MsgReceiver1:

@Component
public class MsgReceiver1 {
 //交换机名称
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交换机和交换机绑定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //对列名称
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名称
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由键
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交换机
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //处理死亡消息的对列名称
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";
   
     public static void main(String[] args)  throws Exception {
        Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
        Channel channel = connection.createChannel();
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", 15000);
        //通过这两个参数把两交换机连在一块!!
        arguments.put("x-dead-letter-exchange", DEADEXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key", DEADROUTING_KEY);
        //3.声明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
        //需要设置每次从队列获取消息的数量 能者多劳的原则不至于做的快的做完了就歇着了!
        channel.basicQos(1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
        //4.定义队列的消费者
        QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
        /*
        true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.
        false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
                如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,
                直到该消费者反馈.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     */
        //b:ack
        channel.basicConsume(QUEUE_NAME, false,queueingConsumer1);
        // channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
        //6.获取消息
        while (true) {
            anInt += 1;
            QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
            // QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
            String message1 = new String(delivery1.getBody());
            // String message2 = String.valueOf(delivery2.getBody());
            System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
            //手动确认ack  是否批量处理.true:将一次性ack所有小于deliveryTag的消息
            channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
            // System.out.println("[x] Received '" + message2 + "'");
            Thread.sleep(500);
        }
    }
}
           

处理过期消息的消费者MsgReceiver2

@Component
public class MsgReceiver2 {
    private static int  anInt = 0;
    //交换机名称
    private static final String EXCHANGE_NAME = "amqp-hello";
    private static final String EXCHANGE_NAME2 = "amqp-hello2";
    //交换机和交换机绑定的key   "mqkey"
    private static final String ExCHANEROUTING_KEY = "mqkey";
    //对列名称
    private static final String QUEUE_NAME = "Hello-Queue1";
    //routingkey 名称
    private static final String ROUTING_KEY = "key-Queue";
    //死亡路由键
    private static final String DEADROUTING_KEY = "deadkey-Queue";
    //死亡交换机
    private static final String DEADEXCHANGE_NAME = "deadamqp-hello";
    //处理死亡消息的对列名称
    private static final String DEADQUEUE_NAME = "deadHello-Queue1";
    //routingkey 名称
   // private static final String ROUTING_KEY1 = "key1-Queue";
    private static final String ROUTING_KEY2 = "key2-Queue";
    public static void main(String[] args)  throws Exception {
        Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(DEADQUEUE_NAME, true, false, false, null);
        //轮询时需要设置每次从队列获取消息的数量 能者多劳的原则不至于做的快的做完了就歇着了!轮询时需要
        //这个不起作用
        channel.basicQos(1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY1);
        // channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY2);
        //4.定义队列的消费者
        QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
        /*
    true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.
    false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,
    如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,
    直到该消费者反馈.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     */
        //b:ack
        channel.basicConsume(DEADQUEUE_NAME, false, queueingConsumer1);
        // channel.basicConsume(QUEUE_NAME,true,queueingConsumer2);
        //6.获取消息
        while (true) {
            anInt += 1;
            QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery();
            // QueueingConsumer.Delivery delivery2 = queueingConsumer2.nextDelivery();
            String message1 = new String(delivery1.getBody());
            // String message2 = String.valueOf(delivery2.getBody());
            System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
            //手动确认ack  是否批量处理.true:将一次性ack所有小于deliveryTag的消息
            channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
            // System.out.println("[x] Received '" + message2 + "'");
            Thread.sleep(500);
        }
    }
           

    先开启多个消费者在开启生产者。最后运行生产者后,发现MsgReceiver2在等待,当MsgReceiver1把正常消息消费完后,出现过期的消息MsgReceiver2才开始消费

运行结果:

生产者50条

[x] send'hello rabbitmq-0'
[x] send'hello rabbitmq-1'
[x] send'hello rabbitmq-2'
[x] send'hello rabbitmq-3'
[x] send'hello rabbitmq-4'
[x] send'hello rabbitmq-5'
[x] send'hello rabbitmq-6'
[x] send'hello rabbitmq-7'
[x] send'hello rabbitmq-8'
[x] send'hello rabbitmq-9'
[x] send'hello rabbitmq-10'
[x] send'hello rabbitmq-11'
[x] send'hello rabbitmq-12'
[x] send'hello rabbitmq-13'
[x] send'hello rabbitmq-14'
[x] send'hello rabbitmq-15'
[x] send'hello rabbitmq-16'
[x] send'hello rabbitmq-17'
[x] send'hello rabbitmq-18'
[x] send'hello rabbitmq-19'
[x] send'hello rabbitmq-20'
[x] send'hello rabbitmq-21'
[x] send'hello rabbitmq-22'
[x] send'hello rabbitmq-23'
[x] send'hello rabbitmq-24'
[x] send'hello rabbitmq-25'
[x] send'hello rabbitmq-26'
[x] send'hello rabbitmq-27'
[x] send'hello rabbitmq-28'
[x] send'hello rabbitmq-29'
[x] send'hello rabbitmq-30'
[x] send'hello rabbitmq-31'
[x] send'hello rabbitmq-32'
[x] send'hello rabbitmq-33'
[x] send'hello rabbitmq-34'
[x] send'hello rabbitmq-35'
[x] send'hello rabbitmq-36'
[x] send'hello rabbitmq-37'
[x] send'hello rabbitmq-38'
[x] send'hello rabbitmq-39'
[x] send'hello rabbitmq-40'
[x] send'hello rabbitmq-41'
[x] send'hello rabbitmq-42'
[x] send'hello rabbitmq-43'
[x] send'hello rabbitmq-44'
[x] send'hello rabbitmq-45'
[x] send'hello rabbitmq-46'
[x] send'hello rabbitmq-47'
[x] send'hello rabbitmq-48'
[x] send'hello rabbitmq-49'
           

MsgReceiver1正常处理31条

[1]:receve msg:hello rabbitmq-0
[2]:receve msg:hello rabbitmq-1
[3]:receve msg:hello rabbitmq-2
[4]:receve msg:hello rabbitmq-3
[5]:receve msg:hello rabbitmq-4
[6]:receve msg:hello rabbitmq-5
[7]:receve msg:hello rabbitmq-6
[8]:receve msg:hello rabbitmq-7
[9]:receve msg:hello rabbitmq-8
[10]:receve msg:hello rabbitmq-9
[11]:receve msg:hello rabbitmq-10
[12]:receve msg:hello rabbitmq-11
[13]:receve msg:hello rabbitmq-12
[14]:receve msg:hello rabbitmq-13
[15]:receve msg:hello rabbitmq-14
[16]:receve msg:hello rabbitmq-15
[17]:receve msg:hello rabbitmq-16
[18]:receve msg:hello rabbitmq-17
[19]:receve msg:hello rabbitmq-18
[20]:receve msg:hello rabbitmq-19
[21]:receve msg:hello rabbitmq-20
[22]:receve msg:hello rabbitmq-21
[23]:receve msg:hello rabbitmq-22
[24]:receve msg:hello rabbitmq-23
[25]:receve msg:hello rabbitmq-24
[26]:receve msg:hello rabbitmq-25
[27]:receve msg:hello rabbitmq-26
[28]:receve msg:hello rabbitmq-27
[29]:receve msg:hello rabbitmq-28
[30]:receve msg:hello rabbitmq-29
[31]:receve msg:hello rabbitmq-30
           

MsgReceiver2处理过期消息19条

[1]:receve msg:hello rabbitmq-31
[2]:receve msg:hello rabbitmq-32
[3]:receve msg:hello rabbitmq-33
[4]:receve msg:hello rabbitmq-34
[5]:receve msg:hello rabbitmq-35
[6]:receve msg:hello rabbitmq-36
[7]:receve msg:hello rabbitmq-37
[8]:receve msg:hello rabbitmq-38
[9]:receve msg:hello rabbitmq-39
[10]:receve msg:hello rabbitmq-40
[11]:receve msg:hello rabbitmq-41
[12]:receve msg:hello rabbitmq-42
[13]:receve msg:hello rabbitmq-43
[14]:receve msg:hello rabbitmq-44
[15]:receve msg:hello rabbitmq-45
[16]:receve msg:hello rabbitmq-46
[17]:receve msg:hello rabbitmq-47
[18]:receve msg:hello rabbitmq-48
[19]:receve msg:hello rabbitmq-49