動手造輪子:實現一個簡單的 EventBus

来源:https://www.cnblogs.com/weihanli/archive/2019/07/22/implement-a-simple-event-bus.html
-Advertisement-
Play Games

動手造輪子:實現一個簡單的 EventBus Intro EventBus 是一種事件發佈訂閱模式,通過 EventBus 我們可以很方便的實現解耦,將事件的發起和事件的處理的很好的分隔開來,很好的實現解耦。 微軟官方的示例項目 "EShopOnContainers" 也有在使用 EventBus ...


動手造輪子:實現一個簡單的 EventBus

Intro

EventBus 是一種事件發佈訂閱模式,通過 EventBus 我們可以很方便的實現解耦,將事件的發起和事件的處理的很好的分隔開來,很好的實現解耦。 微軟官方的示例項目 EShopOnContainers 也有在使用 EventBus 。

這裡的 EventBus 實現也是參考借鑒了微軟 eShopOnContainers 項目。

EventBus 處理流程:

event bus flow

微服務間使用 EventBus 實現系統間解耦:

event bus in microservice

藉助 EventBus 我們可以很好的實現組件之間,服務之間,系統之間的解耦以及相互通信的問題。

起初覺得 EventBus 和 MQ 其實差不多嘛,都是通過非同步處理來實現解耦合,高性能。後來看到了下麵這張圖才算明白為什麼要用 EventBus 以及 EventBus 和 MQ 之間的關係,EventBus 是抽象的,可以用MQ來實現 EventBus.

eventbus implement

為什麼要使用 EventBus

  1. 解耦合(輕鬆的實現系統間解耦)
  2. 高性能可擴展(每一個事件都是簡單獨立且不可更改的對象,只需要保存新增的事件,不涉及其他的變更刪除操作)
  3. 系統審計(每一個事件都是不可變更的,每一個事件都是可追溯的)
  4. ...

EventBus 整體架構:

  • IEventBase :所有的事件應該實現這個介面,這個介面定義了事件的唯一id EventId 和事件發生的事件 EventAt

IEventBase

  • IEventHandler:定義了一個 Handle 方法來處理相應的事件

IEventHandler

  • IEventStore:所有的事件的處理存儲,保存事件的IEventHandler,一般不會直接操作,通過 EventBus 的訂閱和取消訂閱來操作 EventStore

IEventStore

  • IEventBus:用來發佈/訂閱/取消訂閱事件,並將事件的某一個 IEventHandler 保存到 EventStore 或從 EventStore 中移除

IEventBus

使用示例

來看一個使用示例,完整代碼示例

internal class EventTest
{
    public static void MainTest()
    {
        var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();
        eventBus.Subscribe<CounterEvent, CounterEventHandler1>();
        eventBus.Subscribe<CounterEvent, CounterEventHandler2>();
        eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
        eventBus.Publish(new CounterEvent { Counter = 1 });

        eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();
        eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
        eventBus.Publish(new CounterEvent { Counter = 2 });
    }
}

internal class CounterEvent : EventBase
{
    public int Counter { get; set; }
}

internal class CounterEventHandler1 : IEventHandler<CounterEvent>
{
    public Task Handle(CounterEvent @event)
    {
        LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
        return Task.CompletedTask;
    }
}

internal class CounterEventHandler2 : IEventHandler<CounterEvent>
{
    public Task Handle(CounterEvent @event)
    {
        LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
        return Task.CompletedTask;
    }
}

具體實現

EventStoreInMemory 實現:

EventStoreInMemory 是 IEventStore 將數據放在記憶體中的實現,使用了 ConcurrentDictionary 以及 HashSet 來儘可能的保證高效,具體實現代碼如下:

public class EventStoreInMemory : IEventStore
{
    private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();

    public bool AddSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.ContainsKey(eventKey))
        {
            return _eventHandlers[eventKey].Add(typeof(TEventHandler));
        }
        else
        {
            return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()
            {
                typeof(TEventHandler)
            });
        }
    }

    public bool Clear()
    {
        _eventHandlers.Clear();
        return true;
    }

    public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
    {
        if(_eventHandlers.Count == 0)
            return  new Type[0];
        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.TryGetValue(eventKey, out var handlers))
        {
            return handlers;
        }
        return new Type[0];
    }

    public string GetEventKey<TEvent>()
    {
        return typeof(TEvent).FullName;
    }

    public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
    {
        if(_eventHandlers.Count == 0)
            return false;

        var eventKey = GetEventKey<TEvent>();
        return _eventHandlers.ContainsKey(eventKey);
    }

    public bool RemoveSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        if(_eventHandlers.Count == 0)
            return false;

        var eventKey = GetEventKey<TEvent>();
        if (_eventHandlers.ContainsKey(eventKey))
        {
            return _eventHandlers[eventKey].Remove(typeof(TEventHandler));
        }
        return false;
    }
}

