天天看點

分布式異步隊列學習總結10(消息确認)

rabbitmq服務在運作時,如何確定生産者發送的消息一定能夠進入到消息隊列中,這時候就需要引入消息确認機制。消息确認機制就是在rabbitmq接收到消息時對生産者進行一個回報,告訴生産者,你發送給我的消息,我已經收到;同理,消費者也有相關的消息确認機制,確定rabbitmq隊列中的消息能夠被消費者消費。消息确認機制流程圖如下所示:

分布式異步隊列學習總結10(消息确認)

第一種情況,生産者接收來自rabbitmq的回報。確定生産者發送的消息有被接收到。

分布式異步隊列學習總結10(消息确認)

生産端消息确認的兩種模式:

1.Confirm模式

  • 應答模式,生産者發送一條消息後,rabbitmq服務做了個響應。
  • 異步模式,在應答之前,可以繼續發送消息,單條消息或批量消息。效率高。
  • channel.ConfirmSelect(); 開啟确認模式
  • 消息發送後,提供一個回執方法WaitForConfirms(); 傳回一個bool值。

2.Tx事務模式

  • 基于AMPQ協定,可以讓信道設定成一個帶事務的信道,分為三步:a.開啟事務 b.送出事務 c.支援復原。
  • 同步模式,在事務送出之前不能繼續發送消息。效率低。
  • channel.TxSelect(); 開啟事務
  • channel.TxCommit(); 送出事務
  • channel.TxRollback(); 復原事務

Confirm模式代碼執行個體: 

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 消息确認模式
    /// </summary>
    public class ProductionMessageConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服務在本地運作
            factory.UserName = "guest";//使用者名
            factory.Password = "guest";//密碼

            //建立連結
            using (IConnection connection = factory.CreateConnection())
            {
                //建立信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生産者已準備就緒......");

                    //聲明兩個隊列
                    channel.QueueDeclare(queue: "MessageConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //聲明交換機exchange
                    channel.ExchangeDeclare(exchange: "MessageConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "MessageConfirmQueue", exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey");


                    string message = "";
                    //發送消息
                    Console.WriteLine("在控制台輸入資訊,按回車發送");
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //開啟确認模式
                            channel.ConfirmSelect();
                            //發送消息
                            channel.BasicPublish(exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey", basicProperties: null, body: body);

                            if (channel.WaitForConfirms())//如果一條消息或多條消息确認發送
                            {
                                Console.WriteLine($"{message} 發送到broke成功");
                            }
                            else
                            {
                                //記錄日志,重試一下
                            }

                            //如果所有消息發送成功,就正常執行;如果有消息發送失敗,就抛出異常;
                            channel.WaitForConfirmsOrDie();
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"{message} 發送到broke失敗");
                            throw;
                        }
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageConfirm.Show();
        }
    }
           

執行結果:

分布式異步隊列學習總結10(消息确認)

 從工具中也可以看到隊列中新增了一條消息。

分布式異步隊列學習總結10(消息确認)

Tx事務模式代碼執行個體:

using RabbitMQ.Client;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 消息事務模式
    /// </summary>
    public class ProductionMessageTx
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服務在本地運作
            factory.UserName = "guest";//使用者名
            factory.Password = "guest";//密碼

            //建立連結
            using (IConnection connection = factory.CreateConnection())
            {
                //建立信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生産者已準備就緒......");

                    //聲明兩個隊列
                    channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //聲明交換機exchange
                    channel.ExchangeDeclare(exchange: "MessageTxExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxExchange", routingKey: "MessageTxKey01");
                    channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxExchange", routingKey: "MessageTxKey02");


                    string message = "";
                    //發送消息
                    Console.WriteLine("在控制台輸入資訊,按回車發送");
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        try
                        {
                            //開啟事物
                            channel.TxSelect();
                            //發送消息
                            //同時發送給多個隊列,要麼都成功,要麼都失敗
                            channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
                            channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);

                            //送出事物
                            channel.TxCommit();//送出後消息才會發送到隊列中
                            Console.WriteLine($"{message} 發送到broke成功");
                        }
                        catch (Exception)
                        {
                            Console.WriteLine($"{message} 發送到broke失敗");
                            channel.TxRollback();//復原事物
                            throw;
                        }
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageTx.Show();
        }
    }
           

執行結果:

分布式異步隊列學習總結10(消息确認)

從工具中也可以看到兩個隊列中都有了一條消息。

分布式異步隊列學習總結10(消息确認)

