天天看点

RabbitMQ消息传递模式和NetCore案例

作者:小乖兽技术
RabbitMQ消息传递模式和NetCore案例

RabbitMQ中有四种基本的消息传递模式,它们是:

1. Direct模式:Exchange将消息路由到与Routing Key完全匹配的Queue中。

2. Fanout模式:Exchange将消息路由到所有与其绑定的Queue中。

3. Topic模式:Exchange将消息路由到所有与其绑定的Queue中,同时根据指定的通配符规则进行匹配,实现灵活的消息路由。

4. Header模式:Exchange不使用Routing Key进行消息路由,而是利用消息Header中设置的键值对进行路由。

下面是详细介绍:

1. Direct模式

在Direct模式中,Exchange将消息路由到与Routing Key完全匹配的Queue中。这种模式下,可以使用RabbitMQ的默认交换机(direct类型),也可以创建自定义的交换机。

在生产者发送消息时,需要将消息指定一个Routing Key,该Routing Key与消费者绑定的队列名称相同,Exchange将消息路由到和该Routing Key相同的队列中,消费者就可以获取到队列中的消息了。

2. Fanout模式

在Fanout模式中,Exchange将消息路由到所有与其绑定的Queue中。这种模式下,只能使用自定义的交换机(fanout类型),Exchange不会考虑Routing Key的情况,直接把消息分发给所有绑定的队列。

3. Topic模式

在Topic模式中,Exchange将消息路由到所有与其绑定的Queue中,同时根据指定的通配符规则进行匹配,实现灵活的消息路由。这种模式下,可以使用自定义的交换机(topic类型)。

在生产者发送消息时,需要将消息指定一个Routing Key,而消费者则需要指定一个匹配模式(例如"*.logs"),当Exchange收到消息时,会根据Routing Key和通配符规则来判断应该将消息发送给哪些队列。

4. Header模式

在Header模式中,Exchange不使用Routing Key进行消息路由,而是利用消息Header中设置的键值对进行路由。这种模式下,可以使用自定义的交换机(header类型)。

在生产者发送消息时,需要指定一个包含键值对的Header,消费者则需要指定一组键值对,只有当消息Header中的键值对满足消费者指定的条件时,才会将消息发送给消费者。

在Netcore开发的项目中引入RabbitMQ可以实现应用程序和其他服务之间的异步通信,这种方式可以大大提高系统的可靠性、扩展性和性能。下面给出一个案例来说明引入RabbitMQ的具体应用。

假设我们正在开发一个电商网站,当用户下单时,需要通知订单处理系统进行订单处理和库存管理。采用传统同步方式,应用程序会等待订单处理完成之后才继续进行,这会降低应用程序的响应速度和吞吐量;另外,如果订单处理系统出现故障或者繁忙,应用程序也会出现阻塞。而通过引入RabbitMQ,我们可以将订单信息发送到一个队列中,然后由订单处理系统异步地从队列中获取订单信息进行处理,这样就可以使得应用程序能够快速响应客户请求同时保证订单的处理不被阻塞。如果订单处理系统出现故障或者繁忙,消息可以在队列中等待并重试,这样可以提高系统的可靠性。

以下是一个基于Netcore开发的电商网站采用RabbitMQ异步处理订单的实现示例:

1. 安装RabbitMQ.Client

在Netcore项目中,我们可以通过NuGet包管理器安装RabbitMQ.Client库来引入RabbitMQ客户端。

2. 配置RabbitMQ连接信息

在appsettings.json文件中添加RabbitMQ连接信息的配置:

{

"RabbitMQ": {

"HostName": "localhost",

"UserName": "guest",

"Password": "guest",

"VirtualHost": "/"

}

}           

3. 创建RabbitMQ服务

创建一个名为RabbitMQService的服务,在这个服务中我们可以封装一些RabbitMQ方法,比如发送消息到队列等,具体实现如下:

using RabbitMQ.Client;

using System.Text;

public class RabbitMQService

{

private readonly IConfiguration _configuration;

public RabbitMQService(IConfiguration configuration)

{

_configuration = configuration;

}

public void SendMessage(string queueName, string message)

{

var factory = new ConnectionFactory()

{

HostName = _configuration["RabbitMQ:HostName"],

UserName = _configuration["RabbitMQ:UserName"],

Password = _configuration["RabbitMQ:Password"],

VirtualHost = _configuration["RabbitMQ:VirtualHost"]

};

using (var connection = factory.CreateConnection())

{

using (var channel = connection.CreateModel())

{

channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);

}

}

}

}           

4. 创建订单控制器

在Netcore项目中,我们可以创建一个名为OrderController的控制器,当用户下单时,控制器通过调用RabbitMQService中的方法将订单信息发送到一个队列中,然后立即返回一个成功响应。

[ApiController]

public class OrderController : ControllerBase

{

private readonly RabbitMQService _rabbitMQService;

public OrderController(RabbitMQService rabbitMQService)

{

_rabbitMQService = rabbitMQService;

}

[HttpPost]

public IActionResult CreateOrder(Order order)

{

// 处理订单逻辑

...

// 发送订单消息到RabbitMQ

_rabbitMQService.SendMessage("order_queue", JsonConvert.SerializeObject(order));

return Ok("Order created successfully.");

}

}           

5. 创建订单处理服务

创建一个名为OrderProcessingService的服务,用于从队列中获取订单信息,并进行订单处理和库存管理等。具体实现如下:

using System.Text;

using System.Threading.Tasks;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

public class OrderProcessingService : BackgroundService

{

private readonly IConfiguration _configuration;

public OrderProcessingService(IConfiguration configuration)

{

_configuration = configuration;

}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)

{

var factory = new ConnectionFactory()

{

HostName = _configuration["RabbitMQ:HostName"],

UserName = _configuration["RabbitMQ:UserName"],

Password = _configuration["RabbitMQ:Password"],

VirtualHost = _configuration["RabbitMQ:VirtualHost"]

};

using (var connection = factory.CreateConnection())

using (var channel = connection.CreateModel())

{

channel.QueueDeclare(queue: "order_queue",

durable: false,

exclusive: false,

autoDelete: false,

arguments: null);

var consumer = new EventingBasicConsumer(channel);

consumer.Received += (model, ea) =>

{

var body = ea.Body.ToArray();

var message = Encoding.UTF8.GetString(body);

var order = JsonConvert.DeserializeObject<Order>(message);

Console.WriteLine(#34;Order processed: {order.OrderNumber}");

};

channel.BasicConsume(queue: "order_queue", autoAck: true, consumer: consumer);

await Task.Delay(Timeout.Infinite, stoppingToken);

}

}

}           

在以上代码中,我们使用了Netcore中的BackgroundService类来创建一个订单处理服务。在ExecuteAsync方法中,我们通过创建RabbitMQ连接,然后从队列中获取订单信息,并进行订单处理。当消息被成功消费后,队列会自动将消息删除。

引入RabbitMQ可以实现应用程序和其他服务之间的异步通信,这种方式可以大大提高系统的可靠性、扩展性和性能。特别是在高并发的情况下,使用RabbitMQ可以避免系统响应变慢,提高系统的吞吐量。