rabbitmq服务在运行时,如何确保生产者发送的消息一定能够进入到消息队列中,这时候就需要引入消息确认机制。消息确认机制就是在rabbitmq接收到消息时对生产者进行一个反馈,告诉生产者,你发送给我的消息,我已经收到;同理,消费者也有相关的消息确认机制,确保rabbitmq队列中的消息能够被消费者消费。消息确认机制流程图如下所示:
第一种情况,生产者接收来自rabbitmq的反馈。确保生产者发送的消息有被接收到。
生产端消息确认的两种模式:
1.Confirm模式
- 应答模式,生产者发送一条消息后,rabbitmq服务做了个响应。
- 异步模式,在应答之前,可以继续发送消息,单条消息或批量消息。效率高。
- channel.ConfirmSelect(); 开启确认模式
- 消息发送后,提供一个回执方法WaitForConfirms(); 返回一个bool值。
2.Tx事务模式
- 基于AMPQ协议,可以让信道设置成一个带事务的信道,分为三步:a.开启事务 b.提交事务 c.支持回滚。
- 同步模式,在事务提交之前不能继续发送消息。效率低。
- channel.TxSelect(); 开启事务
- channel.TxCommit(); 提交事务
- channel.TxRollback(); 回滚事务
Confirm模式代码实例:
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 消息确认模式
/// </summary>
public class ProductionMessageConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
//声明两个队列
channel.QueueDeclare(queue: "MessageConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明交换机exchange
channel.ExchangeDeclare(exchange: "MessageConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定exchange和queue
channel.QueueBind(queue: "MessageConfirmQueue", exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey");
string message = "";
//发送消息
Console.WriteLine("在控制台输入信息,按回车发送");
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
try
{
//开启确认模式
channel.ConfirmSelect();
//发送消息
channel.BasicPublish(exchange: "MessageConfirmExchange", routingKey: "MessageConfirmKey", basicProperties: null, body: body);
if (channel.WaitForConfirms())//如果一条消息或多条消息确认发送
{
Console.WriteLine($"{message} 发送到broke成功");
}
else
{
//记录日志,重试一下
}
//如果所有消息发送成功,就正常执行;如果有消息发送失败,就抛出异常;
channel.WaitForConfirmsOrDie();
}
catch (Exception)
{
Console.WriteLine($"{message} 发送到broke失败");
throw;
}
}
Console.Read();
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageConfirm.Show();
}
}
执行结果:
从工具中也可以看到队列中新增了一条消息。
Tx事务模式代码实例:
using RabbitMQ.Client;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 消息事务模式
/// </summary>
public class ProductionMessageTx
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
//声明两个队列
channel.QueueDeclare(queue: "MessageTxQueue01", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare(queue: "MessageTxQueue02", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明交换机exchange
channel.ExchangeDeclare(exchange: "MessageTxExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定exchange和queue
channel.QueueBind(queue: "MessageTxQueue01", exchange: "MessageTxExchange", routingKey: "MessageTxKey01");
channel.QueueBind(queue: "MessageTxQueue02", exchange: "MessageTxExchange", routingKey: "MessageTxKey02");
string message = "";
//发送消息
Console.WriteLine("在控制台输入信息,按回车发送");
while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
{
message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
try
{
//开启事物
channel.TxSelect();
//发送消息
//同时发送给多个队列,要么都成功,要么都失败
channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey01", basicProperties: null, body: body);
channel.BasicPublish(exchange: "MessageTxExchange", routingKey: "MessageTxKey02", basicProperties: null, body: body);
//提交事物
channel.TxCommit();//提交后消息才会发送到队列中
Console.WriteLine($"{message} 发送到broke成功");
}
catch (Exception)
{
Console.WriteLine($"{message} 发送到broke失败");
channel.TxRollback();//回滚事物
throw;
}
}
Console.Read();
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageTx.Show();
}
}
执行结果:
从工具中也可以看到两个队列中都有了一条消息。
总结:通过上面的两种方法,可以确保消息能够正常的发送到rabbitmq服务的队列中。
第二种情况,rabbitmq接收来自消费者的反馈。确保消费者能够正常消费队列中的消息。
消费端消息确认的两种模式:
1.自动确认
当消费者消费消息时,只要收到消息,就直接回执给rabbitmq,告诉rabbitmq服务,我已经接收到消息了。存在的问题是,当队列中有多条消息时,只要有一条消息被消费,rabbitmq会认为是全部成功了,会将所有消息从队列中移除。导致消息丢失。
此方法性能快,但是因为有弊端,不常用。
2.手动确认
消费者消费一条消息,回执给rabbitmq一条信息,rabbitmq就把这条消息从队列中移除。消费者消费一条,rabbitmq从队列中删除一条。
此性能没有自动确认快,但是安全、靠谱,经常用。
先声明一个生产者,往队列中写入消息。
using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
namespace AspNetCore.RabbitMQ.MessageProducer.MessageProducer
{
/// <summary>
/// 生产者
/// </summary>
public class ProductionMessageACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine("生产者已准备就绪......");
//声明两个队列
channel.QueueDeclare(queue: "MessageACKConfirmQueue", durable: true, exclusive: false, autoDelete: false, arguments: null);
//声明交换机exchange
channel.ExchangeDeclare(exchange: "MessageACKConfirmExchange", type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
//绑定exchange和queue
channel.QueueBind(queue: "MessageACKConfirmQueue", exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey");
for (int i = 0; i < 1000; i++)
{
string message = $"消息{i}";
//发送消息
channel.BasicPublish(exchange: "MessageACKConfirmExchange", routingKey: "MessageACKConfirmKey", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Thread.Sleep(300);
Console.WriteLine($"{message} 已发送");
}
Console.Read();
}
}
}
}
}
autoAck: true代表自动确认。
main方法:
class Program
{
static void Main(string[] args)
{
ProductionMessageACKConfirm.Show();
}
}
执行结果:
自动确认代码实例:
消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消费者 自动确认
/// </summary>
public class ConsumerAutoACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"接收成功 {message}");
};
//处理消息
channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: true, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
main方法:
class Program
{
static void Main(string[] args)
{
ConsumerAutoACKConfirm.Show();
Console.ReadLine();
}
}
执行结果:
手动确认代码实例:
消费者
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace AspNetCore.RabbitMQ.MessageConsumer_01.MessageConsumer
{
/// <summary>
/// 消费者 手动确认
/// </summary>
public class ConsumerACKConfirm
{
public static void Show()
{
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = "localhost";//rabbitmq服务在本地运行
factory.UserName = "guest";//用户名
factory.Password = "guest";//密码
//创建链接
using (IConnection connection = factory.CreateConnection())
{
//创建信道
using (IModel channel = connection.CreateModel())
{
Console.ForegroundColor = ConsoleColor.Green;
try
{
//基于当前信道创建事件
var consumer = new EventingBasicConsumer(channel);
int count = 0;
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
if (count < 50)
{
//手动确认,消息正常消费,告诉broker 你可以删除当前这条消息
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Console.WriteLine($"接收成功 {message}");
}
else
{
//否定:告诉broker,这个消息没有正常被消费 requeue: true表示重新写到队列中,false 从队列中删除
channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
}
count++;
};
//处理消息
channel.BasicConsume(queue: "MessageACKConfirmQueue", autoAck: false, consumer: consumer);
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
}
}
autoAck: false代表手动确认。
main方法:
class Program
{
static void Main(string[] args)
{
ConsumerACKConfirm.Show();
Console.ReadLine();
}
}
执行结果:
可以看到只有50条消息被消费了。
分布式事务:
1.消息持久化
2.生产端消息确认
3..消费端消息确认