EventBus 的實現,從上面可以看到 EventStore 保存的是 IEventHandler 對應的 Type,在 Publish 的時候根據 Type 從 IoC 容器中取得相應的 Handler 即可,如果沒有在 IoC 容器中找到對應的類型,則會嘗試創建一個類型實例,然後調用 IEventHandlerHandle 方法,代碼如下:

/// <summary>
/// EventBus in process
/// </summary>
public class EventBus : IEventBus
{
    private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();

    private readonly IEventStore _eventStore;
    private readonly IServiceProvider _serviceProvider;

    public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)
    {
        _eventStore = eventStore;
        _serviceProvider = serviceProvider ?? DependencyResolver.Current;
    }

    public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
    {
        if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
        {
            return false;
        }
        var handlers = _eventStore.GetEventHandlerTypes<TEvent>();
        if (handlers.Count > 0)
        {
            var handlerTasks = new List<Task>();
            foreach (var handlerType in handlers)
            {
                try
                {
                    if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)
                    {
                        handlerTasks.Add(handler.Handle(@event));
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");
                }
            }
            handlerTasks.WhenAll().ConfigureAwait(false);

            return true;
        }
        return false;
    }

    public bool Subscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        return _eventStore.AddSubscription<TEvent, TEventHandler>();
    }

    public bool Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        return _eventStore.RemoveSubscription<TEvent, TEventHandler>();
    }
}

項目實例

來看一個實際的項目中的使用,在我的活動室預約項目中有一個公告的模塊,訪問公告詳情頁面,這個公告的訪問次數加1,把這個訪問次數加1改成了用 EventBus 來實現,實際項目代碼:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs

  1. 定義 Event 以及 EventHandler
public class NoticeViewEvent : EventBase
{
    public Guid NoticeId { get; set; }

    // UserId
    // IP
    // ...
}

public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
{
    public async Task Handle(NoticeViewEvent @event)
    {
        await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
        {
            var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
            notice.NoticeVisitCount += 1;
            await dbContext.SaveChangesAsync();
        });
    }
}

這裡的 Event 只定義了一個 NoticeId ,其實也可以把請求信息如IP/UA等信息加進去,在 EventHandler里處理以便日後數據分析。

  1. 註冊 EventBus 相關服務以及 EventHandlers
services.AddSingleton<IEventBus, EventBus>();
services.AddSingleton<IEventStore, EventStoreInMemory>();
//register EventHandlers
services.AddSingleton<NoticeViewEventHandler>();
  1. 訂閱事件
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)
{
    eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>(); 
    // ...
}
  1. 發佈事件
eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });

Reference


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、設置 登錄百度雲控制台,添加應用-添加人臉識別,查找,對比等。 記住API Key和Secret Key 二、創建Demo程式 1、使用Nuget安裝 Baidu.AI 和 Newtonsoft.Json 2、直接下載 C# SDK 3、案例代碼 4、最終界面 ...
  • 一、Angel工作室簡單通用許可權系統簡介 AngelRM(Asp.net MVC Web api)是基於asp.net(C#)MVC+前端bootstrap+ztree+lodash+jquery技術,採用bootstrap為前臺開發展示UI,Web Api主要負責前端的邏輯交互,再結合jQuery ...
  • 一、前言 surging 開源地址:https://github.com/dotnetcore/surging 隨著業務的發展,併發量的增多,業務的複雜度越來越大,對於系統架構能力要求越來越高,這時候微服務的設計思想應運而生,但是對於微服務需要引擎進行驅動,這時候基於.NET CORE 的微服務引擎 ...
  • OSGeo.GDAL.Gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES");OSGeo.GDAL.Gdal.SetConfigOption("SHAPE_ENCODING", "");OSGeo.GDAL.Gdal.AllRegister();OS... ...
  • 最近一個項目中用到了https的請求,在實際調用過程中發現之前的http方法不支持https,調用一直報錯。 查詢了一下,添加幾行代碼解決問題。 public string HttpPost(string Url, string postDataStr, string useragent = nul ...
  • 描述: 在 C# 中,System.Threading.Thread 類用於線程的工作。它允許創建並訪問多線程應用程式中的單個線程。進程中第一個被執行的線程稱為主線程。 案例: static void Main(string[] args) { int num = 100; for (int i = ...
  • 1 Lambda —— 表達式 Lambda 表達式是一個匿名函數,用它可以高效簡化代碼,常用作委托,回調 Lambda 表達式都使用運算符=>,所以當你見到這個符號,基本上就是一個 Lambda 表達式 Lambda 運算符的左邊是輸入參數(),=>,右邊是表達式或語句塊 Lambda 表達式,是 ...
  • 這篇文章主要是介紹劍指offer中的【位運算:二進位中1的個數】,【代碼的完整性:數值的整數次方】,【代碼的完整性:調整數組順序使奇數位於偶數前面】的實現。 1. 位運算:二進位中1的個數, 題目描述 輸入一個整數,輸出該數二進位表示中1的個數。其中負數用補碼表示。 解題思路 把一個整數減去1,再和 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...