天天看點

如何在ASP.NET Core中使用Azure Service Bus Queue

如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES

作者:damienbod

譯文:如何在ASP.NET Core中使用Azure Service Bus Queue

位址:

https://www.cnblogs.com/lwqlun/p/10760227.html

作者:Lamond Lu

源代碼:

https://github.com/lamondlu/AzureServiceBusMessaging

本文展示了如何使用Azure Service Bus Queue, 實作2個ASP.NET Core Api應用之間的消息傳輸。

配置Azure Service Bus Queue#

你可以從官網文檔中了解到如何配置一個Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

這裡我們使用Queue或者Topic來實作消息傳輸。Queue是一種消息傳輸類型,一旦一個消息被一個消費者接收了,該消息就會從Queue中被移除。

與Queue不同,Topic提供的是一對多的通訊方式。

架構圖#

整個應用的實作如下:

Api 1負責發送消息

Api 2負責監聽Azure Service Bus,并處理接收到的消息

實作一個Service Bus Queue#

這裡我們首先需要引入 Microsoft.Azure.ServiceBus 程式集。Microsoft.Azure.ServiceBus是Azure Service Bus的用戶端庫。針對Service Bus的連接配接字元串我們儲存在項目的User Secret中。當部署項目的時候,我們可以使用Azure Key Valut來設定這個Secret值。

在Visual Studio中,右鍵點選API1, API2項目屬性,選擇Manage User Secrets就可以管理目前項目使用的所有私密資訊。

為了發送向Azure Service Bus Queue發送消息,我們需要建立一個SendMessage方法,并接收一個消息參數。這裡我們建立了一個我們自己的消息内容類型MyPayload, 将目前該MyPayload對象序列化成Json字元串, 添加到一個Message對象中。

Copy

using Microsoft.Azure.ServiceBus;

using Microsoft.Extensions.Configuration;

using Newtonsoft.Json;

using System.Text;

using System.Threading.Tasks;

namespace ServiceBusMessaging

{

public class ServiceBusSender
{
    private readonly QueueClient _queueClient;
    private readonly IConfiguration _configuration;
    private const string QUEUE_NAME = "simplequeue";

    public ServiceBusSender(IConfiguration configuration)
    {
        _configuration = configuration;
        _queueClient = new QueueClient(
        _configuration
            .GetConnectionString("ServiceBusConnectionString"), 
            QUEUE_NAME);
    }
     
    public async Task SendMessage(MyPayload payload)
    {
        string data = JsonConvert.SerializeObject(payload);
        Message message = new Message(Encoding.UTF8.GetBytes(data));

        await _queueClient.SendAsync(message);
    }
}           

}

在API 1和API 2中,我們需要将ServiceBusSender注冊到應用程式的IOC容器中。這裡為了測試友善,我們同時注冊Swagger服務。

public void ConfigureServices(IServiceCollection services)

services.AddMvc();

services.AddScoped<ServiceBusSender>();

services.AddSwaggerGen(c =>
{
    c.SwaggerDoc("v1", new Info
    {
        Version = "v1",
        Title = "Payload View API",
    });
});           

接下來,我們就可以在控制器中通過構造函數注入的方式使用這個服務了。

在API1中,我們建立一個POST方法,這個方法會将API接收到Payload對象發送到Azure Service Bus Queue中。

[HttpPost]

[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]

[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]

public async Task Create(FromBodyPayload request)

if (data.Any(d => d.Id == request.Id))
{
    return Conflict($"data with id {request.Id} already exists");
}

data.Add(request);

// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
    Goals = request.Goals,
    Name = request.Name,
    Delete = false
});

return Ok(request);           

從Queue中擷取消息#

為了監聽Azure Service Bus Queue, 并處理接收到的消息,我們建立了一個新類ServiceBusConsumer,ServiceBusConsumer實作了IServiceBusConsumer接口。

Queue的連接配接字元串是使用IConfiguration讀取的。 RegisterOnMessageHandlerAndReceiveMessages方法負責注冊消息處理程式ProcessMessagesAsync處理消息。ProcessMessagesAsync方法會将得到的消息轉換成對象,并調用IProcessData接口完成最終的消息處理。

using Microsoft.Extensions.Logging;

using System.Threading;

public interface IServiceBusConsumer
{
    void RegisterOnMessageHandlerAndReceiveMessages();
    Task CloseQueueAsync();
}

public class ServiceBusConsumer : IServiceBusConsumer
{
    private readonly IProcessData _processData;
    private readonly IConfiguration _configuration;
    private readonly QueueClient _queueClient;
    private const string QUEUE_NAME = "simplequeue";
    private readonly ILogger _logger;

    public ServiceBusConsumer(IProcessData processData, 
        IConfiguration configuration, 
        ILogger<ServiceBusConsumer> logger)
    {
        _processData = processData;
        _configuration = configuration;
        _logger = logger;
        _queueClient = new QueueClient(
          _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
    }

    public void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };

        _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }

    private async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
        _processData.Process(myPayload);
        await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
    }

    private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

        _logger.LogDebug($"- Endpoint: {context.Endpoint}");
        _logger.LogDebug($"- Entity Path: {context.EntityPath}");
        _logger.LogDebug($"- Executing Action: {context.Action}");

        return Task.CompletedTask;
    }

    public async Task CloseQueueAsync()
    {
        await _queueClient.CloseAsync();
    }
}           

其中IProcessData接口存在于類庫項目ServiceBusMessaging中,它是用來處理消息的。

public interface IProcessData

void Process(MyPayload myPayload);           

在Api 2中,我們建立一個ProcessData類,它實作了IProcessData接口。

public class ProcessData : IProcessData

public void Process(MyPayload myPayload)
{
    DataServiceSimi.Data.Add(new Payload
    {
        Name = myPayload.Name,
        Goals = myPayload.Goals
    });
}           

這裡為了簡單測試,我們建立了一個靜态類DataServiceSimi,其中存放了API2中所有儲存Payload對象。同時,我們還建立了一個新的控制器ViewPayloadMessagesController,在其中添加了一個GET Action,并傳回了靜态類DataServiceSimi中的所有資料。

[Route("api/[controller]")]

[ApiController]

public class ViewPayloadMessagesController : ControllerBase

[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult<List<Payload>> Get()
{
    return Ok(DataServiceSimi.Data);
}           

最後我們還需要将ProcessData注冊到API2的IOC容器中。

services.AddMvc();

services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddTransient<IProcessData, ProcessData>();           

最終效果#

現在我們分别啟用2個Api項目,并在Api 1的Swagger文檔界面,調用POST請求,添加一個Payload

操作完成之後,我們通路Api 2的/api/ViewPayloadMessages, 獲得結果如下,Api 1發出的消息出現在了Api 2的結果集中,這說明Api 2從Azure Service Bus Queue中擷取了消息,并儲存在了自己的靜态類DataServiceSimi中。

作者:LamondLu

出處: