天天看點

eShopOnContainers 看微服務⑤:消息通信

1.消息通信

傳統的單體應用,元件間的調用都是使用代碼級的方法函數。比如使用者登入自動簽到,增加積分。我們可以在登入函數調用積分子產品的某個函數,為了解耦我們使用以來注入并放棄new Class()這種方式。但是不管哪種方式都是在同一個程序裡。

講一個單體應用改為微服務應用的最大挑戰就是改變通信機制,直接把程序内方法調用改成服務間的 RPC 調用會導緻在分布式環境中性能低下的、零散的和低效的通信。

通信類型

異步還是同步的:

• 同步協定。

  HTTP 是一種同步協定。用戶端發起一個請求然後等待服務端響應。用戶端的代碼可以獨立地實作同步(線程被阻塞)或異步(線程非阻塞,最終的響應通過回調來處理)的執行方式。這裡的重點在于協定(HTTP / HTTPS)是同步的,用戶端代碼隻能在收到 HTTP 服務端的響應後才可以繼續先前的任務。•

異步協定。

  其他協定比如 AMQP(很多作業系統和雲環境支援的一種協定)使用了異步消息。用戶端代碼或消息發送者通常不需要等待響應,隻要把消息發送給 RabbitMQ 隊列或其他消息代理。

第二個次元是看有單個接收者還是多個接收者:

  • 單個接受者。每個請求必須準确地被一個接收者或服務來處理,一個例子就是指令模式。

  • 多個接收者。每個請求能被 0 個或多個接收者來處理,這類通信必須是異步的,一個例子是事件驅動架構裡的釋出/訂閱機制。它基于事件總線接口或消息代理在多個微服務間通過事件來傳送資料。

2、事件總線

事件總線跟觀察者(釋出-訂閱)模式非常相似也可以說是釋出-訂閱模式的一種實作,跟傳統觀察者的差别隻是一個代碼級一個是架構級的。它是一種集中式事件處理機制,允許不同的元件之間進行彼此通信而又不需要互相依賴,達到一種解耦的目的。

為什麼使用事件總線這種異步模式?

  異步整合方式增強微服務自治能力。

  建立微服務應用的重點在于整合微服務的方式。理想情況下,應該減少内部微服務間的通信,微服務之間的互動越少越好。核心規則是微服務間的互動需要異步。并不意味着一定要使用某種特定的協定(比如異步消息或同步的HTTP),隻是表明微服務間通過異步傳輸資料來通信,但請不要依賴于其他内部微服務作為自己 HTTP請求/響應的一部分。

  如果可能,即便隻是用于查詢,也絕不要依賴微服務間的同步通信(請求/響應)。每個微服務的目标是自治的,對用戶端是可用的,即使作為端到端應用一部分的其他服務發生故障或不穩定。如果您認為需要從一個微服務調用其他微服務(比如發起一個 HTTP 請求來查詢資料)為用戶端應用提供響應結果,那麼這樣的架構在其他微服務發生故障時就變得不穩定。此外,在微服務之間如果存在 HTTP 依賴,比如串聯 HTTP 請求建立很長的請求/響應周期,這樣不僅使您的微服務不能自治,而且一旦這個鍊條上的某個服務有性能問題,整個服務的性能都受到影響。微服務間添加的同步依賴(比如查詢請求)越多,用戶端應用的總響應時間就會越長。

工作原理

從上圖可知,核心就4個角色:

  1. 事件(事件源+事件處理)
  2. 事件釋出者
  3. 事件訂閱者
  4. 事件總線

實作事件總線的關鍵是:

  1. 事件總線維護一個事件源與事件處理的映射字典;
  2. 通過單例模式,確定事件總線的唯一入口;
  3. 利用反射完成事件源與事件處理的初始化綁定;
  4. 提供統一的事件注冊、取消注冊和觸發接口。

3. eshop的事件總線

  事件源:IntegrationEvent,通過繼承擴充這個類,完善事件的描述資訊。

  事件處理:IIntegrationEventHandler,IDynamicIntegrationEventHandler,兩個接口都定義了Handle方法來響應事件。IIntegrationEventHandler接收強類型的IntegrationEvent,IDynamicIntegrationEventHandler接收動态類型dynamic。

Integration Event(內建事件)。因為在微服務中事件的消費不再局限于目前領域内,而是多個微服務可能共享同一個事件,是以這裡要和DDD中的領域事件區分開來。內建事件可用于跨多個微服務或外部系統同步領域狀态,這是通過在微服務之外釋出內建事件來實作的。

      事件總線:IEventBus,提供Publish用來釋出事件,Subscriber用來訂閱事件。

