為什麼寫這篇文章 自己以前都走了彎路,以為學習戰術設計就會DDD了,其實DDD的精華在戰略設計,但是對於我們菜鳥來說,學習一些技術概念也是挺好的 經常看到這些術語,概念太多,也想簡單學習一下,記憶力比較差記錄一下實現的細節 領域事件 1.領域事件是過去發生的與業務有關的事實,一但發生就不可更改,所以 ...
為什麼寫這篇文章
自己以前都走了彎路,以為學習戰術設計就會DDD了,其實DDD的精華在戰略設計,但是對於我們菜鳥來說,學習一些技術概念也是挺好的
經常看到這些術語,概念太多,也想簡單學習一下,記憶力比較差記錄一下實現的細節
領域事件
1.領域事件是過去發生的與業務有關的事實,一但發生就不可更改,所以存儲事件時只能追加
3.領域事件具有時間點的特征,所有事件連接起來會形成明顯的時間軸
4.領域事件會導致目標對象狀態的變化,聚合根的行為會產生領域事件,所以會改變聚合的狀態
在聚合根裡面維護一個領域事件的聚合,每一個事件對應一個Handle,通過反射維護一個數據字典,通過事件查找到指定的Handle
領域事件實現的方式:目前看到有3種方式,MediatR,消息隊列 ,發佈訂閱模式
eShopOnContainers 中使用的是MediatR
ENode 中使用的是EQueue,EQueue是一個純C#寫的消息隊列
使用已經寫好的消息隊列Rabbitmq ,kafka
事件存儲,事件溯源,事件快照
事件存儲:存儲所有聚合根裡面發生過的事件
1.事件存儲中可以做併發的處理,比如Command 重覆,領域事件的重覆
2.領域事件的重覆通過聚合根Id+版本號判斷,可以在資料庫中建立聯合唯一索引,在存儲事件時檢測重覆,記錄重覆的事件,根據業務做處理
3.這裡要保證存儲事件與發佈領域事件的一致性
如何保證存儲事件與發佈領域事件的一致性
先存儲事件然後在發佈領域事件,如果發生異常,就一直重試,一直到成功為止,也可以做一定的處理,比如重試到一定的次數,就通知,進行人工處理
我選擇了CAP + Policy + Dapper
事件溯源:在事件存儲中記錄導致狀態變化的一系列領域事件。通過持久化記錄改變狀態的事件,通過重新播放獲得狀態改變的歷史。 事件回放可以返回系統到任何狀態
聚合快照:聚合的生命周期各有長短,有的聚合裡面有大量的事件,,事件越多載入事件以及重建聚合的執行效率就會越來越低,快照裡面存儲的是聚合
1.定時存儲整個聚合根:使用定時器每隔一段時間就存儲聚合到快照表中
2.定量存儲整個聚合根:根據事件存儲中的數量來存儲聚合到快照表中
事件溯源的實現方式
1.首先我們需要實現聚合In Memory,
2.在CommandHandler中訂閱 Command命令,
創建聚合時 ,在記憶體中維護一個數據字典,key為:聚合根的Id,value為:聚合
修改,刪除,聚合時,根據聚合根的Id,查詢出聚合
如果記憶體中聚合不存在時:根據聚合根的Id 從聚合快照表中查詢出聚合,然後根據聚合快照存儲的時間,聚合根Id,查詢事件存儲中的所有事件,然後回放事件,得到聚合最終的狀態
記錄遇到的問題
由於基礎非常的差,所以實現的方式都是以最簡單的方式來寫的,存在許多的問題,代碼中有問題的地方希望大家提出來,讓我學習一下
代碼的實現目前還沒有寫快照的部分,也沒有處理EventStorage中的命令重覆與聚合根+版本號重覆,具體的請看湯總的ENode,裡面有全部的實現
1.怎樣保證存儲事件,發佈事件的最終一致性
2.怎麼解析EventStorage中的事件,回放事件
先存儲事件,當事件存儲成功之後,在發佈事件
存儲事件失敗:就一直重試,發佈事件失敗,使用的是CAP,CAP內部使用的是本地消息表的方式,如果發佈事件失敗,也一直重試,如果伺服器重啟了,Rabbitmq裡面消息為Ack,消息沒有丟,重連後會繼續執行
存儲事件,發佈事件
/// <summary>
/// 存儲聚合根中的事件到EventStorage 發佈事件
/// </summary>
/// <typeparam name="TAggregationRoot"></typeparam>
/// <param name="event"></param>
/// <returns></returns>
public async Task AppendEventStoragePublishEventAsync<TAggregationRoot>(TAggregationRoot @event)
where TAggregationRoot : IAggregationRoot
{
var domainEventList = @event.UncommittedEvents.ToList();
if (domainEventList.Count == 0)
{
throw new Exception("請添加事件!");
}
await TryAppendEventStorageAsync(domainEventList).ContinueWith(async e =>
{
if (e.Result == (int)EventStorageStatus.Success)
{
await TryPublishDomainEventAsync(domainEventList).ConfigureAwait(false);
@event.ClearEvents();
}
});
}
/// <summary>
/// 發佈領域事件
/// </summary>
/// <returns></returns>
public async Task PublishDomainEventAsync(List<IDomainEvent> domainEventList)
{
using (var connection =
new SqlConnection(ConnectionStr))
{
if (connection.State == ConnectionState.Closed)
{
await connection.OpenAsync().ConfigureAwait(false);
}
using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
{
try
{
if (domainEventList.Count > 0)
{
foreach (var domainEvent in domainEventList)
{
await _capPublisher.PublishAsync(domainEvent.GetRoutingKey(), domainEvent).ConfigureAwait(false);
}
}
await transaction.CommitAsync().ConfigureAwait(false);
}
catch (Exception e)
{
await transaction.RollbackAsync().ConfigureAwait(false);
throw;
}
}
}
}
/// <summary>
/// 發佈領域事件重試
/// </summary>
/// <param name="domainEventList"></param>
/// <returns></returns>
public async Task TryPublishDomainEventAsync(List<IDomainEvent> domainEventList)
{
var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
.RetryForeverAsync(onRetry: exception =>
{
Task.Factory.StartNew(() =>
{
//記錄重試的信息
_loggerHelper.LogInfo("發佈領域事件異常", exception.Message);
});
});
await policy.ExecuteAsync(async () =>
{
await PublishDomainEventAsync(domainEventList).ConfigureAwait(false);
});
}
/// <summary>
/// 存儲聚合根中的事件到EventStorage中
/// </summary>
/// <returns></returns>
public async Task<int> AppendEventStorageAsync(List<IDomainEvent> domainEventList)
{
if (domainEventList.Count == 0)
{
throw new Exception("請添加事件!");
}
var status = (int)EventStorageStatus.Failure;
using (var connection = new SqlConnection(ConnectionStr))
{
try
{
if (connection.State == ConnectionState.Closed)
{
await connection.OpenAsync().ConfigureAwait(false);
}
using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
{
try
{
if (domainEventList.Count > 0)
{
foreach (var domainEvent in domainEventList)
{
EventStorage eventStorage = new EventStorage
{
Id = Guid.NewGuid(),
AggregateRootId = domainEvent.AggregateRootId,
AggregateRootType = domainEvent.AggregateRootType,
CreateDateTime = domainEvent.CreateDateTime,
Version = domainEvent.Version,
EventData = Events(domainEvent)
};
var eventStorageSql =
$"INSERT INTO EventStorageInfo(Id,AggregateRootId,AggregateRootType,CreateDateTime,Version,EventData) VALUES (@Id,@AggregateRootId,@AggregateRootType,@CreateDateTime,@Version,@EventData)";
await connection.ExecuteAsync(eventStorageSql, eventStorage, transaction).ConfigureAwait(false);
}
}
await transaction.CommitAsync().ConfigureAwait(false);
status = (int)EventStorageStatus.Success;
}
catch (Exception e)
{
await transaction.RollbackAsync().ConfigureAwait(false);
throw;
}
}
}
catch (Exception e)
{
connection.Close();
throw;
}
}
return status;
}
/// <summary>
/// AppendEventStorageAsync異常重試
/// </summary>
public async Task<int> TryAppendEventStorageAsync(List<IDomainEvent> domainEventList)
{
var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
.RetryForeverAsync(onRetry: exception =>
{
Task.Factory.StartNew(() =>
{
//記錄重試的信息
_loggerHelper.LogInfo("存儲事件異常", exception.Message);
});
});
var result = await policy.ExecuteAsync(async () =>
{
var resulted = await AppendEventStorageAsync(domainEventList).ConfigureAwait(false);
return resulted;
});
return result;
}
/// <summary>
/// 根據DomainEvent序列化事件Json
/// </summary>
/// <param name="domainEvent"></param>
/// <returns></returns>
public string Events(IDomainEvent domainEvent)
{
ConcurrentDictionary<string, string> dictionary = new ConcurrentDictionary<string, string>();
//獲取領域事件的類型(方便解析Json)
var domainEventTypeName = domainEvent.GetType().Name;
var domainEventStr = JsonConvert.SerializeObject(domainEvent);
dictionary.GetOrAdd(domainEventTypeName, domainEventStr);
var eventData = JsonConvert.SerializeObject(dictionary);
return eventData;
}
解析EventStorage中存儲的事件
public async Task<List<IDomainEvent>> GetAggregateRootEventStorageById(Guid AggregateRootId)
{
try
{
using (var connection = new SqlConnection(ConnectionStr))
{
var eventStorageList = await connection.QueryAsync<EventStorage>($"SELECT * FROM dbo.EventStorageInfo WHERE AggregateRootId='{AggregateRootId}'");
List<IDomainEvent> domainEventList = new List<IDomainEvent>();
foreach (var item in eventStorageList)
{
var dictionaryDomainEvent = JsonConvert.DeserializeObject<Dictionary<string, string>>(item.EventData);
foreach (var entry in dictionaryDomainEvent)
{
var domainEventType = TypeNameProvider.GetType(entry.Key);
if (domainEventType != null)
{
var domainEvent = JsonConvert.DeserializeObject(entry.Value, domainEventType) as IDomainEvent;
domainEventList.Add(domainEvent);
}
}
}
return domainEventList;
}
}
catch (Exception ex)
{
throw;
}
註意事項
1.事件沒持久化就代表事件還沒發生成功,事件存儲可能失敗,必須先存儲事件,在發佈事件,保證存儲事件與發佈事件一致性
1.使用事件驅動,必須要做好冥等的處理
2.如果業務場景中有狀態時:通過狀態來控制
3.新建一張表,用來記錄消費的信息,消費端的代碼裡面,根據唯一的標識,判斷是否處理過該事件
4.Q端的任何更新都應該把聚合根ID和事件版本號作為條件,Q端的更新不用遵循聚合的原則,可以使用最簡單的方式處理
5.倉儲是用來重建聚合的,它的行為和集合一樣只有Get ,Add ,Delete
6.DDD不是技術,是思想,核心在戰略模塊,戰術設計是實現的一種選擇,戰略設計,需要面向對象的分析能力,職責分配,深層次的分析業務
感謝
雖然學習DDD的時間不短了,感覺還是在入門階段,在學習的過程中有許多的不解,經常問ENode群裡面的大佬,也經常@湯總,謝謝大家的幫助與解惑。