總結:通過上面的兩種方法,可以確定消息能夠正常的發送到rabbitmq服務的隊列中。 

第二種情況,rabbitmq接收來自消費者的回報。確定消費者能夠正常消費隊列中的消息。

分布式異步隊列學習總結10(消息确認)

消費端消息确認的兩種模式:

1.自動确認

當消費者消費消息時,隻要收到消息,就直接回執給rabbitmq,告訴rabbitmq服務,我已經接收到消息了。存在的問題是,當隊列中有多條消息時,隻要有一條消息被消費,rabbitmq會認為是全部成功了,會将所有消息從隊列中移除。導緻消息丢失。

此方法性能快,但是因為有弊端,不常用。

2.手動确認

消費者消費一條消息,回執給rabbitmq一條資訊,rabbitmq就把這條消息從隊列中移除。消費者消費一條,rabbitmq從隊列中删除一條。

此性能沒有自動确認快,但是安全、靠譜,經常用。

先聲明一個生産者,往隊列中寫入消息。

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;

namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
    /// <summary>
    /// 生産者
    /// </summary>
    public class ProductionMessageACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服務在本地運作
            factory.UserName = "guest";//使用者名
            factory.Password = "guest";//密碼

            //建立連結
            using (IConnection connection = factory.CreateConnection())
            {
                //建立信道
                using (IModel channel = connection.CreateModel())
                {

                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.WriteLine("生産者已準備就緒......");

                    //聲明兩個隊列
                    channel.QueueDeclare(queue: "MessageACKConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);

                    //聲明交換機exchange
                    channel.ExchangeDeclare(exchange: "MessageACKConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "MessageACKConfirmQueue", exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey");


                    for (int i = 0; i < 1000; i++)
                    {
                        string message = $"消息{i}";
                        //發送消息
                        channel.BasicPublish(exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message));

                        Thread.Sleep(300);
                        Console.WriteLine($"{message} 已發送");
                    }

                    Console.Read();

                }
            }

        }
    }
}
           

autoAck: true代表自動确認。

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ProductionMessageACKConfirm.Show();
        }
    }
           

執行結果:

分布式異步隊列學習總結10(消息确認)
分布式異步隊列學習總結10(消息确認)

自動确認代碼執行個體: 

消費者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    /// <summary>
    /// 消費者 自動确認
    /// </summary>
    public class ConsumerAutoACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服務在本地運作
            factory.UserName = "guest";//使用者名
            factory.Password = "guest";//密碼

            //建立連結
            using (IConnection connection = factory.CreateConnection())
            {
                //建立信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于目前信道建立事件
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine($"接收成功 {message}");
                        };


                        //處理消息
                        channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: true, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}
           

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ConsumerAutoACKConfirm.Show();
            Console.ReadLine();
        }
    }
           

執行結果:

分布式異步隊列學習總結10(消息确認)
分布式異步隊列學習總結10(消息确認)

手動确認代碼執行個體: 

消費者

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
    /// <summary>
    /// 消費者 手動确認
    /// </summary>
    public class ConsumerACKConfirm
    {
        public static void Show()
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "localhost";//rabbitmq服務在本地運作
            factory.UserName = "guest";//使用者名
            factory.Password = "guest";//密碼

            //建立連結
            using (IConnection connection = factory.CreateConnection())
            {
                //建立信道
                using (IModel channel = connection.CreateModel())
                {
                    Console.ForegroundColor = ConsoleColor.Green;
                    try
                    {
                        //基于目前信道建立事件
                        var consumer = new EventingBasicConsumer(channel);
                        int count = 0;
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());

                            if (count < 50)
                            {
                                //手動确認,消息正常消費,告訴broker 你可以删除目前這條消息
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

                                Console.WriteLine($"接收成功 {message}");
                            }
                            else
                            {
                                //否定:告訴broker,這個消息沒有正常被消費 requeue: true表示重新寫到隊列中,false 從隊列中删除
                                channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                            }

                            count++;
                        };


                        //處理消息
                        channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: false, consumer: consumer);

                        Console.ReadLine();
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }

                }
            }

        }
    }
}
           

autoAck: false代表手動确認。

main方法:

class Program
    {
        static void Main(string[] args)
        {
            ConsumerACKConfirm.Show();
            Console.ReadLine();
        }
    }
           

執行結果:

分布式異步隊列學習總結10(消息确認)
分布式異步隊列學習總結10(消息确認)

可以看到隻有50條消息被消費了。

分布式事務:

1.消息持久化

2.生産端消息确認

3..消費端消息确認