在《當我們在讨論CQRS時,我們在讨論些神馬》中,我們讨論了當使用CQRS的過程中,需要關心的一些問題。其中與CQRS關聯最為緊密的模式莫過于Event Sourcing了,CQRS與ES的結合,為我們構造高性能、可擴充系統提供了基本思路。本文将介紹
Kanasz Robert在《Introduction to CQRS》中的示例項目Diary.CQRS。
該項目為Kanasz Robert為了介紹CQRS模式而寫的一個測試項目,原始項目可以通過通路《Introduction to CQRS》來擷取,由于項目版本比較舊,沒有使用nuget管理程式包等,導緻下載下傳以後并不能正常運作,我下載下傳了這個項目,更新到Visual Studio 2017,重新引用了StructMap架構(使用nuget),移除了Web層報錯的代碼,并上傳到部落格園,可以從這裡下載下傳:Diary.CQRS.rar
Diary.CQRS項目的場景為日記本管理,提供了新增、編輯、删除、清單等功能,整個解決方案分為三個項目:
Diary.CQRS:核心項目,完成了EventBus、CommandBus、Domain、Storage等功能,也是我們分析的重點。
Diary.CQRS.Configuration:服務配置,通過ServiceLocator類進行依賴注入、服務查找功能。
Diary.CQRS.Web:使用者界面,MVC項目。
這是一個很好的入門項目,功能簡單、結構清晰,概念覆寫全面。如果CQRS是一個城堡,那麼Diary.CQRS則是打開第一重門的鑰匙,接下來讓我們一起推開這扇門吧。
運作項目,最先看到的是一個Web頁面,如下圖:

