簡單的學習,實現,領域事件,事件存儲,事件溯源

来源:https://www.cnblogs.com/lifeng618/archive/2019/11/26/11916831.html
-Advertisement-
Play Games

為什麼寫這篇文章 自己以前都走了彎路,以為學習戰術設計就會DDD了,其實DDD的精華在戰略設計,但是對於我們菜鳥來說,學習一些技術概念也是挺好的 經常看到這些術語,概念太多,也想簡單學習一下,記憶力比較差記錄一下實現的細節 領域事件 1.領域事件是過去發生的與業務有關的事實,一但發生就不可更改,所以 ...


為什麼寫這篇文章

自己以前都走了彎路,以為學習戰術設計就會DDD了,其實DDD的精華在戰略設計,但是對於我們菜鳥來說,學習一些技術概念也是挺好的
經常看到這些術語,概念太多,也想簡單學習一下,記憶力比較差記錄一下實現的細節

領域事件


1.領域事件是過去發生的與業務有關的事實,一但發生就不可更改,所以存儲事件時只能追加
3.領域事件具有時間點的特征,所有事件連接起來會形成明顯的時間軸
4.領域事件會導致目標對象狀態的變化,聚合根的行為會產生領域事件,所以會改變聚合的狀態
在聚合根裡面維護一個領域事件的聚合,每一個事件對應一個Handle,通過反射維護一個數據字典,通過事件查找到指定的Handle
領域事件實現的方式:目前看到有3種方式,MediatR,消息隊列 ,發佈訂閱模式
eShopOnContainers 中使用的是MediatR
ENode 中使用的是EQueue,EQueue是一個純C#寫的消息隊列
使用已經寫好的消息隊列Rabbitmq ,kafka

事件存儲,事件溯源,事件快照


事件存儲:存儲所有聚合根裡面發生過的事件
1.事件存儲中可以做併發的處理,比如Command 重覆,領域事件的重覆
2.領域事件的重覆通過聚合根Id+版本號判斷,可以在資料庫中建立聯合唯一索引,在存儲事件時檢測重覆,記錄重覆的事件,根據業務做處理
3.這裡要保證存儲事件與發佈領域事件的一致性
如何保證存儲事件與發佈領域事件的一致性
先存儲事件然後在發佈領域事件,如果發生異常,就一直重試,一直到成功為止,也可以做一定的處理,比如重試到一定的次數,就通知,進行人工處理
我選擇了CAP + Policy + Dapper
事件溯源:在事件存儲中記錄導致狀態變化的一系列領域事件。通過持久化記錄改變狀態的事件,通過重新播放獲得狀態改變的歷史。 事件回放可以返回系統到任何狀態
聚合快照:聚合的生命周期各有長短,有的聚合裡面有大量的事件,,事件越多載入事件以及重建聚合的執行效率就會越來越低,快照裡面存儲的是聚合
1.定時存儲整個聚合根:使用定時器每隔一段時間就存儲聚合到快照表中
2.定量存儲整個聚合根:根據事件存儲中的數量來存儲聚合到快照表中
事件溯源的實現方式
1.首先我們需要實現聚合In Memory,
2.在CommandHandler中訂閱 Command命令,
創建聚合時 ,在記憶體中維護一個數據字典,key為:聚合根的Id,value為:聚合
修改,刪除,聚合時,根據聚合根的Id,查詢出聚合
如果記憶體中聚合不存在時:根據聚合根的Id 從聚合快照表中查詢出聚合,然後根據聚合快照存儲的時間,聚合根Id,查詢事件存儲中的所有事件,然後回放事件,得到聚合最終的狀態

記錄遇到的問題


