設計目標 儘量快的處理命令和事件,保證吞吐量; 處理完一個命令後不需要等待命令產生的事件持久化完成就能處理下一個命令,從而保證領域內的業務邏輯處理不依賴於持久化IO,實現真正的in-memory; 保證命令、事件處理的順序性,先來的先處理,先產生的先處理; 保證一個聚合根的事件只有一個線程在持久化,
設計目標
- 儘量快的處理命令和事件,保證吞吐量;
- 處理完一個命令後不需要等待命令產生的事件持久化完成就能處理下一個命令,從而保證領域內的業務邏輯處理不依賴於持久化IO,實現真正的in-memory;
- 保證命令、事件處理的順序性,先來的先處理,先產生的先處理;
- 保證一個聚合根的事件只有一個線程在持久化,並按事件產生的順序持久化;
- 持久化事件時如果遇到併發衝突時(聚合根ID+事件版本號出現重覆)的處理代價要輕;
- 要能利用多核的優勢;
總體設計思路
- 先將命令根據聚合根ID路由到CommandMailBox里;
- 單線程處理CommandMailBox中的命令,由於聚合根在in-memory本地記憶體,所以處理非常快;
- 處理成功後更新聚合根的in-memory記憶體;
- 記憶體更新後將聚合根產生的事件同樣原理路由到EventMailBox里;
- 單線程批量處理EventMailBox中的事件;由於是批量,所以持久化的吞吐量也可以保證;
- 處理完成一批事件後,把這一批事件對應的命令從CommandMailBox中移除;
詳細設計思路
- 設計N個存放命令的CommandMailBox,命令首先按聚合根ID的hashcode取摸路由到對應的CommandMailBox;
- 每個CommandMailBox都有一個maxOffset, consumeOffset,以及一個CommandProcessor(單線程)在不停的處理;maxOffset表示最後一個命令的位置;consumeOffset表示當前正在處理的命令的位置;
- CommandProcessor的處理邏輯;
- 創建、修改聚合根;
- 更新聚合根的in-memory緩存;
- 將聚合根產生的事件按聚合根ID的hashcode取摸路由到對應的EventMailBox;EventMailBox的個數也是程式啟動時配置;
- 每個EventMailBox都有一個maxOffset, consumeOffset,以及一個EventProcessor(單線程)在不停的處理;maxOffset表示最後一個事件的位置;consumeOffset表示當前正在處理的事件的位置;
- EventProcessor的處理邏輯:
- 按次序批量獲取一批要處理的事件;
- 批量持久化事件到EventStore,採用SqlBulkCopy;
- 如果持久化一切順利,則publish這一批事件(publish如果遇到網路IO異常,則重試,直到成功為止),然後繼續持久化下一批,並同時將當前這一批事件對應的命令從CommandMailBox中刪除;. 如果持久化遇到併發衝突(事件的aggregateRootId+Version重覆),則對當前這一批事件一個個按順序持久化。如果當前事件持久化成功,則同樣publish該事件,當然遇到IO異常時也要不斷重試,直到成功為止;成功後通知CommandMailBox移除當前事件對應的命令;如果當前事件持久化出現併發衝突,就做如下處理:
- 先通知當前事件對應聚合根暫停處理後續的命令;
- 用Event Sourcing技術將in-memory中的聚合根的狀態還原到最新狀態,確保下次執行command時基於的聚合根的狀態是最新的;
- 把這一批里該聚合根的所有事件移除,把EventMailBox中的該聚合根的所有事件移除;
- 將CommandMailBox的處理位置重置為當前事件對應的命令的offset;從而可以確保產生併發衝突的事件對應的命令以及後續的命令能再重新被處理一遍;
- 通知當前事件對應聚合根繼續處理後續的命令(從哪個位置開始處理,在上面第4步已經重置過了);
- 這一批的所有事件都一個個處理完之後,按同樣的邏輯繼續處理下一批事件;
其他說明
- 上面的設計基於一個前提,就是一個聚合根幾乎不會同時在兩台伺服器上同時存在並處理命令,否則就會出現併發衝突,而併發衝突的處理的代價還是比較複雜的,應該儘量避免;這點可以通過EQueue保證;
- 當聚合根處理了命令,嘗試更新in-memory記憶體時,可能有一種情況會失敗。就是如果這個命令是創建聚合根的,而有可能併發的時候這個聚合根可能在記憶體中已經有了,則創建完聚合根添加到記憶體時,應該能檢測出來並記錄錯誤日誌,然後該命令產生的事件也不必放入EventMailBox,然後認為該命令處理成功即可。
- 上面的設計中沒有談到當遇到命令重覆執行時的設計思路,大家可以自己想想:)