動手造輪子:基於 Redis 實現 EventBus

来源:https://www.cnblogs.com/weihanli/archive/2019/07/29/implement-eventbus-with-redis-pubsub.html
-Advertisement-
Play Games

動手造輪子:基於 Redis 實現 EventBus Intro 上次我們造了一個 "簡單的基於記憶體的 " ,但是如果要跨系統的話就不合適了,所以有了這篇基於 的 探索。 本文的實現是基於 來實現的。 實現 既然要實現跨系統的 再使用基於記憶體的 EventStore 自然不行,因此這裡基於 Redi ...


動手造輪子:基於 Redis 實現 EventBus

Intro

上次我們造了一個簡單的基於記憶體的 EventBus,但是如果要跨系統的話就不合適了,所以有了這篇基於 RedisEventBus 探索。

本文的實現是基於 StackExchange.Redis 來實現的。

RedisEventStore 實現

既然要實現跨系統的 EventBus 再使用基於記憶體的 EventStore 自然不行,因此這裡基於 Redis 設計了一個 EventStoreInRedis ,基於 redis 的 Hash 來實現,以 Event 的 EventKey 作為 fieldName,以 Event 對應的 EventHandler 作為 value。

EventStoreInRedis 實現:

public class EventStoreInRedis : IEventStore
{
    protected readonly string EventsCacheKey;
    protected readonly ILogger Logger;

    private readonly IRedisWrapper Wrapper;

    public EventStoreInRedis(ILogger<EventStoreInRedis> logger)
    {
        Logger = logger;
        Wrapper = new RedisWrapper(RedisConstants.EventStorePrefix);

        EventsCacheKey = RedisManager.RedisConfiguration.EventStoreCacheKey;
    }

    public bool AddSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        var eventKey = GetEventKey<TEvent>();
        var handlerType = typeof(TEventHandler);
        if (Wrapper.Database.HashExists(EventsCacheKey, eventKey))
        {
            var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));

            if (handlers.Contains(handlerType))
            {
                return false;
            }
            handlers.Add(handlerType);
            Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));
            return true;
        }
        else
        {
            return Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(new HashSet<Type> { handlerType }), StackExchange.Redis.When.NotExists);
        }
    }

    public bool Clear()
    {
        return Wrapper.Database.KeyDelete(EventsCacheKey);
    }

    public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
    {
        var eventKey = GetEventKey<TEvent>();
        return Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));
    }

    public string GetEventKey<TEvent>()
    {
        return typeof(TEvent).FullName;
    }

    public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
    {
        var eventKey = GetEventKey<TEvent>();
        return Wrapper.Database.HashExists(EventsCacheKey, eventKey);
    }

    public bool RemoveSubscription<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        var eventKey = GetEventKey<TEvent>();
        var handlerType = typeof(TEventHandler);

        if (!Wrapper.Database.HashExists(EventsCacheKey, eventKey))
        {
            return false;
        }

        var handlers = Wrapper.Unwrap<HashSet<Type>>(Wrapper.Database.HashGet(EventsCacheKey, eventKey));

        if (!handlers.Contains(handlerType))
        {
            return false;
        }

        handlers.Remove(handlerType);
        Wrapper.Database.HashSet(EventsCacheKey, eventKey, Wrapper.Wrap(handlers));
        return true;
    }
}

RedisWrapper 及更具體的代碼可以參考我的 Redis 的擴展的實現 https://github.com/WeihanLi/WeihanLi.Redis

RedisEventBus 實現

RedisEventBus 是基於 Redis 的 PUB/SUB 實現的,實現的感覺還有一些小問題,我想確保每個客戶端註冊的時候每個 EventHandler 即使多次註冊也只註冊一次,但是還沒找到一個好的實現,如果你有什麼想法歡迎指出,和我一起交流。具體的實現細節如下:

public class RedisEventBus : IEventBus
{
    private readonly IEventStore _eventStore;
    private readonly ISubscriber _subscriber;
    private readonly IServiceProvider _serviceProvider;

    public RedisEventBus(IEventStore eventStore, IConnectionMultiplexer connectionMultiplexer, IServiceProvider serviceProvider)
    {
        _eventStore = eventStore;
        _serviceProvider = serviceProvider;
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    private string GetChannelPrefix<TEvent>() where TEvent : IEventBase
    {
        var eventKey = _eventStore.GetEventKey<TEvent>();
        var channelPrefix =
            $"{RedisManager.RedisConfiguration.EventBusChannelPrefix}{RedisManager.RedisConfiguration.KeySeparator}{eventKey}{RedisManager.RedisConfiguration.KeySeparator}";
        return channelPrefix;
    }

    private string GetChannelName<TEvent, TEventHandler>() where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
        => GetChannelName<TEvent>(typeof(TEventHandler));

    private string GetChannelName<TEvent>(Type eventHandlerType) where TEvent : IEventBase
    {
        var channelPrefix = GetChannelPrefix<TEvent>();
        var channelName = $"{channelPrefix}{eventHandlerType.FullName}";

        return channelName;
    }

    public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
    {
        if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
        {
            return false;
        }

        var eventData = @event.ToJson();
        var handlerTypes = _eventStore.GetEventHandlerTypes<TEvent>();
        foreach (var handlerType in handlerTypes)
        {
            var handlerChannelName = GetChannelName<TEvent>(handlerType);
            _subscriber.Publish(handlerChannelName, eventData);
        }

        return true;
    }

    public bool Subscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        _eventStore.AddSubscription<TEvent, TEventHandler>();

        var channelName = GetChannelName<TEvent, TEventHandler>();

        //// TODO: if current client subscribed the channel
        //if (true)
        //{
        _subscriber.Subscribe(channelName, async (channel, eventMessage) =>
        {
            var eventData = eventMessage.ToString().JsonToType<TEvent>();
            var handler = _serviceProvider.GetServiceOrCreateInstance<TEventHandler>();
            if (null != handler)
            {
                await handler.Handle(eventData).ConfigureAwait(false);
            }
        });
        return true;
        //}

        //return false;
    }

    public bool Unsubscribe<TEvent, TEventHandler>()
        where TEvent : IEventBase
        where TEventHandler : IEventHandler<TEvent>
    {
        _eventStore.RemoveSubscription<TEvent, TEventHandler>();

        var channelName = GetChannelName<TEvent, TEventHandler>();

        //// TODO: if current client subscribed the channel
        //if (true)
        //{
        _subscriber.Unsubscribe(channelName);
        return true;
        //}
        //return false;
    }
}

使用示例:

使用起來大體上和上一篇使用一致,只是在初始化註入服務的時候,我們需要把 IEventBusIEventStore 替換為對應 Redis 的實現即可。

  1. 註冊服務

    services.AddSingleton<IEventBus, RedisEventBus>();
    services.AddSingleton<IEventStore, EventStoreInRedis>();
  2. 註冊 EventHandler

    services.AddSingleton<NoticeViewEventHandler>();
  3. 訂閱事件

    eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();
  4. 發佈事件

    [HttpGet("{path}")]
    public async Task<IActionResult> GetByPath(string path, CancellationToken cancellationToken, [FromServices]IEventBus eventBus)
    {
        var notice = await _repository.FetchAsync(n => n.NoticeCustomPath == path, cancellationToken);
        if (notice == null)
        {
            return NotFound();
        }
        eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });
        return Ok(notice);
    }

Memo

如果要實現基於消息隊列的事件處理,需要註意,消息可能會重覆,可能會需要在事件處理中註意一下業務的冪等性或者對消息對一個去重處理。