很簡單,隻有一個Add按鈕,當我們點選以後,會進入添加的頁面:
我們填上一些内容,然後點選Save按鈕,就會傳回到清單頁,我們可以看到已添加的條目:
然後我們進行編輯操作,點選清單中的Edit按鈕,跳轉到編輯頁面:
雖然頁面中顯示的是Add,但确實是Edit頁面。我們編輯以後點選Save按鈕,然後傳回清單頁即可看到編輯後的内容。
在清單頁中,如果我們點選Delete按鈕,則會删除改條目。
到此為止,我們已經看到了這個項目的所有頁面,一個簡單的CURD操作。我們繼續看它的代碼(在HomeController中)。
通過ServiceLocator定位ReportDatabase,并從ReportDatabase中擷取所有條目。
兩個方法:
Add()方法,處理Get請求,傳回新增視圖;
Add(DiaryItemDto item)方法,接收DiaryItemDto參數,處理Post請求,建立并發送CreateItemCommand指令,然後傳回到Index頁面
仍然是兩個方法:
Edit(Guid id)方法,接收Guid作為參數,并從ReportDatabase中擷取資料,建構dto對象傳回給頁面
Edit(DiaryItemDto item)方法,接收DiaryItemDto對象,處理Post請求,接收到請求以後根據dto對象建立ChangeItemCommand指令,然後傳回到Index頁面
對于删除操作來說,它沒有視圖頁面,接收到請求以後,先擷取該記錄,建立并發送DeleteImteCommand指令,然後傳回到Index頁面
題外話:對于改變資料狀态的操作,使用Get請求是不可取的,可能存在安全隐患
通過上面的代碼,你會發現所有的操作都是從ServiceLocator發起的,通過它我們能夠定位到CommandBus和ReportDatabase,進而進行相應的操作,我們在接下來會介紹ServiceLocator類。
Diary.CQRS.Configuration 項目中定義了ServiceLocator類,這個類的作用是完成IoC容器的服務注冊、服務定位功能。例如我們可以通過ServiceLocator擷取到CommandBus執行個體、擷取ReportDatabase執行個體。
ServiceLocator使用StructureMap作為依賴注入架構,提供了服務注冊、服務導航的功能。ServiceLocator類通過靜态構造函數完成對服務注冊和服務執行個體化工作:
首先調用ContainerBootstrapper.BootstrapStructureMap()方法,這個方法裡面包含了對将服務添加到容器的代碼;然後使用容器建立CommandBus和ReportDatabase的執行個體。
CommandBus:指令總線,對應Command操作,用來發送指令,程式中需要定義相應的指令處理器,進而完成具體的操作。
ReportDatabase:報表資料庫,對應Query操作,用來擷取資料。
ServiceLocator的重要之處在于對外暴露了兩個至關重要的執行個體,分别處理CQRS中的Command和Query。
為什麼沒有Event相關操作呢?到目前為止我們還沒有涉及到,因為對于UI層來說,使用者的意圖都是通過Command表示的,而資料的狀态變化才會觸發Event。
在ServiceLocator中定義了擷取CommandBus和ReportDatabase的方法,我們順着這兩個對象繼續分析。
在基于消息的系統設計中,我們常會看到總線的身影,Command也是一種消息,是以使用總線是再合适不過的了。CommandBus就是我們在Diary.CQRS項目中用到的一種消息總線。
在Diary.CQRS中,它被定義在Messaging目錄,在這個目錄下面,還有與Event相關的EventBus,我們稍後再進行介紹。
CommandBus實作ICommandBus接口,ICommandBus接口的定義如下:
它隻包含了Send方法,用來将指令發送到對應的處理程式。
CommandBus是ICommand的實作,具體代碼如下:
在CommandBus中,顯式依賴ICommandHandlerFactory類,通過構造函數進行注入。那麼 _commandHandlerFactory 的作用是什麼呢?我們在Send方法中可以看到,通過 _commandHandlerFactory 可以擷取到與Command對應的CommandHandler(指令處理程式),在程式的設計上,每一個Command都會有一個對應的CommandHandler,而手工判斷類型、執行個體化處理程式顯然不符合使用習慣,此處采用工廠模式來擷取指令處理程式。
當擷取到與Command對應的CommandHandler後,調用handler的Execute方法,執行該指令。
截止目前為止,我們又接觸了三個概念:CommandHandlerFactory、CommandHandler、Command:
CommandHandlerFactory:指令處理程式工廠,通過GetHandler方法擷取到與指令對應的處理程式
CommandHandler:指令處理程式,用于執行對應的指令
Command:指令,描述使用者的意圖、并包含與意圖相關的資料
使用簡單工廠模式,用來擷取與指令對應的處理程式。它的代碼在Utils檔案夾中,它的作用是提供一種擷取Handler的方式,是以它隻能作為工具存在。
接口定義如下:
隻有GetHandler一個方法,它的實作是 StructureMapCommandHandlerFactory,即通過StructureMap作為依賴注入架構來實作的,代碼也比較簡單,這裡不再貼出來了。
指令是代表使用者的意圖、并包含與意圖相關的資料,比如使用者想要添加一條資料,這便是一個意圖,于是就有了CreateItemCommand,使用者要在界面上填寫添加操作必須的資料,于是就有了指令的屬性。
關于指令的定義如下:
ICommand接口:包含Id屬性,這個Id表示Command對應聚合的Id。聚合是領域驅動開發(DDD)的概念,表示一組強關聯的領域對象,而對聚合中狀态的變更,隻能通過聚合根(AggregateRoot)來完成。
Command類:實作了ICommand接口,并增加了Version屬性,用來标記目前操作對應的聚合跟的版本。
為什麼要有版本的概念的?因為當使用ES模式的時候,資料庫中的資料都是事件産生的資料鏡像,儲存了某個時間點的資料快照,如果要擷取到最新的資料,則需要通過加載該聚合根對應的所有Event來回放到最新狀态。如果引入版本的概念,每一個Event對應一個版本,而景象中的資料也有一個版本,在進行回放的時候,可以僅加載高版本的Event進行回放,節省了系統資源,并提高了運作效率。
指令處理程式,它的作用是處理與它相對應的指令,處理CQRS的核心,接口定義如下:
它接收command作為參數,執行該指令的處理邏輯。每一個指令都有一個與之對應的處理程式。
我們再重新梳理一下流程,首先使用者要新增一個資料,點選儲存按鈕後,生成CreateItemCommand指令,随後這個指令被發送到CommandBus中,CommandBus通過CommandHandlerFactory找到該Command的處理程式,此時在CommandBus的Send方法中,我們有一個Command和CommandHandler,然後調用CommandHandler的Execute方法,即完成了該方法的處理。至此,Command的處理流程完結。
我們來看一下CreateItemCommand的代碼:
它繼承自Command基類,繼承後即擁有了Id和Version屬性,然後又定義了幾個其它的屬性。它隻包含資料,與該指令對應的處理程式叫做CreateItemCommandHandler,代碼如下:
這才是我們要分析的核心,在Handler中,我們看到了Repository,看到了DiaryItem聚合:
IRepository:倉儲類,代表資料的儲存方式,通過倉儲能夠進行資料操作
DiaryItem:領域對象,聚合根,所有資料狀态的變更隻能通過聚合根來修改
在上面的代碼中,由于是新增,是以聚合的版本為-1,然後調用倉儲的Save方法進行儲存。我們繼續往下扒,看看倉儲和聚合的實作。
對于Repository的定義,仍然先看一下接口中的定義,代碼如下:
在倉儲中隻有兩個方法:
Save(AggregateRoot aggregate, int expectedVersion):儲存期望版本的聚合根
GetById(Guid id):根據聚合根Id擷取聚合根
關于IRepository的實作,代碼在Repository.cs中,我們拆開來進行介紹:
首先是它的構造函數,強依賴IEventStorage,通過構造函數注入。EventStorage是事件的儲存倉庫,有個更為熟知的名字EventStore,我們稍後進行介紹。
GetById(Guid id)方法通過Id擷取一個聚合對象,擷取一個聚合對象有以下幾個步驟:
首先會從EventStorage中擷取到該聚合的快照(memento的翻譯為記憶碎片、紀念品、備忘錄,用來聚合對象的快照)。
加載Event清單,加載到的事件清單将用來做事件回放。
如果擷取到快照的話,則加載版本高于該快照版本的事件清單,如果沒有擷取到快照,則加載全部事件清單。此處在上面已經介紹過,通過快照的方式儲存聚合對象,在擷取資料時可以減少重放事件的數量,起到提高加載速度的作用。
執行個體化聚合根,對應代碼中的<code>var obj = new T();</code>。
從快照中設定聚合根的狀态。在擷取到快照以後,如果快照不為空,則調用聚合根的SetMemento方法設定為快照中的狀态,SetMemento方法定義在IOriginator接口中,聚合根需要實作該接口。
加載曆史事件,完成重放。完成這個步驟以後,聚合根将更新到最新狀态。
通過這幾個步驟以後,我們得到了一個最新狀态的聚合根對象。
Save方法,用來儲存一個聚合根對象。在這個方法中,參數expectedVersion表示期望的版本,這裡約定<code>-1</code>為新增的聚合根,當聚合根為新增的時候,會直接調用EventStorage中的Save方法。
關于expectedVersion參數,我們可以了解為對并發的控制,隻有當expectedVersion與GetById擷取到的聚合根對象的版本相同時才能進行儲存操作。
在介紹Repository類的時候,我們接觸了兩個新的概念:EventStorage和AggregateRoot,接下來我們分别進行介紹。
AggregateRoot是聚合根,他表示一組強關聯的領域對象,所有對象的狀态變更隻能通過聚合根來完成,這樣可以保證資料的一緻性,以及減少并發沖突。應用到EventSourcing模式中,聚合根的好處也是很明顯的,我們所有對資料狀态的變更都通過聚合根完成,而每次變更,聚合根都會生成相應的事件,在進行事件回放的時候,又通過聚合根來完成曆史事件的加載。由此我們可以看到,聚合根對象應該具備生成事件、重放事件的能力。
我們來看看聚合根基類的定義,在Domain檔案夾中:
首先這是一個抽象類,實作了IEventProvider接口,該接口的定義如下:
它定義了兩個方法,我們分别進行說明:
LoadsFromHistory()方法:加載曆史事件,還原聚合根的最新狀态,我們在Repository中已經用過這個方法。
GetUncommittedChanges()方法:擷取未送出的事件。一個指令可能造成聚合根發生多次更改,每次更改都會産生一個事件,這些事件被暫時的儲存在聚合根對象中,通過該方法可以擷取到未送出的事件清單。
為了實作這個接口,聚合根中定義了 <code>List<Event> _changes</code>對象,用來臨時存儲所有未送出的事件,該對象在構造函數中進行初始化。
AggregateRoot中對于該事件的實作如下:
LoadsFromHistory方法周遊曆史事件,并調用ApplyChange方法更新聚合根的狀态,在完成更新後設定版本号為最後一個事件的版本。GetUncommittedChanges方法比較簡單,傳回對象的_changes事件清單。
接下來我們看看ApplyChange方法,該方法有兩個實作,代碼如下:
這兩個方法定義為protected,隻能被子類通路。我們可以了解為,ApplyChange(Event @event)方法為簡化操作,對第二個參數進行了預設為true的操作,然後調用ApplyChange(Event @event, bool isNew)方法。
在ApplyChange(Event @event, bool isNew)方法中,調用了聚合根的Handle方法,用來處理事件。如果isNew參數為true,則将事件添加到change清單中,如果為false,則認為是在進行事件回放,是以不進行事件的添加。
需要注意的是,聚合根的Handle方法,與EventHandler不同,當Event産生以後,首先由它對應的聚合根進行處理,是以聚合根要具備處理該事件的能力,如何具備呢?聚合根要實作IHandle接口,該接口的定義如下:
這裡可以看出,IHandle接口是泛型的,它隻對一個具體的Event類型生效,在代碼上的展現如下:
最後,聚合根還定義了清除所有事件的方法,代碼如下:
MarkChangesAsCommitted()方法用來清空事件清單。
終于到我們今天的另外一個核心内容了,Event是ES中的一等公民,所有的狀态變更最終都以Event的形式進行存儲,當我們要檢視聚合根最新狀态的時候,可以通過事件回放來擷取。我們來看看Event的定義:
IEvent接口定義了一個事件必須擁有唯一的Id進行辨別。然後Event實作了IEvent接口:
可以看到,除了Id屬性外,還添加了兩個字段Version和AggregateId。AggregateId表示該事件關聯的聚合根Id,通過該Id可以擷取到唯一的聚合根對象;Version表示事件發生時該事件的版本,每次産生新的事件,Version都會進行累加。
進而可以知道,在EventStorage中,聚合根Id對應的所有Event中的Version是順序累加的,按照Version進行排序可以得到事件發生的先後順序。
顧名思義,EventStorage是用來存儲Event的地方。在Diary.CQRS中,EventStorage的定義如下:
GetEvents(Guid aggregateId):根據聚合根Id擷取該聚合根的所有事件
Save(AggregateRoot aggregate):儲存方法,入參為聚合根對象,在實作上則是擷取聚合根中所有未送出的事件,随後對這些事件進行處理
GetMemento():擷取快照
SaveMemento():存儲快照
Diary.CQRS中使用InMemory的方式實作了EventStorage,屬性和構造函數如下:
_events:事件清單,記憶體中存儲事件的位置,所有事件最終都會存儲在該清單中
_mementoes:快照清單,用于存儲聚合根的某個事件版本的狀态
_eventBus:事件總線,用于釋出任務
當Event生成後,它并沒有馬上存入EventStorage,而是在Repository顯示調用Save方法時,倉儲将存儲權交給了EventStorage,EventStorage是事件倉庫,事件倉儲在存儲時進行了如下操作:
擷取聚合根中所有未送出的Event,同時擷取到聚合根目前的版本号
周遊未送出Event清單,根據聚合根版本号自動為Event生成版本号,保持自增長的特性;
生成聚合根快照。示例中每3個版本生成一次,并保持到事件倉儲中。
将任務添加到事件倉庫中。
再次周遊未送出Event清單,此時将進行任務釋出,調用事件總線的Publish方法進行釋出。
Save方法的代碼如下:
至此Event的處理流程就算完結了。此時所有的操作都是在主庫完成的,當事件被釋出以後,訂閱了該事件的所有Handler都将會被觸發。
在Diary.CQRS項目中,EventHandler都被用來處理ReportDatabase了。
當你使用ES模式時,都存在一個嚴重問題,那就是資料查詢的問題。當使用者進行資料檢索是,必然會使用各種查詢條件,然而無論那種事件倉庫都很難滿足複雜查詢。為了解決此問題,ReportDatabase就顯得格外重要。
ReportDatabase的作用被定義為擷取資料、應對資料查詢、生成報表等,它的結構與主庫不同,可以根據不同的業務場景進行定義。
ReportDatabase的資料不是通過業務邏輯進行更新的,它通過訂閱Event進行更新。在本示例中ReportDatabase實作的很簡單,接口定義如下:
實作上,通過記憶體中維護一個清單,每次接收到事件以後,都對相應資料進行更新,此處不在貼出。
在上文中已經介紹過Event,而針對Event的處理,實作邏輯上與Command非常相似,唯一的差別是,指令隻可以有一個對應的處理程式,而事件則可以有多個處理程式。是以在EventHandlerFactory中擷取處理程式的方法傳回了EventHandler清單,代碼如下:
在EventBus中,如果一個事件沒有處理程式也不會引發錯誤,如果有一個或多個處理程式,則會以此調用他們的Handle方法,代碼如下:
Diary.CQRS是一個典型的CQRS+ES示範項目,通過對該項目的分析,我們能了解到Command、AggregateRoot、Event、EventStorage、ReportDatabase的基礎知識,了解他們互相關系,尤其是如何進行事件存儲、如何進行事件回放的内容。
另外,我們發現在使用CQRS+ES的過程中,項目的複雜度增加了很多,我們不可避免的要使用EventStore、Messaging等架構,進而影響那些不了解CQRS的團隊成員的加入,是以在應用到實際項目的時候,要适可而止,慎重選擇,避免過度設計。
由于這是一個示例,項目代碼中存在很多不夠嚴謹的地方,大家在學習的過程中應進行甄别。
由于本人的知識有限,如果内容中存在不準确或錯誤的地方,還請不吝賜教!
2021年9月 北京、西安兩地,高薪誠聘 .NET工程師,請私信聯系!
如果認為此文對您有幫助,别忘了支援一下哦!
聲明:本部落格原創文字隻代表本人工作中在某一時間内總結的觀點或結論,與本人所在機關沒有直接利益關系。轉載時請在文章頁面明顯位置給出原文連結。