天天看點

消息被拒MQ

生産者

/**
 * 消息被拒的情況
 */
public class Produce0001 {
    private  static  final  String NORMAL_EXCHANGE="normal_exchange";
    public static void main(String[] args) throws  Exception{
        Channel channel = untils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //該消息用作隊列的個數限制
        for(int i=0;i<10;i++)
        {
            String message="info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生産者發送消息"+message);
        }
    }
}      

消費者:

/**
 * 消息被拒的情況
 */
public class Consumer0001 {
        //普通交換機
        private  static  final  String NORMAL_EXCHANGE="normal_exchange";
        //死信交換機
        private  static  final  String DEAD_EXCHANGE="dead_exchange";

        public static void main(String[] args) throws  Exception{
            Channel channel = untils.getChannel();
            //聲明死信交換機,類型為direct
            channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
            channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
            //聲明死信隊列
            String deadQueue="dead_queue";
            channel.queueDeclare(deadQueue,false,false,false,null);
            //死信隊列綁定交換和routingKey值
            channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
            //正常隊列綁定死信隊列
            Map<String,Object> params=new HashMap<>();
            //正常隊列設定死信交換機,參數key是固定值
            params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
            //正常隊列設定死信routing-key,參數key是固定值
            params.put("x-dead-letter-routing-key", "lisi");
            //正常隊列設定的最大限制長度
            params.put("x-max-length",6);
            System.out.println("等待接收消息....");
            String normalQueue="normal_queue";
            channel.queueDeclare(normalQueue,false,false,false,params);
            channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
            DeliverCallback deliverCallback=(consumerTag, message) -> {
                String s = new String(message.getBody(), StandardCharsets.UTF_8);
                if (s.equals("info5"))
                {
                    System.out.println("info5拒接");
                    channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
                }
                else
                {
                    System.out.println("01接收到消息"+s);
                    channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                }
            };
            channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
        }
}      

結果:

消息被拒MQ