我在使用 Redis 的事件處理中使用了一個基於 Redis 原子遞增的特性設計的一個防火牆,從而實現一段時間內某一個消息id只會被處理一次,實現源碼:https://github.com/WeihanLi/ActivityReservation/blob/dev/ActivityReservation.Helper/Events/NoticeViewEvent.cs

public class NoticeViewEvent : EventBase
{
    public Guid NoticeId { get; set; }

    // UserId
    // IP
    // ...
}

public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
{
    public async Task Handle(NoticeViewEvent @event)
    {
        var firewallClient = RedisManager.GetFirewallClient($"{nameof(NoticeViewEventHandler)}_{@event.EventId}", TimeSpan.FromMinutes(5));
        if (await firewallClient.HitAsync())
        {
            await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
            {
                //var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
                //notice.NoticeVisitCount += 1;
                //await dbContext.SaveChangesAsync();

                var conn = dbContext.Database.GetDbConnection();
                await conn.ExecuteAsync($@"UPDATE tabNotice SET NoticeVisitCount = NoticeVisitCount +1 WHERE NoticeId = @NoticeId", new { @event.NoticeId });
            });
        }
    }
}

Reference


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言:何謂版本控制系統吶?有兩種說法:一種叫法為SCM,==source code management (源代碼管理系統)另一種說法為VCS,==version control system (版本控制系統),既然兩種叫法都可以的話,姑且把她叫成版本控制系統吧,從名稱不難看出此系統一是用來解決代碼 ...
  • 前言 在記憶體當道的日子里,無論什麼時候都要考慮這些代碼是否會影響程式性能呢? 在現在的世界里,幾乎不會去考慮用了幾百毫秒,可是在特別的場景了,往往這幾百毫米確影響了整個項目的快慢。 通過瞭解這兩者之間的性能差異,希望幫助大家在合適的場景里選擇正確的編碼。 實例 c public class Poin ...
  • 1.前言 ASP.NET Core在應用程式上引入Microsoft.Extensions.Configuration配置,可以支持多種方式配置,包括命令行配置、環境變數配置、文件配置、記憶體配置,自定義配置等等。下麵我們就其中幾個配置來聊聊。 2.命令行配置 CommandLineConfigura ...
  • 這些什麼綁定都是從Borland中學來的,MVVM只是冠上新名稱而於,不是什麼新技術。依稀記得是微軟挖了Delphi一位重量級的人員後,這些東西加進了IDE。如果從數據流來講,綁定只是減輕了前臺顯示的編寫工作而於。等到你想寫更自定式的東西時,這些東西反而變成阻礙。在寫入UI的數據流中,我們想要獲取b ...
  • 現在開源項目越來越多,Git使用越來越方便,用Git的人也越來越多。創建項目的時候,喜歡把日誌,臨時文件,項目編譯的中間文件,引用的類庫等等,這時就要設置響應的規則,來忽略這些文件。例如創建一個C#項目,項目下麵會有.vs,bin,obj等,這些都是不需要提交的需要忽略的,如何忽略呢?其實很簡單,增 ...
  • 1.智能快遞櫃(開篇) 2.智能快遞櫃(終端篇) 3.智能快遞櫃(通信篇-HTTP) 4.智能快遞櫃(通信篇-SOCKET) 5.智能快遞櫃(通信篇-Server程式) 6.智能快遞櫃(平臺篇) 7.智能快遞櫃(APP及微信公眾號) 8.智能快遞櫃SDK(聯網型鎖板) 9.智能快遞櫃SDK(串口型鎖 ...
  • .net core 基於 IHostedService 實現定時任務 ...
  • 1. 前言 關於單元測試的定義和好處可以借用Stephen Cleary的一段話來概括: 單元測試是現代開發的基礎。對項目進行單元測試的好處非常容易理解:單元測試降低了 Bug 數量,縮短了上市時間,防止過度耦合的設計。這些都是很好的優勢,但它還有更多與開發人員更直接相關的優點。在我編寫單元測試時, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...