天天看点

分布式异步队列学习总结10(消息确认)

rabbitmq服务在运行时,如何确保生产者发送的消息一定能够进入到消息队列中,这时候就需要引入消息确认机制。消息确认机制就是在rabbitmq接收到消息时对生产者进行一个反馈,告诉生产者,你发送给我的消息,我已经收到;同理,消费者也有相关的消息确认机制,确保rabbitmq队列中的消息能够被消费者消费。消息确认机制流程图如下所示:

分布式异步队列学习总结10(消息确认)

第一种情况,生产者接收来自rabbitmq的反馈。确保生产者发送的消息有被接收到。

分布式异步队列学习总结10(消息确认)

生产端消息确认的两种模式:

1.Confirm模式

  • 应答模式,生产者发送一条消息后,rabbitmq服务做了个响应。
  • 异步模式,在应答之前,可以继续发送消息,单条消息或批量消息。效率高。
  • channel.ConfirmSelect(); 开启确认模式
  • 消息发送后,提供一个回执方法WaitForConfirms(); 返回一个bool值。

2.Tx事务模式

  • 基于AMPQ协议,可以让信道设置成一个带事务的信道,分为三步:a.开启事务 b.提交事务 c.支持回滚。
  • 同步模式,在事务提交之前不能继续发送消息。效率低。
  • channel.TxSelect(); 开启事务
  • channel.TxCommit(); 提交事务
  • channel.TxRollback(); 回滚事务

Confirm模式代码实例: 

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 消息确认模式
    /// </summary>
    public class ProductionMessageConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生产者已准备就绪......");

                    //声明两个队列
                    channel.QueueDeclare(queue: "MessageConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机exchange
                    channel.ExchangeDeclare(exchange: "MessageConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //绑定exchange和queue
                    channel.QueueBind(queue: "MessageConfirmQueue", exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey");


                    string message = "";
                    //发送消息
                    Console.WriteLine("在控制台输入信息,按回车发送");
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //开启确认模式
                            channel.ConfirmSelect();
                            //发送消息
                            channel.BasicPublish(exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey", basicProperties: null, body: body);

                            if (channel.WaitForConfirms())//如果一条消息或多条消息确认发送
                            {
                                Console.WriteLine($"{message} 发送到broke成功");
                            }
                            else
                            {
                                //记录日志,重试一下
                            }

                            //如果所有消息发送成功,就正常执行;如果有消息发送失败,就抛出异常;
                            channel.WaitForConfirmsOrDie();
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"{message} 发送到broke失败");
                            throw;
                        }
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageConfirm.Show();
        }
    }
           

执行结果:

分布式异步队列学习总结10(消息确认)

 从工具中也可以看到队列中新增了一条消息。

分布式异步队列学习总结10(消息确认)

Tx事务模式代码实例:

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 消息事务模式
    /// </summary>
    public class ProductionMessageTx
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生产者已准备就绪......");

                    //声明两个队列
                    channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机exchange
                    channel.ExchangeDeclare(exchange: "MessageTxExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //绑定exchange和queue
                    channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxExchange", routingKey: "MessageTxKey01");
                    channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxExchange", routingKey: "MessageTxKey02");


                    string message = "";
                    //发送消息
                    Console.WriteLine("在控制台输入信息,按回车发送");
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //开启事物
                            channel.TxSelect();
                            //发送消息
                            //同时发送给多个队列,要么都成功,要么都失败
                            channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
                            channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);

                            //提交事物
                            channel.TxCommit();//提交后消息才会发送到队列中
                            Console.WriteLine($"{message} 发送到broke成功");
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"{message} 发送到broke失败");
                            channel.TxRollback();//回滚事物
                            throw;
                        }
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageTx.Show();
        }
    }
           

执行结果:

分布式异步队列学习总结10(消息确认)

从工具中也可以看到两个队列中都有了一条消息。

分布式异步队列学习总结10(消息确认)

总结:通过上面的两种方法,可以确保消息能够正常的发送到rabbitmq服务的队列中。 

第二种情况,rabbitmq接收来自消费者的反馈。确保消费者能够正常消费队列中的消息。

分布式异步队列学习总结10(消息确认)

消费端消息确认的两种模式:

1.自动确认

当消费者消费消息时,只要收到消息,就直接回执给rabbitmq,告诉rabbitmq服务,我已经接收到消息了。存在的问题是,当队列中有多条消息时,只要有一条消息被消费,rabbitmq会认为是全部成功了,会将所有消息从队列中移除。导致消息丢失。

此方法性能快,但是因为有弊端,不常用。

2.手动确认

消费者消费一条消息,回执给rabbitmq一条信息,rabbitmq就把这条消息从队列中移除。消费者消费一条,rabbitmq从队列中删除一条。

此性能没有自动确认快,但是安全、靠谱,经常用。

先声明一个生产者,往队列中写入消息。

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 生产者
    /// </summary>
    public class ProductionMessageACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生产者已准备就绪......");

                    //声明两个队列
                    channel.QueueDeclare(queue: "MessageACKConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //声明交换机exchange
                    channel.ExchangeDeclare(exchange: "MessageACKConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //绑定exchange和queue
                    channel.QueueBind(queue: "MessageACKConfirmQueue", exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey");


                    for (int i = 0; i < 1000; i++)
                    {
                        string message = $"消息{i}";
                        //发送消息
                        channel.BasicPublish(exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Thread.Sleep(300);
                        Console.WriteLine($"{message} 已发送");
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

autoAck: true代表自动确认。

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageACKConfirm.Show();
        }
    }
           

执行结果:

分布式异步队列学习总结10(消息确认)
分布式异步队列学习总结10(消息确认)

自动确认代码实例: 

消费者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    /// <summary>
    /// 消费者 自动确认
    /// </summary>
    public class ConsumerAutoACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"接收成功 {message}");
                        };


                        //处理消息
                        channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: true, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ConsumerAutoACKConfirm.Show();
            Console.ReadLine();
        }
    }
           

执行结果:

分布式异步队列学习总结10(消息确认)
分布式异步队列学习总结10(消息确认)

手动确认代码实例: 

消费者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    /// <summary>
    /// 消费者 手动确认
    /// </summary>
    public class ConsumerACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码

            //创建链接
            using (IConnection connection = factory.CreateConnection())
            {
                //创建信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于当前信道创建事件
                        var consumer = new EventingBasicConsumer(channel);
                        int count = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());

                            if (count < 50)
                            {
                                //手动确认,消息正常消费,告诉broker 你可以删除当前这条消息
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

                                Console.WriteLine($"接收成功 {message}");
                            }
                            else
                            {
                                //否定:告诉broker,这个消息没有正常被消费 requeue: true表示重新写到队列中,false 从队列中删除
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                            }

                            count++;
                        };


                        //处理消息
                        channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: false, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}
           

autoAck: false代表手动确认。

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ConsumerACKConfirm.Show();
            Console.ReadLine();
        }
    }
           

执行结果:

分布式异步队列学习总结10(消息确认)
分布式异步队列学习总结10(消息确认)

可以看到只有50条消息被消费了。

分布式事务:

1.消息持久化

2.生产端消息确认

3..消费端消息确认