天天看点

RabbitMQ(四)------消息手动应答前言一、示例二、效果演示

文章预览

  • 前言
  • 一、示例
    • 1.1、消息生产者
    • 1.2、消息消费者01
    • 1.3、消息消费者02
    • 1.4、工具类
  • 二、效果演示

前言

部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

==手动应答的好处是可以批量应答并且减少网络拥堵 ==

一、示例

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。

RabbitMQ(四)------消息手动应答前言一、示例二、效果演示

1.1、消息生产者

import com.rabbitmq.client.Channel;
import com.zzuli.rabbitmq.utils.RabbitMqUtils;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    public static void main(String[] argv) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            Scanner sc = new Scanner(System.in);
            System.out.println("请输入信息");
            while (sc.hasNext()) {
                String message = sc.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息" + message);
            }
        }
    }
}

           

1.2、消息消费者01

两个消息消费者除了消息应答方式改为了手动之外,它们两者之间的区别是等待接收消息处理时间的长短

public class Work03 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C1 等待接收消息处理时间较短");
 //消息消费的时候如何处理消息
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(1);
 System.out.println("接收到消息:"+message);
 /**
 * 1.消息标记 tag
 * 2.是否批量应答未应答消息
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //采用手动应答
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 });
 }
}
           

1.3、消息消费者02

public class Work04 {
 private static final String ACK_QUEUE_NAME="ack_queue";
 public static void main(String[] args) throws Exception {
 Channel channel = RabbitMqUtils.getChannel();
 System.out.println("C2 等待接收消息处理时间较长");
 //消息消费的时候如何处理消息
 DeliverCallback deliverCallback=(consumerTag,delivery)->{
 String message= new String(delivery.getBody());
 SleepUtils.sleep(30);
 System.out.println("接收到消息:"+message);
 /**
 * 1.消息标记 tag
 * 2.是否批量应答未应答消息
 */
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 };
 //采用手动应答
 boolean autoAck=false;
 channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,(consumerTag)->{
 System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
 });
 }
}
           

1.4、工具类

睡眠工具类

public class SleepUtils {
 public static void sleep(int second){
 try {
 Thread.sleep(1000*second);
 } catch (InterruptedException _ignored) {
 Thread.currentThread().interrupt();
 }
 }
}
           

二、效果演示

正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理

RabbitMQ(四)------消息手动应答前言一、示例二、效果演示

在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了

RabbitMQ(四)------消息手动应答前言一、示例二、效果演示
RabbitMQ(四)------消息手动应答前言一、示例二、效果演示