天天看點

Axon Framework官方文檔(十)10 Event Processing(事件處理)

10 Event Processing(事件處理)

應用程式生成的事件需要被發送到可以更新查詢資料庫、搜尋引擎或需要這些事件的其他資源的元件上(通俗而言,即事件需要被傳遞到需要它的元件上):事件處理程式(event handler)。事件總線負責向所有感興趣的元件發送事件消息。在接收端,Event Processors(事件處理器)負責處理這些事件,包括調用适當的Event Handlers(事件處理程式)。
           

10.1 Publishing Events(釋出事件)

在絕大多數情況下,聚合将通過應用它們來釋出事件。但是,有時候,需要将事件(可能來自另一個元件)釋出到事件總線。要釋出事件,隻需用将描述事件資訊的payload封裝進一個EventMessage中。GenericEventMessage.asEventMessage(對象)方法允許你将任何對象包裝進EventMessage對象。如果傳遞的對象已經是EventMessage,那麼它将簡單地傳回。
           

10.2 Event Bus(事件總線)

EventBus是将事件分發到訂閱該事件的處理程式上的機制。Axon提供了事件總線的兩個實作:SimpleEventBus和EmbeddedEventStore。雖然這兩種實作都支援訂閱和跟蹤處理器(參見事件處理器,就是10.3),但是EmbeddedEventStore可以持久化事件,這允許您在稍後的階段可以重放它們。SimpleEventBus存儲在一個不穩定的存儲中,而且一旦事件被分發到訂閱它的元件上,那麼它就會将該事件"忘記"掉。
在使用配置API時,SimpleEventBus是預設使用的。要配置EmbeddedEventStore ,您需要提供一個StorageEngine的實作,它可以實際存儲事件。
           
Configurer configurer = DefaultConfigurer.defaultConfiguration();
configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine());
           

10.3 Event Processors(事件處理器)

Event Handler(事件處理程式)定義了在收到事件時要執行的業務邏輯。Event Processors(事件處理器)是負責處理該處理過程的技術方面的元件。它啟動了一個工作單元,也可能是一個事務,但也確定了相關資料可以正确地連接配接到在事件處理過程中建立的所有消息。 
Event Processors(事件處理器)大緻有兩種形式:訂閱和跟蹤。
>訂閱的事件處理器訂閱了事件源,由分發機制管理的線程負責調用。
>另一方面,跟蹤的事件處理器,使用它自己管理的線程從資料源中拉取消息。
           

10.3.1 Assigning handlers to processors(将處理程式配置設定給處理器)

所有的處理器都有一個名稱,它跨JVM執行個體的來辨別處理器執行個體。兩個具有相同名稱的處理器,可以看作是同一個處理器的兩個執行個體。
所有事件處理程式都附加到一個處理器,而這個處理器的名稱是事件處理程式的類的包名。
我們來看下面的例子:
下面的這些類
           
org.axonframework.example.eventhandling.MyHandler
org.axonframework.example.eventhandling.MyOtherHandler
org.axonframework.example.eventhandling.module.MyHandler
           
會觸發兩個處理器的建立:
           
org.axonframework.example.eventhandling(有兩個處理程式注冊給了它)
org.axonframework.example.eventhandling.module(隻有一個指令處理程式注冊給了它)
           
配置API允許您配置其他政策,将類配置設定給處理器,甚至将特定的執行個體配置設定給特定的處理器
           

10.3.2 Configuring processors(配置處理器)

預設情況下,Axon将使用訂閱的事件處理器(即不存儲事件,分發了就"忘記")。可以通過配置API的EventHandlingConfiguration來更改處理程式的配置設定方式以及處理器的配置方式
EventHandlingConfiguration類定義了大量的方法,可以配置處理器
>registerEventProcessorFactory :允許你定義一個預設的工廠方法,建立沒有明确定義工廠的事件處理器。
>registerEventProcessor(String name, EventProcessorBuilder builder):定義工廠方法,用于建立給定name(名稱)的處理器。注意,此種處理器隻有當名稱被選擇作為任何可用的事件處理程式bean的處理器時才會建立。
>registerTrackingProcessor(String name):定義一個帶有給定名稱的處理器應該被配置為跟蹤事件處理器,使用預設設定。它使用TransactionManager和TokenStore配置。
>usingTrackingProcessors():設定預設使用跟蹤處理器而不是訂閱處理器
跟蹤處理器不同于訂閱的處理器,需要一個令牌存儲來存儲它們的進度。跟蹤處理器通過它的事件流接收到的每個消息都伴随着一個令牌。這個令牌允許處理器在以後的任何點上重新打開流,并在最後一個事件結束時重新打開它。
配置API接受令牌存儲,以及來自全局配置執行個體的大多數其他元件處理器。如果沒有顯式定義TokenStore,則使用InMemoryTokenStore,這在生産中是不推薦的。
           

10.4 Distributing Events(分布式事件)

在某些情況下,需要将事件釋出到外部系統,例如消息代理。
           

10.4.1 Spring AMQP

Axon為來自AMQP消息代理(如RabbitMQ)的事件傳遞提供了開箱即用的支援。
           

