天天看点

RabbitMQ实现可靠性消息投递---保证消费者接收到消息一、保证可靠性消息投递二、解决方案

一、保证可靠性消息投递

首先需要明确,效率与可靠性是无法兼得的,如果要保证每一个环节都成功,势必会对消息的收发效率造成影响。 如果是一些业务实时一致性要求不是特别高的场合,可以牺牲一些可靠性来换取效率。

在消息投递过程中,一共有四处需要保证可靠性。如图。

RabbitMQ实现可靠性消息投递---保证消费者接收到消息一、保证可靠性消息投递二、解决方案
  1. 代表消息从生产者发送到Exchange;
  2. 代表消息从Exchange路由到Queue;
  3. 代表消息在Queue中存储;
  4. 代表消费者订阅Queue并消费消息。

二、解决方案

2.1 确保消息发送到RabbitMQ服务器

可能因为网络或者Broker的问题导致1失败,而生产者是无法知道消息是否正确发送到Broker的。 有两种解决方案,第一种是Transaction(事务)模式,第二种Confirm(确认)模式。

  1. Transaction(事务)模式:

    在通过

    channel.txSelect

    方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便 可以将其捕获,进而通过执行

    channel.txRollback

    方法来实现事务回滚。使用事务机制的话会很大程度消耗RabbitMQ的性能,一般不建议使用。
  2. Confirm(确认)模式:

    生产者通过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式。一旦消息被投 递到所有匹配的队列之后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。

2.2 确保消息从交换机路由到正确的队列

可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致2失败。 这种情况一般不存在,队列和交换机都是进行绑定过了的,即使出现错误,在测试环境也会修正掉。

  1. 使用mandatory参数和ReturnListener,可以实现消息无法路由的时候返回给生产者。
  2. 另一种方式就是使用备份交换机(alternate-exchange),无法路由的消息会发送到这个交换机上。如:
Map<String,Object> arguments = new HashMap<String,Object>();
  // 指定交换机的备份交换机
  arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); 
  channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
           

2.3、确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即3出现问题。 解决方案:

  1. 队列持久化:声明队列的时候可以使用参数

    durable

    ,设置为true。就会持久化。
  2. 交换机持久化:声明交换机的时候可以使用参数

    durable

    ,设置为true。就会持久化。
  3. 消息持久化:可以构建一个

    BasicProperties

    deliveryMode

    参数,设置为2,即表示消息会持久化。
  4. 集群,镜像队列

    如:

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments 队列持久化设置
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

  // String exchange, boolean durable 交换机持久化设置
channel.exchangeDeclare("MY_EXCHANGE","true");

// deliveryMode 2代表持久化,其他代表瞬态
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) 
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
           

2.4、确保消息从队列正确地投递到消费者

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致4失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从队列中移去消息

如果消息消费失败,也可以调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认。如果requeue参数设置为 true,可以把这条消息重新存入队列,以便发给下一个消费者(当然,只有一个消费者的时候,这种方式可能会出 现无限循环重复消费的情况,可以投递到新的队列中,或者只打印异常日志)。