最近在開發過程中,遇到了一個場景,甚是棘手,在這裡分享一下。希望大家腦洞大開一起來想一下解決思路。鄙人也想了一個方案拿出來和大家一起探討一下是否合理。 一、簡單介紹一下涉及的對象概念 工作單元:維護變化的對象列表,在整塊業務邏輯處理完全之後一次性寫入到資料庫中。 領域事件:領域對象本身發生某些變化時 ...
最近在開發過程中,遇到了一個場景,甚是棘手,在這裡分享一下。希望大家腦洞大開一起來想一下解決思路。鄙人也想了一個方案拿出來和大家一起探討一下是否合理。
一、簡單介紹一下涉及的對象概念
工作單元:維護變化的對象列表,在整塊業務邏輯處理完全之後一次性寫入到資料庫中。
領域事件:領域對象本身發生某些變化時,發佈的通知事件,告訴訂閱者處理相關流程。
二、問題來了
我認為最合理的領域事件的觸發點應該設計在領域對象內部,那麼問題來了。當這個領域對象發生變化的上下文是一個複雜的業務場景,整個流程中會涉及到多個領域對象,所以需要通過工作單元來保證數據寫入的一致性。此時其中各個產生變化的領域對象的領域事件如果實時被髮布出去,那麼當工作單元在最終提交到資料庫時,如果產生了回滾,那麼會導致發佈了錯誤的領域事件,產生未知的後果。
三、問題分析
我能夠想到的方案是,這裡領域事件的發佈也通過一個類似於工作單元一樣的概念進行持續的管理,在領域對象中的發佈只是做一個記錄,只有在工作單元提交成功之後,才實際發佈其中所有的領域事件。
四、說乾就乾
實現類:
1 public class DomainEventConsistentQueue : IDisposable 2 { 3 private readonly List<IDomainEvent> _domainEvents = new List<IDomainEvent>(); 4 private bool _publishing = false; 5 6 public void RegisterEvent(IDomainEvent domainEvent) 7 { 8 if (_publishing) 9 { 10 throw new ApplicationException("當前事件一致性隊列已被髮布,無法添加新的事件!"); 11 } 12 13 if (_domainEvents.Any(ent => ent == domainEvent)) //防止相同事件被重覆添加 14 return; 15 16 _domainEvents.Add(domainEvent); 17 } 18 19 public void Clear() 20 { 21 _domainEvents.Clear(); 22 _publishing = false; 23 } 24 25 public void PublishEvents() 26 { 27 if (_publishing) 28 { 29 return; 30 } 31 32 if (_domainEvents == null) 33 return; 34 35 try 36 { 37 _publishing = true; 38 foreach (var domainEvent in _domainEvents) 39 { 40 DomainEventBus.Instance().Publish(domainEvent); 41 } 42 } 43 finally 44 { 45 Clear(); 46 } 47 } 48 49 public void Dispose() 50 { 51 Clear(); 52 } 53 }
使用方式:
1 var aggregateA = new AggregateRootA(); 2 var aggregateB = new AggregateRootB(); 3 4 using (var queue = new DomainEventConsistentQueue()) 5 { 6 using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString)) 7 { 8 aggregateA.Event(queue); 9 aggregateB.Event(queue); 10 11 var isSuccess = unitwork.Commit(); 12 if (isSuccess) 13 queue.PublishEvents(); 14 } 15 } 16 17 18 public class AggregateRootA : AggregateRoot 19 { 20 public void Event(DomainEventConsistentQueue queue) 21 { 22 queue.RegisterEvent(new DomainEventA()); 23 } 24 } 25 26 public class AggregateRootB : AggregateRoot 27 { 28 public void Event(DomainEventConsistentQueue queue) 29 { 30 queue.RegisterEvent(new DomainEventB()); 31 } 32 } 33 34 public class DomainEventA : IDomainEvent 35 { 36 public DateTime OccurredOn() 37 { 38 throw new NotImplementedException(); 39 } 40 41 public void Read() 42 { 43 throw new NotImplementedException(); 44 } 45 46 public bool IsRead 47 { 48 get { throw new NotImplementedException(); } 49 } 50 } 51 52 public class DomainEventB : IDomainEvent 53 { 54 public DateTime OccurredOn() 55 { 56 throw new NotImplementedException(); 57 } 58 59 public void Read() 60 { 61 throw new NotImplementedException(); 62 } 63 64 public bool IsRead 65 { 66 get { throw new NotImplementedException(); } 67 } 68 }
問題是解決了,但是標紅的這段代碼看著特別變扭,在產生領域事件的領域對象方法上需要增加一個與表達的業務無關的參數,這個大大破壞了DDD設計的初衷——統一語言(Ubiquitous Language),簡潔明瞭的表達出每個業務行為,業務交流應與代碼保持一致。像這2行表達起來如“AggregateRootA Event DomainEventConsistentQueue”這個 DomainEventConsistentQueue其實並不是領域對象,所以其並不是領域的一部分。
五、陷入思考
這裡突然想到,如果在運行中的每個線程的共用區域存儲待發佈的領域事件集合,那麼不就可以隨時隨地的管理當前操作上下文中的領域事件了嗎?這裡需要引入ThreadLocal<T> 類。MSDN的解釋參見https://msdn.microsoft.com/zh-cn/library/dd642243(v=vs.110).aspx。該泛型類可以提供僅針對當前線程的全局存儲空間,正好能夠恰到好處的解決我們現在遇到的問題。
六、說改就改
實現類:
1 public class DomainEventConsistentQueue : IDisposable 2 { 3 private static readonly ThreadLocal<List<IDomainEvent>> _domainEvents = new ThreadLocal<List<IDomainEvent>>(); 4 private static readonly ThreadLocal<bool> _publishing = new ThreadLocal<bool> { Value = false }; 5 6 private static DomainEventConsistentQueue _current; 7 /// <summary> 8 /// 獲取當前的領域事件一致性隊列。 9 /// 由於使用了線程本地存儲變數,此處為單例模式。 10 /// </summary> 11 /// <returns></returns> 12 public static DomainEventConsistentQueue Current() 13 { 14 if (_current != null) 15 return _current; 16 var temp = new DomainEventConsistentQueue(); 17 Interlocked.CompareExchange(ref _current, temp, null); 18 return temp; 19 } 20 21 public void RegisterEvent(IDomainEvent domainEvent) 22 { 23 if (_publishing.Value) 24 { 25 throw new ApplicationException("當前事件一致性隊列已被髮布,無法添加新的事件!"); 26 } 27 28 var domainEvents = _domainEvents.Value; 29 if (domainEvents == null) 30 { 31 domainEvents = new List<IDomainEvent>(); 32 _domainEvents.Value = domainEvents; 33 } 34 35 if (domainEvents.Any(ent => ent == domainEvent)) //防止相同事件被重覆添加 36 return; 37 38 domainEvents.Add(domainEvent); 39 } 40 41 public void Clear() 42 { 43 _domainEvents.Value = null; 44 _publishing.Value = false; 45 } 46 47 public void PublishEvents() 48 { 49 if (_publishing.Value) 50 { 51 return; 52 } 53 54 if (_domainEvents.Value == null) 55 return; 56 57 try 58 { 59 _publishing.Value = true; 60 foreach (var domainEvent in _domainEvents.Value) 61 { 62 DomainEventBus.Instance().Publish(domainEvent); 63 } 64 } 65 finally 66 { 67 Clear(); 68 } 69 } 70 71 public void Dispose() 72 { 73 Clear(); 74 } 75 }
使用方式:
1 var aggregateA = new AggregateRootA(); 2 var aggregateB = new AggregateRootB(); 3 4 using (var queue = DomainEventConsistentQueue.Current()) 5 { 6 using (var unitwork = new SqlServerUnitOfWork(GlobalConfig.DBConnectString)) 7 { 8 aggregateA.Event(); 9 aggregateB.Event(); 10 11 var isSuccess = unitwork.Commit(); 12 if (isSuccess) 13 queue.PublishEvents(); 14 } 15 } 16 17 public class AggregateRootA : AggregateRoot 18 { 19 public void Event() 20 { 21 DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventA()); 22 } 23 } 24 25 public class AggregateRootB : AggregateRoot 26 { 27 public void Event() 28 { 29 DomainEventConsistentQueue.Current().RegisterEvent(new DomainEventB()); 30 } 31 } 32 33 public class DomainEventA : IDomainEvent 34 { 35 public DateTime OccurredOn() 36 { 37 throw new NotImplementedException(); 38 } 39 40 public void Read() 41 { 42 throw new NotImplementedException(); 43 } 44 45 public bool IsRead 46 { 47 get { throw new NotImplementedException(); } 48 } 49 } 50 51 public class DomainEventB : IDomainEvent 52 { 53 public DateTime OccurredOn() 54 { 55 throw new NotImplementedException(); 56 } 57 58 public void Read() 59 { 60 throw new NotImplementedException(); 61 } 62 63 public bool IsRead 64 { 65 get { throw new NotImplementedException(); } 66 } 67 }
這樣代碼看起來比之前優雅多了。這裡的 DomainEventConsistentQueue.Current() 中操作的變數針對同一個線程在哪都是共用的,所以我們只管往裡丟數據就好了~
七、方案的局限性。
對於執行上下文的要求較高,整個領域事件的發佈必須要求在同一線程內操作。所以在使用的過程中儘量避免這種情況的發生。如果實在無法避免只能通過把DomainEventConsistentQueue 當作變數在多個線程之間傳遞了。
以上是個人的想法,可能有所考慮不周~ 不知道各位園子里的小伙伴們是否有處理過類似場景的經驗,歡迎留言探討,相互學習~
作者: Zachary_Fan
出處:http://www.cnblogs.com/Zachary-Fan/p/5586887.html