由於基礎非常的差,所以實現的方式都是以最簡單的方式來寫的,存在許多的問題,代碼中有問題的地方希望大家提出來,讓我學習一下
代碼的實現目前還沒有寫快照的部分,也沒有處理EventStorage中的命令重覆與聚合根+版本號重覆,具體的請看湯總的ENode,裡面有全部的實現
1.怎樣保證存儲事件,發佈事件的最終一致性
2.怎麼解析EventStorage中的事件,回放事件
先存儲事件,當事件存儲成功之後,在發佈事件
存儲事件失敗:就一直重試,發佈事件失敗,使用的是CAP,CAP內部使用的是本地消息表的方式,如果發佈事件失敗,也一直重試,如果伺服器重啟了,Rabbitmq裡面消息為Ack,消息沒有丟,重連後會繼續執行
存儲事件,發佈事件

    /// <summary>
    /// 存儲聚合根中的事件到EventStorage 發佈事件
    /// </summary>
    /// <typeparam name="TAggregationRoot"></typeparam>
    /// <param name="event"></param>
    /// <returns></returns>
    public async Task AppendEventStoragePublishEventAsync<TAggregationRoot>(TAggregationRoot @event)
        where TAggregationRoot : IAggregationRoot
    {
        var domainEventList = @event.UncommittedEvents.ToList();
        if (domainEventList.Count == 0)
        {
            throw new Exception("請添加事件!");
        }

        await TryAppendEventStorageAsync(domainEventList).ContinueWith(async e =>
        {
            if (e.Result == (int)EventStorageStatus.Success)
            {
                await TryPublishDomainEventAsync(domainEventList).ConfigureAwait(false);
                @event.ClearEvents();
            }
        });
    }

    /// <summary>
    /// 發佈領域事件
    /// </summary>
    /// <returns></returns>
    public async Task PublishDomainEventAsync(List<IDomainEvent> domainEventList)
    {
        using (var connection =
            new SqlConnection(ConnectionStr))
        {
            if (connection.State == ConnectionState.Closed)
            {
                await connection.OpenAsync().ConfigureAwait(false);
            }
            using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
            {
                try
                {
                    if (domainEventList.Count > 0)
                    {
                        foreach (var domainEvent in domainEventList)
                        {
                            await _capPublisher.PublishAsync(domainEvent.GetRoutingKey(), domainEvent).ConfigureAwait(false);
                        }
                    }
                    await transaction.CommitAsync().ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    await transaction.RollbackAsync().ConfigureAwait(false);
                    throw;
                }
            }
        }
    }

    /// <summary>
    /// 發佈領域事件重試
    /// </summary>
    /// <param name="domainEventList"></param>
    /// <returns></returns>
    public async Task TryPublishDomainEventAsync(List<IDomainEvent> domainEventList)
    {
        var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
            .RetryForeverAsync(onRetry: exception =>
            {
                Task.Factory.StartNew(() =>
                {
                    //記錄重試的信息
                    _loggerHelper.LogInfo("發佈領域事件異常", exception.Message);
                });
            });
        await policy.ExecuteAsync(async () =>
        {
            await PublishDomainEventAsync(domainEventList).ConfigureAwait(false);
        });

    }

    /// <summary>
    /// 存儲聚合根中的事件到EventStorage中
    /// </summary>
    /// <returns></returns>
    public async Task<int> AppendEventStorageAsync(List<IDomainEvent> domainEventList)
    {
        if (domainEventList.Count == 0)
        {
            throw new Exception("請添加事件!");
        }
        var status = (int)EventStorageStatus.Failure;
        using (var connection = new SqlConnection(ConnectionStr))
        {
            try
            {
                if (connection.State == ConnectionState.Closed)
                {
                    await connection.OpenAsync().ConfigureAwait(false);
                }
                using (var transaction = await connection.BeginTransactionAsync().ConfigureAwait(false))
                {
                    try
                    {
                        if (domainEventList.Count > 0)
                        {
                            foreach (var domainEvent in domainEventList)
                            {
                                EventStorage eventStorage = new EventStorage
                                {
                                    Id = Guid.NewGuid(),
                                    AggregateRootId = domainEvent.AggregateRootId,
                                    AggregateRootType = domainEvent.AggregateRootType,
                                    CreateDateTime = domainEvent.CreateDateTime,
                                    Version = domainEvent.Version,
                                    EventData = Events(domainEvent)
                                };
                                var eventStorageSql =
                                    $"INSERT INTO EventStorageInfo(Id,AggregateRootId,AggregateRootType,CreateDateTime,Version,EventData) VALUES (@Id,@AggregateRootId,@AggregateRootType,@CreateDateTime,@Version,@EventData)";
                                await connection.ExecuteAsync(eventStorageSql, eventStorage, transaction).ConfigureAwait(false);
                            }
                        }
                        await transaction.CommitAsync().ConfigureAwait(false);
                        status = (int)EventStorageStatus.Success;
                    }
                    catch (Exception e)
                    {
                        await transaction.RollbackAsync().ConfigureAwait(false);
                        throw;
                    }
                }

            }
            catch (Exception e)
            {
                connection.Close();
                throw;
            }
        }
        return status;
    }

    /// <summary>
    /// AppendEventStorageAsync異常重試
    /// </summary>
    public async Task<int> TryAppendEventStorageAsync(List<IDomainEvent> domainEventList)
    {
        var policy = Policy.Handle<SocketException>().Or<IOException>().Or<Exception>()
            .RetryForeverAsync(onRetry: exception =>
            {
                Task.Factory.StartNew(() =>
                {
                    //記錄重試的信息
                    _loggerHelper.LogInfo("存儲事件異常", exception.Message);
                });
            });
        var result = await policy.ExecuteAsync(async () =>
          {
              var resulted = await AppendEventStorageAsync(domainEventList).ConfigureAwait(false);
              return resulted;
          });
        return result;
    }

    /// <summary>
    /// 根據DomainEvent序列化事件Json
    /// </summary>
    /// <param name="domainEvent"></param>
    /// <returns></returns>
    public string Events(IDomainEvent domainEvent)
    {
        ConcurrentDictionary<string, string> dictionary = new ConcurrentDictionary<string, string>();
        //獲取領域事件的類型(方便解析Json)
        var domainEventTypeName = domainEvent.GetType().Name;
        var domainEventStr = JsonConvert.SerializeObject(domainEvent);
        dictionary.GetOrAdd(domainEventTypeName, domainEventStr);
        var eventData = JsonConvert.SerializeObject(dictionary);
        return eventData;
    }