10.4.1.1 Forwarding events to an AMQP Echange(事件轉發到AMQP)

SpringAMQPPublisher将事件轉發給AMQP Exchanger(交換器)。它是通過SubscribableMessageSource來初始化的,它通常是一個EventBus或EventStore。從理論上講,這可能是釋出者可以訂閱的任何事件源。
要配置SpringAMQPPublisher,隻需将一個它的執行個體定義為Spring Bean。有許多setter方法允許您指定您期望的行為,例如事務支援、釋出者确認(如果代理支援)和交換器名稱。
預設的交換名稱是“axon.eventbus”。
Note:
請注意,交換器不是自動建立的。您仍然必須聲明您希望使用的隊列、交換和綁定。檢視Spring文檔了解更多資訊。
           

10.4.1.2 Reading Events from an AMQP Queue(從AMQP中讀取消息)

Spring廣泛支援從AMQP隊列讀取消息。然而,這需要“橋接”到Axon,這樣就可以從Axon處理這些消息,就好像它們是正常事件消息一樣。
SpringAMQPMessageSource允許事件處理器從隊列讀取消息,而不是從事件存儲或事件總線中讀取。它作為一個Spring AMQP和SubscribableMessageSource之間的擴充卡,它是這些處理器所必需的。
配置SpringAMQPMessageSource的最簡單方法是定義一個bean,它覆寫預設的onMessage方法,并使用@ rabbitlistener注釋它:
           
@Bean
public SpringAMQPMessageSource myMessageSource(Serializer serializer) {
    return new SpringAMQPMessageSource(serializer) {
        @RabbitListener(queues = "myQueue")
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            super.onMessage(message, channel);
        }
    };
}
           
Spring的@RabbitListener注解告訴Spring,該方法需要在給定隊列的每個消息上調用(示例中的隊列是“myQueue”)。此方法簡單地調用super.onmessage()方法,而他則會将事件的實際釋出到那些訂閱到該事件上的處理器上。
要将處理器訂閱到此MessageSource,請将正确的SpringAMQPMessageSource執行個體傳遞給訂閱處理器的構造函數:
           
// in an @Configuration file:
@Autowired
public void configure(EventHandlingConfiguration ehConfig, SpringAmqpMessageSource myMessageSource) {
    ehConfig.registerSubscribingEventProcessor("myProcessor", c -> myMessageSource);
}
           
注意,跟蹤處理器與SpringAMQPMessageSource不相容。
           

10.5 Asynchronous Event Processing(異步事件處理)

異步處理事件推薦的方法是使用跟蹤事件處理器。這個實作可以保證所有事件的處理,甚至在發生系統故障的情況下(假定事件已經被持久化)。
然而,也有可能在SubscribingProcessor中異步處理事件。要做到這一點,SubscribingProcessor必須用EventProcessingStrategy配置。這種政策可以用來改變事件監聽器的調用應如何管理。
預設政策(DirectEventProcessingStrategy)在傳遞事件的線程中調用這些事件處理程式。這允許處理器使用現有的事務
預設政策(DirectEventProcessingStrategy)調用這些線程提供了事件處理程式。這允許處理器使用現有的事務。
其他Axon-provided strategy是AsynchronousEventProcessingStrategy。它使用一個Executor異步調用事件偵聽器。
盡管AsynchronousEventProcessingStrategy異步執行,某些事件按順序處理仍然是可取的。SequencingPolicy定義了事件是否必須以順序、并行或兩者的組合來處理。政策傳回給定事件的序列辨別符。如果政策傳回兩個事件的相同辨別符,則意味着它們必須由事件處理程式按順序處理。空一個空序列辨別符意味着事件可能與任何其他事件并行處理.
Axon提供了一些可以使用的通用政策:
>FullConcurrencyPolicy将告訴Axon這個事件處理程式可以同時處理所有事件。這意味着,需要按照特定順序處理的事件之間沒有關系。
>SequentialPolicy告訴Axon,所有事件都必須按順序處理。事件的處理将在處理前一個事件完成時開始。
>SequentialPerAggregatePolicy将迫使域事件從同一聚合順序處理。然而,來自不同聚合的事件可以同時處理。這通常是一個合适的政策,用于事件偵聽器更新資料庫中聚合的細節。

除了這些提供的政策之外,您還可以定義自己的政策。所有政策都必須實作SequencingPolicy接口。這個接口定義了一個getSequenceIdentifierFor方法,傳回為一個給定的事件序列辨別符。傳回一個相等序列辨別符的事件必須按順序處理。産生不同序列辨別符的事件可以同時處理。出于性能原因,如果事件可以與任何其他事件并行處理,則政策實作應該傳回null。這更快,因為Axon不需要檢查事件處理的任何限制。
當使用AsynchronousEventProcessingStrategy時建議明确定義一個ErrorHandler。預設的ErrorHandler傳播異常,但在異步執行中沒有什麼可以傳播的,除了Executor。這可能導緻事件沒有被處理。相反,建議使用一個ErrorHandler報告錯誤,并允許繼續處理。ErrorHandler被配置在SubscribingEventProcessor的構造函數上,還提供了EventProcessingStrategy。