天天看点

035-云E办_RabbitMQ_confirm确认模式前言: RabbitMQ消息的事务机制(了解)一、confirm确认模式

035-云E办_RabbitMQ_RabbitMQ消息的事务机制

  • 前言: RabbitMQ消息的事务机制(了解)
    • 总结:事务会降低性能
  • 一、confirm确认模式
    • 1、实现生产者confirm 机制有三种方式:
    • 2、异步confirm
    • 3、代码:

前言: RabbitMQ消息的事务机制(了解)

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

通过将channel设置成confirm模式来实现;

RabbitMQ中与事务机制有关的方法有三个: txSelect() , txCommit() 以及 txRollback(),txSelect() 用于将当前channel设置成transaction模式,txCommit() 用于提交事务,txRollback() 用于回滚事务,在通过txSelect() 开启事务之后,我们便可以发布消息给broker代理服务器了,如果 txCommit() 提交成功了,则消息一定到达了broker了,如果在 txCommit() 执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback() 回滚事务。

总结:事务会降低性能

事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式

一、confirm确认模式

通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,此时处理AMQP协议层面能够实现消息事物控制外,我们还有第二种方式即:Confirm模式。

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack(未确定)消息。

在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。

注意:两种事物控制形式不能同时开启!

1、实现生产者confirm 机制有三种方式:

  • 普通confirm模式-串行-同步:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm确认。实际上是一种串行confirm了。
  • 批量confirm模式-串行-同步:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端confirm。
  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个

    方法。

2、异步confirm

异步:你发消息,我等待确认,然而,在我等待时,我还可以发送消息。确认了消息返回,返回的时候生产者有回掉的方法,能收到确认消息。

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener() 【该方法用来监听返回的消息】回调方法只包含deliveryTag 【当前Chanel发出的消息序号】,我们需要自己为每一个Channel维护一个 unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck 方法【确认的一个处理】, unconfirm 集合删掉相应的一条 (multiple=false)或多条 (multiple=true) 记录。从程序运行效率上看,这个 unconfirm集合最好采用有序集合SortedSet存储结构(有序集合)。实际上waitForConfirms() 方法也是通过SortedSet维护消息序号的。

3、代码:

package com.xxxx.async.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
 * 信道确认模式-异步-生产者
 * 异步模式的优点就是执行效率高,不需要等待消息执行完,只需要监听消息即可。
 */
public class Send {
    // 队列名称
    public static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) {
        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.75.100");
        factory.setPort(5672);
        factory.setUsername("yeb");
        factory.setPassword("yeb");
        factory.setVirtualHost("/yeb");

        Connection connection = null;
        Channel channel = null;
        try {
            // 维护信息发送回执deliveryTag
            final SortedSet<Long>
                    confirmSet=Collections.synchronizedSortedSet(new TreeSet<Long>());
            // 创建连接
            connection = factory.newConnection();
            // 获取通道
            channel = connection.createChannel();
            // 开启confirm确认模式
            channel.confirmSelect();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // 添加channel 监听
            channel.addConfirmListener(new ConfirmListener() {
                // 已确认
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // multiple=true已确认多条, false已确认单条
                    if (multiple) {
                        System.out.println("handleAck--success-->multiple" + deliveryTag);
                        // 清除前 deliveryTag 项标识id 。 删除多条
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        System.out.println("handleAck--success-->single" + deliveryTag);
                        //如果是单条,就删除单条
                        confirmSet.remove(deliveryTag);
                    }
                }

                // 未确认
                @Override
                public void handleNack(long deliveryTag, boolean multiple)
                        throws IOException {
                    // multiple=true未确认多条 false未确认单条
                    if (multiple) {
                        System.out.println("handleNack--failed-->multiple-->" +
                                deliveryTag);
                        // 清除前 deliveryTag 项标识id
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        System.out.println("handleNack--failed-->single" +
                                deliveryTag);
                        confirmSet.remove(deliveryTag);
                    }
                }
            });

            // 循环发送消息演示消息确认
            while (true) {
                // 创建消息
                String message = "Hello World!";
                // 获取unconfirm的消息序号deliveryTag. 对应发送消息的序列号
                Long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("", QUEUE_NAME, null,
                        message.getBytes("utf-8"));
                // 将消息序号deliveryTag添加至SortedSet
                confirmSet.add(seqNo);
            }

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            try {
                // 关闭通道
                if (null != channel && channel.isOpen())
                { channel.close();}
                // 关闭连接
                if (null != connection && connection.isOpen())
                { connection.close();}
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
           
package com.xxxx.async.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 确认模式-异步-消息接收者。消费者
 */
public class Recv {
    // 队列名称
    private final static String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.75.100");
        factory.setPort(5672);
        factory.setUsername("yeb");
        factory.setPassword("yeb");
        factory.setVirtualHost("/yeb");
        try {
            // 通过工厂创建连接
            Connection connection = factory.newConnection();
            // 获取通道,创建信道
            Channel channel = connection.createChannel();
            // 指定队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit pressCTRL+C");

            // 获取消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            // 监听队列消费()
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag
                    -> {
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}