rabbitmq服務在運作時,如何確定生産者發送的消息一定能夠進入到消息隊列中,這時候就需要引入消息确認機制。消息确認機制就是在rabbitmq接收到消息時對生産者進行一個回報,告訴生産者,你發送給我的消息,我已經收到;同理,消費者也有相關的消息确認機制,確定rabbitmq隊列中的消息能夠被消費者消費。消息确認機制流程圖如下所示:
第一種情況,生産者接收來自rabbitmq的回報。確定生産者發送的消息有被接收到。
生産端消息确認的兩種模式:
1.Confirm模式
- 應答模式,生産者發送一條消息後,rabbitmq服務做了個響應。
- 異步模式,在應答之前,可以繼續發送消息,單條消息或批量消息。效率高。
- channel.ConfirmSelect(); 開啟确認模式
- 消息發送後,提供一個回執方法WaitForConfirms(); 傳回一個bool值。
2.Tx事務模式
- 基于AMPQ協定,可以讓信道設定成一個帶事務的信道,分為三步:a.開啟事務 b.送出事務 c.支援復原。
- 同步模式,在事務送出之前不能繼續發送消息。效率低。
- channel.TxSelect(); 開啟事務
- channel.TxCommit(); 送出事務
- channel.TxRollback(); 復原事務
Confirm模式代碼執行個體:
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 消息确認模式
/// </summary>
public class ProductionMessageConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服務在本地運作
factory.UserName = "guest";//使用者名
factory.Password = "guest";//密碼
//建立連結
using (IConnection connection = factory.CreateConnection())
{
//建立信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生産者已準備就緒......");
//聲明兩個隊列
channel.QueueDeclare(queue: "MessageConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//聲明交換機exchange
channel.ExchangeDeclare(exchange: "MessageConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//綁定exchange和queue
channel.QueueBind(queue: "MessageConfirmQueue", exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey");
string message = "";
//發送消息
Console.WriteLine("在控制台輸入資訊,按回車發送");
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
try
{
//開啟确認模式
channel.ConfirmSelect();
//發送消息
channel.BasicPublish(exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey", basicProperties: null, body: body);
if (channel.WaitForConfirms())//如果一條消息或多條消息确認發送
{
Console.WriteLine($"{message} 發送到broke成功");
}
else
{
//記錄日志,重試一下
}
//如果所有消息發送成功,就正常執行;如果有消息發送失敗,就抛出異常;
channel.WaitForConfirmsOrDie();
}
catch (Exception)
{
Console.WriteLine($"{message} 發送到broke失敗");
throw;
}
}
Console.Read();
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageConfirm.Show();
}
}
執行結果:
從工具中也可以看到隊列中新增了一條消息。
Tx事務模式代碼執行個體:
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 消息事務模式
/// </summary>
public class ProductionMessageTx
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服務在本地運作
factory.UserName = "guest";//使用者名
factory.Password = "guest";//密碼
//建立連結
using (IConnection connection = factory.CreateConnection())
{
//建立信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生産者已準備就緒......");
//聲明兩個隊列
channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);
//聲明交換機exchange
channel.ExchangeDeclare(exchange: "MessageTxExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//綁定exchange和queue
channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxExchange", routingKey: "MessageTxKey01");
channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxExchange", routingKey: "MessageTxKey02");
string message = "";
//發送消息
Console.WriteLine("在控制台輸入資訊,按回車發送");
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
try
{
//開啟事物
channel.TxSelect();
//發送消息
//同時發送給多個隊列,要麼都成功,要麼都失敗
channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);
//送出事物
channel.TxCommit();//送出後消息才會發送到隊列中
Console.WriteLine($"{message} 發送到broke成功");
}
catch (Exception)
{
Console.WriteLine($"{message} 發送到broke失敗");
channel.TxRollback();//復原事物
throw;
}
}
Console.Read();
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageTx.Show();
}
}
執行結果:
從工具中也可以看到兩個隊列中都有了一條消息。
總結:通過上面的兩種方法,可以確定消息能夠正常的發送到rabbitmq服務的隊列中。
第二種情況,rabbitmq接收來自消費者的回報。確定消費者能夠正常消費隊列中的消息。
消費端消息确認的兩種模式:
1.自動确認
當消費者消費消息時,隻要收到消息,就直接回執給rabbitmq,告訴rabbitmq服務,我已經接收到消息了。存在的問題是,當隊列中有多條消息時,隻要有一條消息被消費,rabbitmq會認為是全部成功了,會将所有消息從隊列中移除。導緻消息丢失。
此方法性能快,但是因為有弊端,不常用。
2.手動确認
消費者消費一條消息,回執給rabbitmq一條資訊,rabbitmq就把這條消息從隊列中移除。消費者消費一條,rabbitmq從隊列中删除一條。
此性能沒有自動确認快,但是安全、靠譜,經常用。
先聲明一個生産者,往隊列中寫入消息。
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 生産者
/// </summary>
public class ProductionMessageACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服務在本地運作
factory.UserName = "guest";//使用者名
factory.Password = "guest";//密碼
//建立連結
using (IConnection connection = factory.CreateConnection())
{
//建立信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生産者已準備就緒......");
//聲明兩個隊列
channel.QueueDeclare(queue: "MessageACKConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//聲明交換機exchange
channel.ExchangeDeclare(exchange: "MessageACKConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//綁定exchange和queue
channel.QueueBind(queue: "MessageACKConfirmQueue", exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey");
for (int i = 0; i < 1000; i++)
{
string message = $"消息{i}";
//發送消息
channel.BasicPublish(exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Thread.Sleep(300);
Console.WriteLine($"{message} 已發送");
}
Console.Read();
}
}
}
}
}
autoAck: true代表自動确認。
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageACKConfirm.Show();
}
}
執行結果:
自動确認代碼執行個體:
消費者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消費者 自動确認
/// </summary>
public class ConsumerAutoACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服務在本地運作
factory.UserName = "guest";//使用者名
factory.Password = "guest";//密碼
//建立連結
using (IConnection connection = factory.CreateConnection())
{
//建立信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于目前信道建立事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功 {message}");
};
//處理消息
channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: true, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ConsumerAutoACKConfirm.Show();
Console.ReadLine();
}
}
執行結果:
手動确認代碼執行個體:
消費者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消費者 手動确認
/// </summary>
public class ConsumerACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服務在本地運作
factory.UserName = "guest";//使用者名
factory.Password = "guest";//密碼
//建立連結
using (IConnection connection = factory.CreateConnection())
{
//建立信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于目前信道建立事件
var consumer = new EventingBasicConsumer(channel);
int count = 0;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
if (count < 50)
{
//手動确認,消息正常消費,告訴broker 你可以删除目前這條消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine($"接收成功 {message}");
}
else
{
//否定:告訴broker,這個消息沒有正常被消費 requeue: true表示重新寫到隊列中,false 從隊列中删除
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
count++;
};
//處理消息
channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: false, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
autoAck: false代表手動确認。
main方法:
class Program
{
static void Main(string[] args)
{
ConsumerACKConfirm.Show();
Console.ReadLine();
}
}
執行結果:
可以看到隻有50條消息被消費了。
分布式事務:
1.消息持久化
2.生産端消息确認
3..消費端消息确認