解析EventStorage中存儲的事件

    public async Task<List<IDomainEvent>> GetAggregateRootEventStorageById(Guid AggregateRootId)
    {
        try
        {
            using (var connection = new SqlConnection(ConnectionStr))
            {
                var eventStorageList = await connection.QueryAsync<EventStorage>($"SELECT * FROM dbo.EventStorageInfo WHERE AggregateRootId='{AggregateRootId}'");
                List<IDomainEvent> domainEventList = new List<IDomainEvent>();
                foreach (var item in eventStorageList)
                {
                    var dictionaryDomainEvent = JsonConvert.DeserializeObject<Dictionary<string, string>>(item.EventData);
                    foreach (var entry in dictionaryDomainEvent)
                    {
                        var domainEventType = TypeNameProvider.GetType(entry.Key);
                        if (domainEventType != null)
                        {
                            var domainEvent = JsonConvert.DeserializeObject(entry.Value, domainEventType) as IDomainEvent;
                            domainEventList.Add(domainEvent);
                        }
                    }
                }
                return domainEventList;
            }
        }
        catch (Exception ex)
        {
            throw;
        }

註意事項

1.事件沒持久化就代表事件還沒發生成功,事件存儲可能失敗,必須先存儲事件,在發佈事件,保證存儲事件與發佈事件一致性
1.使用事件驅動,必須要做好冥等的處理
2.如果業務場景中有狀態時:通過狀態來控制
3.新建一張表,用來記錄消費的信息,消費端的代碼裡面,根據唯一的標識,判斷是否處理過該事件
4.Q端的任何更新都應該把聚合根ID和事件版本號作為條件,Q端的更新不用遵循聚合的原則,可以使用最簡單的方式處理
5.倉儲是用來重建聚合的,它的行為和集合一樣只有Get ,Add ,Delete
6.DDD不是技術,是思想,核心在戰略模塊,戰術設計是實現的一種選擇,戰略設計,需要面向對象的分析能力,職責分配,深層次的分析業務

感謝


雖然學習DDD的時間不短了,感覺還是在入門階段,在學習的過程中有許多的不解,經常問ENode群裡面的大佬,也經常@湯總,謝謝大家的幫助與解惑。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.在指南的緩存章節里webpack.config.js文件中,使用new的方法會報錯 const webpack = require('webpack'); + new webpack.optimize.CommonsChunkPlugin({ + name: 'vendor' + }), new ...
  • 運用Taro實現多端導航欄/tabbar實例 (H5 + 小程式 + React Native) 最近一直在搗鼓taro開發,雖說官網介紹支持編譯到多端,但是網上大多數實例都是H5、小程式,很少有支持RN端。恰好Taro是基於React技術,想著之前也做過一些react項目,如是抱著好奇深究了一番, ...
  • 練習1:求兩個數字的和:獲取任意的兩個數字的和 function getSum(x, y) { return x + y; } console.log(getSum(10, 20)); 練習2:求1-100之間所有的數字的和 function geteverySum() { var sum = 0; ...
  • 前言 最近在做 webapp,遇到了很多移動端相容的問題,其中一個問題就是:輸入框觸發 focus 後,鍵盤彈出,然後遮住了輸入框。 然後在 和`IOS`上,這個問題的表現形式不一樣,而原生鍵盤和第三方鍵盤也不一樣,但引起的問題都是一樣的:輸入框被遮住了。 需要的效果 在鍵盤彈出時,獲得焦點的輸入框 ...
  • 首先,我們來看下垂直居中: (1)、如果是單行文本,則可以設置的line-height的數值,讓其等於父級元素的高度! <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" conte ...
  • 關於call和apply,以前也思考良久,很多時候都以為記住了,但是,我太難了。今天我特地寫下筆記,希望可以完全掌握這個東西,也希望可以幫助到任何想對學習這個東西的同學。 一.apply函數定義與理解,先從apply函數出發 在MDN上,apply的定義是: “apply()方法調用一個具有給定th ...
  • 下麵代碼中,setTimeout(fn, 0)在下一輪“事件迴圈”開始時執行,Promise.resolve()在本輪“事件迴圈”結束時執行,console.log('one')則是立即執行,因此最先輸出。 下麵代碼中,setTimeout(fn, 0)在下一輪“事件迴圈”開始時執行,Promise ...
  • 慕課網 實戰班 就業班 2019年11月26號 更新資料整理 300套 只讀模式打開 百度網盤資料鏈接: 鏈接:https://pan.baidu.com/s/1oCq9pOhVYnbg1N0MJQp6Iw提取碼:wsds複製這段內容後打開百度網盤手機App,操作更方便哦 微雲鏈接: https:/ ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...