天天看點

ENode架構單台機器在處理Command時的設計思路

盡量快的處理指令和事件,保證吞吐量;

處理完一個指令後不需要等待指令産生的事件持久化完成就能處理下一個指令,進而保證領域内的業務邏輯處理不依賴于持久化io,實作真正的in-memory;

保證指令、事件處理的順序性,先來的先處理,先産生的先處理;

保證一個聚合根的事件隻有一個線程在持久化,并按事件産生的順序持久化;

持久化事件時如果遇到并發沖突時(聚合根id+事件版本号出現重複)的處理代價要輕;

要能利用多核的優勢;

先将指令根據聚合根id路由到commandmailbox裡;

單線程處理commandmailbox中的指令,由于聚合根在in-memory本地記憶體,是以處理非常快;

處理成功後更新聚合根的in-memory記憶體;

記憶體更新後将聚合根産生的事件同樣原理路由到eventmailbox裡;

單線程批量處理eventmailbox中的事件;由于是批量,是以持久化的吞吐量也可以保證;

處理完成一批事件後,把這一批事件對應的指令從commandmailbox中移除;

設計n個存放指令的commandmailbox,指令首先按聚合根id的hashcode取摸路由到對應的commandmailbox;

每個commandmailbox都有一個maxoffset, consumeoffset,以及一個commandprocessor(單線程)在不停的處理;maxoffset表示最後一個指令的位置;consumeoffset表示目前正在處理的指令的位置;

commandprocessor的處理邏輯;

建立、修改聚合根;

更新聚合根的in-memory緩存;

将聚合根産生的事件按聚合根id的hashcode取摸路由到對應的eventmailbox;eventmailbox的個數也是程式啟動時配置;

每個eventmailbox都有一個maxoffset, consumeoffset,以及一個eventprocessor(單線程)在不停的處理;maxoffset表示最後一個事件的位置;consumeoffset表示目前正在處理的事件的位置;

eventprocessor的處理邏輯:

按次序批量擷取一批要處理的事件;

批量持久化事件到eventstore,采用sqlbulkcopy;

如果持久化一切順利,則publish這一批事件(publish如果遇到網絡io異常,則重試,直到成功為止),然後繼續持久化下一批,并同時将目前這一批事件對應的指令從commandmailbox中删除;.如果持久化遇到并發沖突(事件的aggregaterootid+version重複),則對目前這一批事件一個個按順序持久化。如果目前事件持久化成功,則同樣publish該事件,當然遇到io異常時也要不斷重試,直到成功為止;成功後通知commandmailbox移除目前事件對應的指令;如果目前事件持久化出現并發沖突,就做如下處理:

先通知目前事件對應聚合根暫停處理後續的指令;

用event sourcing技術将in-memory中的聚合根的狀态還原到最新狀态,確定下次執行command時基于的聚合根的狀态是最新的;

把這一批裡該聚合根的所有事件移除,把eventmailbox中的該聚合根的所有事件移除;

将commandmailbox的處理位置重置為目前事件對應的指令的offset;進而可以確定産生并發沖突的事件對應的指令以及後續的指令能再重新被處理一遍;

通知目前事件對應聚合根繼續處理後續的指令(從哪個位置開始處理,在上面第4步已經重置過了);

這一批的所有事件都一個個處理完之後,按同樣的邏輯繼續處理下一批事件;

上面的設計基于一個前提,就是一個聚合根幾乎不會同時在兩台伺服器上同時存在并處理指令,否則就會出現并發沖突,而并發沖突的處理的代價還是比較複雜的,應該盡量避免;這點可以通過equeue保證;

當聚合根處理了指令,嘗試更新in-memory記憶體時,可能有一種情況會失敗。就是如果這個指令是建立聚合根的,而有可能并發的時候這個聚合根可能在記憶體中已經有了,則建立完聚合根添加到記憶體時,應該能檢測出來并記錄錯誤日志,然後該指令産生的事件也不必放入eventmailbox,然後認為該指令處理成功即可。

上面的設計中沒有談到當遇到指令重複執行時的設計思路,大家可以自己想想:)

繼續閱讀