為了友善進行訂閱管理,系統提供了額外的一層抽象

IEventBusSubscriptionsManager

,其用于維護事件的訂閱和登出,以及訂閱資訊的持久化。其預設的實作

InMemoryEventBusSubscriptionsManager

就是使用記憶體進行存儲事件源和事件處理的映射字典。

從類圖中看

InMemoryEventBusSubscriptionsManager

中定義了一個内部類

SubscriptionInfo

,其主要用于表示事件訂閱方的訂閱類型和事件處理的類型。

我們來近距離看下

InMemoryEventBusSubscriptionsManager

的定義:

//定義的事件名稱和事件訂閱的字典映射(1:N)
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
//儲存所有的事件處理類型
private readonly List<Type> _eventTypes;
//定義事件移除後事件
public event EventHandler<string> OnEventRemoved;

//構造函數初始化
public InMemoryEventBusSubscriptionsManager()
{
    _handlers = new Dictionary<string, List<SubscriptionInfo>>();
    _eventTypes = new List<Type>();
}
//添加動态類型事件訂閱(需要手動指定事件名稱)
public void AddDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
//添加強類型事件訂閱(事件名稱為事件源類型)
public void AddSubscription<T, TH>()
    where T : IntegrationEvent
    where TH : IIntegrationEventHandler<T>
{
    var eventName = GetEventKey<T>();

    DoAddSubscription(typeof(TH), eventName, isDynamic: false);

    if (!_eventTypes.Contains(typeof(T)))
    {
        _eventTypes.Add(typeof(T));
    }
}
//移除動态類型事件訂閱
public void RemoveDynamicSubscription<TH>(string eventName)
    where TH : IDynamicIntegrationEventHandler
{
    var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
    DoRemoveHandler(eventName, handlerToRemove);
}

//移除強類型事件訂閱
public void RemoveSubscription<T, TH>()
    where TH : IIntegrationEventHandler<T>
    where T : IntegrationEvent
{
    var handlerToRemove = FindSubscriptionToRemove<T, TH>();
    var eventName = GetEventKey<T>();
    DoRemoveHandler(eventName, handlerToRemove);
}      

添加了這麼一層抽象,即符合了單一職責原則,又完成了代碼重用。

IEventBus

的具體實作通過注入對

IEventBusSubscriptionsManager

的依賴,即可完成訂閱管理。

你這裡可能會好奇,為什麼要暴露一個

OnEventRemoved

事件?這裡先按住不表,留給大家思考。

4. EventBusRabbitMQ實作EventBus

eShopOnContainers 看微服務⑤:消息通信
eShopOnContainers 看微服務⑤:消息通信

我們這裡不糾結為什麼使用RabbitMQ,其實可以替代的方案很多。我們隻要知道RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支援消息叢集和分布式部署。

EventBusRabbitMQ

public class EventBusRabbitMQ : IEventBus, IDisposable
{
    const string BROKER_NAME = "eshop_event_bus";

    private readonly IRabbitMQPersistentConnection _persistentConnection;
    private readonly ILogger<EventBusRabbitMQ> _logger;
    private readonly IEventBusSubscriptionsManager _subsManager;
    private readonly ILifetimeScope _autofac;
    private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
    private readonly int _retryCount;

    private IModel _consumerChannel;
    private string _queueName;

    public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
        ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
    {
        _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
        _queueName = queueName;
        _consumerChannel = CreateConsumerChannel();
        _autofac = autofac;
        _retryCount = retryCount;
        _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
    }

    private void SubsManager_OnEventRemoved(object sender, string eventName)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);

            if (_subsManager.IsEmpty)
            {
                _queueName = string.Empty;
                _consumerChannel.Close();
            }
        }
    }
//....
}      

構造函數主要做了以下幾件事:

  1. 注入

    IRabbitMQPersistentConnection

    用來管理連結。
  2. 使用空對象模式注入

    IEventBusSubscriptionsManager

    ,進行訂閱管理。
  3. 建立消費者信道,用于消息消費。
  4. 注冊

    OnEventRemoved

    事件,取消隊列的綁定。(這也就回答了上面遺留的問題)

訂閱:

/// <summary>
        /// 動态類型訂閱
        /// </summary>
        /// <typeparam name="TH"></typeparam>
        /// <param name="eventName"></param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// 強類型訂閱
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// rabbitmq隊列的綁定。以eventName為routingKey進行路由
        /// </summary>
        /// <param name="eventName">事件名稱</param>
        private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if (!containsKey)
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }

                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName,
                                      exchange: BROKER_NAME,
                                      routingKey: eventName);
                }
            }
        }      

釋出:

/// <summary>
        /// 釋出
        /// </summary>
        /// <param name="event">事件</param>
        public void Publish(IntegrationEvent @event)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }
            //使用Polly進行重試
            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex.ToString());
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                var eventName = @event.GetType()
                    .Name;
                //使用direct全比對、單點傳播形式的路由機制進行消息分發
                channel.ExchangeDeclare(exchange: BROKER_NAME,
                                    type: "direct");
                //消息主體是json字元串
                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                policy.Execute(() =>
                {
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // 進行消息持久化

                    channel.BasicPublish(exchange: BROKER_NAME,
                                     routingKey: eventName,
                                     mandatory:true,//告知伺服器當根據指定的routingKey和消息找不到對應的隊列時,直接傳回消息給生産者。
                                     basicProperties: properties,
                                     body: body);
                });
            }
        }      

 監聽:

構造函數中有一句

_consumerChannel = CreateConsumerChannel();      
private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }
            //建立信道Channel
            var channel = _persistentConnection.CreateModel();
            //申明Exchange使用direct模式
            channel.ExchangeDeclare(exchange: BROKER_NAME,
                                 type: "direct");
            //聲明隊列綁定Channel的消費者執行個體
            channel.QueueDeclare(queue: _queueName,
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);


            var consumer = new EventingBasicConsumer(channel);
            //注冊Received事件委托處理消息接收事件
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);
                //事件處理的邏輯
                await ProcessEvent(eventName, message);

                channel.BasicAck(ea.DeliveryTag,multiple:false);
            };
            //啟動監聽
            channel.BasicConsume(queue: _queueName,
                                 autoAck: false,
                                 consumer: consumer);

            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }      

事件處理邏輯

/// <summary>
        /// 事件處理邏輯
        /// </summary>
        /// <param name="eventName">事件名稱</param>
        /// <param name="message">消息</param>
        /// <returns></returns>
        private async Task ProcessEvent(string eventName, string message)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            //Event Handler執行個體
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            //反序列化為動态類型
                            dynamic eventData = JObject.Parse(message);
                            //調用Handle方法
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            //Event Handler執行個體
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            //反序列化為強類型
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            //調用Handle方法
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }      

5. EventBus的使用

eShopOnContainers 看微服務⑤:消息通信

微服務的內建

各個Startup類中注冊

①注冊

IRabbitMQPersistentConnection

服務用于設定RabbitMQ連接配接

services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
{
    var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>();
    //...
    return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount);
});      

②注冊單例模式的

IEventBusSubscriptionsManager

用于訂閱管理

services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();      

③ 注冊單例模式的

EventBusRabbitMQ

services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
    var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
    var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
    var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
    var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

    var retryCount = 5;
    if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"]))
    {
        retryCount = int.Parse(Configuration["EventBusRetryCount"]);
    }

    return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
});      

④釋出事件

若要釋出事件,需要根據是否需要事件源(參數傳遞)來決定是否需要申明相應的內建事件,需要則繼承自

IntegrationEvent

進行申明。然後在需要釋出事件的地方進行執行個體化,并通過調用

IEventBus

的執行個體的

Publish

方法進行釋出。

//事件源的聲明
public class ProductPriceChangedIntegrationEvent : IntegrationEvent
{        
    public int ProductId { get; private set; }

    public decimal NewPrice { get; private set; }

    public decimal OldPrice { get; private set; }

    public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice)
    {
        ProductId = productId;
        NewPrice = newPrice;
        OldPrice = oldPrice;
    }
}
//聲明事件源
var priceChangedEvent = new ProductPriceChangedIntegrationEvent(1001, 200.00, 169.00)
//釋出事件
_eventBus.Publish(priceChangedEvent)      

 事件總線實作對象将被注入控制器構造函數

eShopOnContainers 看微服務⑤:消息通信

然後,您可以從控制器中的方法中使用它,如 UpdateProduct 方法:

eShopOnContainers 看微服務⑤:消息通信

在這種情況下,由于原始微服務是簡單的 CRUD 微服務,是以該代碼被直接放置在 Web API 控制器中。 在更進階的微服務中,比如使用 CQRS 方法時,它可以在送出原始資料之後,在CommandHandler 類中實作。 

⑤ 訂閱事件

若要訂閱事件,需要根據需要處理的事件類型,申明對應的事件處理類,繼承自

IIntegrationEventHandler

IDynamicIntegrationEventHandler

,并注冊到IOC容器。然後建立

IEventBus

的執行個體調用

Subscribe

方法進行顯式訂閱。

//定義事件處理
public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>
{
    public async Task Handle(ProductPriceChangedIntegrationEvent @event)
    {
        //do something
    }
}
//事件訂閱
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();      

