架構名稱:enode
架構特色:ddd+cqrs + eda + event sourcing + in memory
設計目标:讓程式員隻關注業務代碼、高性能、分布式、可水準擴充
nuget包id:enode

熟悉cqrs架構的人看到這圖應該就再熟悉不過了,enode實作的是一個cqrs架構。基本的概念就不多介紹了,如果大家對上圖中的一些概念還不太清楚,可以看一下我的部落格裡的其他相關文章,我應該都有寫到。下面主要介紹一下enode 2.0在實作cqrs架構時的一些不一樣的地方(由于篇幅的限制,先說三點吧):
就是你不能在command handler中一次修改多個聚合根,我覺得這應該是enode對開發人員的最大限制,可能也是最讓開發人員覺得不爽的地方。但我覺得這個不是限制,而是對資料強一緻性和最終一緻性的一個正确認識。在我學過ddd+cqrs+event sourcing這三個東西之後,我認識到,聚合内必須確定強一緻性,聚合間最終一緻性。傳統三層開發,我們通過unit of work模式(簡稱uow,比如nhibernate的session, entity framework的dbcontext)可以輕易實作多個對象修改的強一緻性事務;确實在傳統三層模式開發中,這種利用uow的方式來實作跨聚合的強一緻性事務的方式很實用,開發起來很友善,開發人員可以不必擔心會出現資料不一緻的問題了,因為所有修改總是在一個事務内儲存。
但enode的設計目标不是為了支援傳統三層開發,而是面向ddd+cqrs+eda+event sourcing架構的架構。曾經我也想讓command handler支援修改多個聚合根,但這樣做必須要面臨一個很棘手的問題:command在發送到command queue時,無法根據聚合根id來路由了。因為一個command會修改多個聚合根,也就是說一個command不會和一個聚合根一一對應了。這意味着同一個聚合根沒辦法總是被路由到同一個command queue裡,這樣就導緻相同id的聚合根可能會在兩台伺服器被同時修改,這就會導緻整個系統可能會頻繁的産生并發更新沖突。很多command就會不斷的重試,整個系統的性能就會下降。而enode設計之初就是為了高性能,是以這點讓我覺得很難接收。
相反,如果一個command總是隻會建立或修改一個聚合根,那我們的command就能根據聚合根id來路由到特定的消息隊列,同一個聚合根id總是會被路由到同一個queue,而一個queue的消費者伺服器(command handler所在的伺服器)同一時刻總是隻有一個,那我們就能保證一個聚合根的修改不會有并發問題。當然光這樣還不夠,在這個command消費者伺服器裡,enode架構會用記憶體級别的queue對同一個聚合根的所有command再次進行排隊(如果需要排隊的話),之是以要這樣是因為有時對一個聚合根的并發修改command可能1s内發送了很多過來,是以command handler肯定來不及在1s内全部處理掉這些command,是以需要在記憶體裡再次排隊(天貓雙十一的時候,應用伺服器内部也會有類似的對同一個聚合根設計一個相應的記憶體queue來避免對同一個聚合根的修改的并發沖突的問題)。通過這樣的設計,我們可以做到絕大部分情況下,不會再有并發沖突的問題,也就是command不會再出現重試的情況。這樣最後的效果就是:不同id的聚合根的處理可以并行,同一個id的聚合根的處理是串行,通過兩級排隊實作。前面說到,這樣隻能做到絕大部分情況下不會有并發沖突,那麼什麼時候還是會有并發沖突呢?就是在新增command消費者伺服器的時候,比如我們發現最近系統繁忙,我們希望增加command消費者伺服器來加快command的處理,那在新增伺服器後,原來修改某個聚合根的command可能會被路由到新的伺服器,但是這個聚合根的有些command可能還在原來的伺服器上還沒執行完,此時就會出現同一個聚合根在兩台伺服器上被同時修改的可能了;那這個怎麼解決呢?我現在的想法是架構層面不必解決了,我們隻需要在系統最空的時候(比如淩晨4點)的時候,增加伺服器即可,因為那個時候消息隊列裡的消息是最少的,也就是不太可能會産生因為增加command handler伺服器而導緻并發沖突的問題,這樣我們就可以最大限度的避免可能帶來的并發沖突。
相比一般的cqrs架構,enode每次在處理一個command,在擷取聚合根時,不是從eventstore擷取,而是從緩存擷取。從上面的架構圖可以看出,enode架構中有一個domain memory cache,目前用redis實作。這樣做的好處是,将所有的聚合根都緩存在redis緩存中,這樣就能提高聚合根的讀取時間;有一個問題需要考慮,redis緩存伺服器當機了怎麼辦?當機後緩存資料就沒了,那如何恢複這些緩存資料呢?這也是我選擇redis的一個主要理由,因為redis支援持久化,我們可以利用redis的aof或快照方式的持久化功能,來持久化緩存資料。進而可以在redis挂了後能最快的速度恢複緩存,重新開機redis伺服器即可。那重新開機之前以及重新開機的過程中,因為無法從redis擷取聚合根了,那隻能從eventstore通過event sourcing的方式去擷取,那樣的話性能肯定會比較差,那怎麼辦呢?答案是通過定時為聚合根建立快照,這也是采用event sourcing架構的一個好處。我們可以定時對某些聚合跟建立快照(注意,我覺得隻需要考慮那些對性能要求很高的子產品所涉及到的聚合根建立快照即可),那怎麼建立呢?可以開一個獨立的程序,監聽domain event,對需要建立快照的domain event做出判斷,根據某種快照建立政策進行判斷,如果認為需要建立快照,則從event store拿出該聚合根的相關事件,通過event sourcing還原得到某個版本的聚合根,這樣就得到了某個聚合根的某個版本的快照了。然後持久化起來即可。然後,enode支援在從event store擷取聚合根前,先檢查是否有快照,如果有快照,則會先加載快照,再把快照之後的domain event從event store擷取,再把這些快照之後的domain event一個個apply到目前聚合根,進而得到最新狀态的聚合根。這個過程比擷取該聚合根的所有領域事件在一個個通過event sourcing還原得到聚合根要快的多;尤其是在一個聚合根的domain event比較多的情況下就更有意義。是以,通過緩存的引入,我們可以提高command handler的處理速度。
另外一點很重要的是,因為我們的command是會發送到分布式消息隊列,然後隊列中的command消息會被取出來執行;大家知道,我們很難保證一個消息不會被重複執行,也就是說,一個command可能會重複執行。是以,我們的應用要支援對command的密等處理。而對于使用enode架構的應用,因為整個command side的資料持久化就是持久化domain event,程式員不必關心domain event的持久化過程。是以enode很有必要能内置支援對command的重複處理的判斷。那麼如何做呢?我覺得最靠譜的做法是,在持久化domain event的時候就能絕對靠譜的檢測出來某個command是否被重複執行了。那很自然就想到将被持久化的domain event和産生他的對應command關聯起來。是以我設計了如下的結構,用來表示一個command在操作聚合根後所産生的領域事件的資訊。
commitid:就是目前的commandid;
aggregaterootid:目前被操作的聚合根的全局唯一id;
aggregateroottypecode:表示聚合根的類型的一個code,通過該code我們可以知道目前記錄是哪個類型的聚合根的;
version:一個版本号,表示聚合根産生領域事件後的新版本号,是産生事件前的版本号+1;也就是說,聚合根的版本是每次被修改一次,那version就加1;
timestamp:一個時間戳,用于記錄産生domain event時的時間;
events:表示目前command操作聚合根後所産生的領域事件,一次操作可以産生多個領域事件;
對于上面的結構體,我們可以實作兩個重要的功能:1)為aggregaterootid和version這兩個字段建立唯一索引,這樣我們就能實作判斷某個聚合根是否被并發修改,因為如果有并發修改導緻并發沖突,那儲存到eventstore時,它們的version肯定是相同的;2)為aggregaterootid和commitid兩個字段建立唯一索引,這樣我們就能判斷某個command是否被重複執行,因為一個command被執行個體化出來後,它所要修改的聚合根id就不可能再修改了,是以如果該command被重複執行,那最後産生的領域事件(上面這個結構體)最後被持久化到eventstore時就會違反這個唯一索引,進而架構就能知道是否有command被重複執行了;
另外,上面這個結構體被儲存到eventstore時,是以一條記錄的方式被儲存,events集合會被序列化為一段二進制;是以,假如我們用關系型資料庫來儲存,那就是隻有一條insert語句即可,這樣就實作了一個聚合根的一次修改的事務持久化。然後因為上兩個索引的存在,我們就能在儲存時判斷是否有并發沖突或command是否被重複執行。
在設計event store時,我考慮了很多。最後認為event store要解決的最大的兩個問題是持久化性能和可水準擴充性。首先,因為每次command handler在處理完一個聚合根後,都會把産生的領域事件持久化到event store,沒持久化完成則不能認為該command已處理完,是以持久化的性能對處理command的吞吐量至關重要。另外一點就是可水準擴充性,因為event store裡儲存的都是domain event,而enode又是為了實作高性能為目标的,是以event store裡的資料肯定會非常多,比如1s中要持久化1k個domain event,那一天就會有8600w條記錄要記錄,一天就真麼多,那1年就更多了,是以用單點存儲所有的domain event顯示不靠譜了。是以我們的event store必須要支援水準擴充。比如我們可以設計100個分區,那每個分區一天隻需要儲存86w條記錄,一年也隻需要儲存3億多條記錄即可。之前我很追求單個存儲節點的高性能,是以曾經想過要用leveldb,stsdb,甚至redis這種高性能的基于key,value的nosql存儲。但後來發現這種nosql存儲雖然性能很高,但因為隻是key,value的存儲結構,是以沒辦法支援二級索引,這樣就沒辦法實作上面第一點中提到的command的幂等處理和聚合根并發沖突的檢測。另一個重要的原因是,event store中的資料我們有時候是要被查詢的。比如現在某個command遇到的并發沖突,那架構需要自動重試,但是重試之前需要先更新redis緩存,就是把eventstore裡的最新的聚合根更新到redis緩存裡,這樣command在重試時才能拿到最新版本的聚合根,這樣重試才能成功。那如何從eventstore裡拿最新的聚合根呢?隻能根據聚合根id從eventstore裡查詢。而聚合根id又不是key,value nosql的key,自然就沒辦法實作這個需求了;是以,我覺得合理的辦法應該是用關系型資料庫來實作eventstore。有人說關系型資料庫的性能不行。我覺得隻要關系型資料庫支援水準擴充,也就是将domain event sharding(分片)到不同的分庫分表中,那平均到每個庫裡的domain event的數量就不大了;這樣整個eventstore的持久化性能就可以随着分庫的數量的增加而線性增加;比如我現在單個db insert domain event的性能是1k tps(mysql配合ssd硬碟完全無壓力,呵呵),那10個庫的tps就能達到1w tps了。因為我們分庫會根據聚合根id的hash code來平均散列,這樣能確定每個庫中的聚合根的domain event數量是基本一樣的;進而就能實作整個event store的持久化性能随着分庫的增加而線性增加。是以,有了分庫的優勢,大資料量和性能都不是問題了。且因為關系型資料庫支援二級索引和唯一索引,那查詢domain event也不是問題了。
上圖是enode在實際項目中我目前認為的一個實體部署結構圖。
首先用戶端浏覽器通過網絡最後通路到我們的web伺服器叢集,當然web伺服器前面肯定還有網關和負載均衡器,我這裡為了突出重點就不畫出來了。然後每個web伺服器接受到httprequest後會生成command,然後通過enode架構發送到分布式消息隊列伺服器(message queue server),目前由我開發的equeue實作。然後消息隊列伺服器上的消息會被推送到command process servers,command process server就是執行command handler、完成domain logic,持久化domain event,以及publish domain event的伺服器。command process server處理完之後,domain event會由enode架構自動發送到message queue server,然後會被event process server處理,event process server就是訂閱domain event,然後根據domain event更新query db。對于查詢,web server可以直接通過sql查詢query db即可。
各種伺服器的叢集:
web server:無狀态,可以任意增加伺服器;
command process server:就是處理業務邏輯的伺服器,也是無狀态,可以任意增加伺服器;但伺服器的數目最好和command所對應的topic下的queue的數量保持一緻,這點後續在寫分布式消息隊列equeue的文章時在詳細談吧;
redis server:就是緩存聚合根的伺服器,屬于緩存伺服器;可以按需要存儲的容量來規劃需要開多少台redis server;目前我覺得最好的redis動态擴容方法就是pre sharding;
event db server:就是存儲domain event的伺服器,按照上的分析,我們采用的是關系型資料庫,比如用mysql;mysql的分庫分表技術已經很成熟,後續文章我們再詳細讨論如何分庫以及如何做資料遷移;
event process server:就是訂閱domain event,根據domain event更新query db的伺服器;可以根據需要來部署多少台,和command process server類似;這裡有一點必須要先提一下,就是在更新query db時,因為每次更新都是針對某個domain event來更新query db的,而domain event隻表示一個聚合根的修改,是以每次我們更新query db時,也隻更新該聚合根所在範圍的表;我們千萬不要去更新超過該聚合根範圍的表,否則就會産生并發沖突,導緻event handler執行失敗;這樣就會是的cqrs的query db同步資料變的很慢。對于query side,如果我們覺得直接從query db查詢資料太慢,可以考慮設計查詢緩存,也就是不走query db來查詢資料,而是走緩存。這種緩存就和我們平時的緩存設計類似了;利用domain event,我們先天就有優勢可以讓緩存非常及時的更新,呵呵。因為一旦有domain event過來,我們就能快速更新我們的query side緩存,而query db就可以異步更新即可。這樣就可以解決query side同步更新資料慢的問題。
好了,就寫這些吧,後續的再後續文章中補上,呵呵。