⑥跨服務事件消費

在微服務中跨服務事件消費很普遍,這裡有一點需要說明的是如果訂閱的強類型事件非目前微服務中訂閱的事件,需要複制定義訂閱的事件類型。換句話說,比如在A服務釋出的

TestEvent

事件,B服務訂閱該事件,同樣需要在B服務複制定義一個

TestEvent

這也是微服務的一個通病,重複代碼。

6. 保證資料的一緻性,事件日志持久化

我們使用事件總線處理了微服務間的異步通信問題。但是既然是異步通信那麼就要考慮一緻性問題。當遇到網絡中斷,系統斷電,包括我們的服務異常等情況的時候怎麼辦。

比如當産品價格更改後,代碼将資料送出給資料庫,然後釋出

ProductPriceChangedIntegrationEvent

 事件。

如果服務在資料庫更新後崩潰,但又發生在內建事件成功釋出前,就會導緻本地微服務價格已成功更新,但內建事件未釋出的問題。就會導緻目錄微服務中定義的價格和顧客購物車中緩存的價格不一緻。

以上問題的關鍵在于是如何確定兩個獨立的操作的原子性。如果單從單體應用的角度來處理的話,我們完全是可以将他們放到同一個事務中去保證。然而在微服務中,就違背了其高可用的基本要求。因為一旦事件總線處于癱瘓狀态,那麼整個目錄微服務就不可用了。這種強制通過事務保證的一緻性,就引入了太多的問題依賴。

如果從微服務的角度來看,每個微服務負責各自的業務邏輯,對于目錄微服務來說,它的關注點是産品的更新是否成功。至于借助事件總線通過異步事件實作微服務間的通信,并不是其關注點。這也就是關注點分離。換句話說,産品的更新不應該依賴外部狀态。在這裡,外部狀态就是事件總線的可用性。

A、持久化事件源

解決上面的問題就要確定事件總線能夠正确進行事件轉發。

換句話說:事件總線挂了,但是事件消息不能丢失,這樣我們還有機會挽救(重新釋出消息)。這裡我們就要對事件進行持久化了。

eShopOnContainers已經考慮了這一點,內建了事件日志用于持久化。

eShopOnContainers 看微服務⑤:消息通信

主要是定義了一個

IntegrationEventLogEntry

實體、

EventStateEnum

事件狀态枚舉和

IntegrationEventLogContext

EF上下文用于事件日志持久化。暴露

IIntegrationEventLogService

用于事件狀态的更新。 

其他微服務通過在啟動類中注冊

IntegrationEventLogContext

即可完成事件日志的內建。

eShopOnContainers 看微服務⑤:消息通信

B、借助事件日志確定高可用

主要分兩步走:

  1. 應用程式開始本地資料庫事務,然後更新領域實體狀态,并将內建事件插入內建事件日志表中,最後送出事務來確定領域實體更新和儲存事件日志所需的原子性。
  2. 釋出事件

第一步毋庸置疑,第二步釋出事件,我們又有多種實作方式:

  1. 在送出事務後立即釋出內建事件,并将其标記為已釋出。當微服務發生故障時,可以通過周遊存儲的內建事件(未釋出)執行補救措施。
  2. 将事件日志表用作一種隊列。使用單獨的線程或程序查詢事件日志表,将事件釋出到事件總

    線,然後将事件标記為已釋出。

eShopOnContainers 看微服務⑤:消息通信

這裡很顯然第二種方式更為穩妥。而eShopOnContainers出于簡單考慮,采用了第一種方案,具體代碼如下:

eShopOnContainers 看微服務⑤:消息通信

至此,eShopOnContainers確定事件總線能夠正确轉發消息的解決方案闡述完畢。你可能會問,這對應的是引言中的哪一種方案?都不是,你可以看作其是基于事件日志的簡化版的事件溯源。

C、其它問題

通過持久化事件日志來避免事件釋出失敗導緻的一緻性問題,是一種有效措施。然而消息從發送到接收再到正常消費的過程中,每一個環節都可能故障,是以僅僅在消息發送端使用事件日志隻是確定最終一緻性的一小步。還有很多問題有待完善:

  1. 消息發送成功了,但未被成功接收
  2. 消息發送且成功接收,但未被正确消費
  3. 消息重複發送,導緻多次消費問題
  4. 消息被多個微服務訂閱,如何確定每個微服務都成功接收并消費
  5. 等等

這裡要實作完整的代碼其實還挺複雜的。大家可以參考下楊曉東的CAP方案。

eShopOnContainers 看微服務⑤:消息通信

繼續閱讀