基於channel的非同步事件匯流排

来源:https://www.cnblogs.com/wygbjd/archive/2023/09/19/17715763.html
-Advertisement-
Play Games

生成者/消費者概念編程模型 通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List。 主要區別在於,此集合管理同步,並通過工廠創建選項提供各種消耗模型。 ...


生成者/消費者概念編程模型

通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List。 主要區別在於,此集合管理同步,並通過工廠創建選項提供各種消耗模型。 這些選項控制通道的行為,例如允許它們存儲的元素數,以及達到該限制時會發生什麼情況,或者通道是由多個生成者還是多個使用者同時訪問

channel簡介

channel提供了用於在生成者和使用者之間以非同步方式傳遞數據的一組同步數據結構。

channel(管道)提供了有界通道和無界通道

無界通道

該通道可以同時供任意數量的讀取器和編寫器使用。 或者,可以通過提供 UnboundedChannelOptions 實例在創建無限制通道時指定非預設行為。 該通道的容量不受限制,並且所有寫入均以同步方式執行

有界通道

創建有界通道時,該通道將綁定到最大容量。 達到邊界時,預設行為是通道非同步阻止生成者,直到空間可用。 可以通過在創建通道時指定選項來配置此行為。 可以使用任何大於零的容量值創建有界通道

模式行為

BoundedChannelFullMode.Wait

這是預設值。 WriteAsync調用 以等待空間可用以完成寫入操作。 調用 以 TryWrite 立即返回 false 。

BoundedChannelFullMode.DropNewest

刪除並忽略通道中的最新項,以便為要寫入的項留出空間。

BoundedChannelFullMode.DropOldest

刪除並忽略通道中的最舊項,以便為要寫入的項留出空間。
BoundedChannelFullMode.DropWrite 刪除要寫入的項。

Channel.Writer API

生成者功能在 Channel<TWrite,TRead>.Writer 上公開。 下表詳細介紹了生成者 API 和預期行為:

ChannelWriter.Complete

將通道標記為已完成,這意味著不再向該通道寫入更多項。

ChannelWriter.TryComplete

嘗試將通道標記為已完成,這意味著不會向通道寫入更多數據。

ChannelWriter.TryWrite

嘗試將指定的項寫入到通道。 當與無界通道一起使用時,除非通道的編寫器通過 ChannelWriter.Complete 或 ChannelWriter.TryComplete 發出完成信號,否則這將始終返回 true。

ChannelWriter.WaitToWriteAsync

返回一個 ValueTask ,當有空間可以寫入項時完成。
ChannelWriter.WriteAsync 以非同步方式將項寫入到通道

Channel.Reader API

ChannelReader.ReadAllAsync

創建允許從通道中讀取所有數據的 IAsyncEnumerable

ChannelReader.ReadAsync

以非同步方式從通道中讀取項。

ChannelReader.TryPeek

嘗試從通道中查看項。

ChannelReader.TryRead

嘗試從通道中讀取項。

ChannelReader.WaitToReadAsync

返回在 ValueTask 數據可供讀取時完成的 。

channel的具體使用

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels

基於channel實現事件匯流排

EventDiscriptorAttribute 特性定義

    [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
    public class EventDiscriptorAttribute:Attribute
    {
       /// <summary>
       /// 事件2名稱
       /// </summary>
       public string EventName { get; private set; }
       /// <summary>
       /// channel 容量設置
       /// </summary>
       public int Capacity { get; private set; }  
       /// <summary>
       /// 是否維持一個生產者多個消費者模型
       /// </summary>
       public bool SigleReader { get; private set; }

       public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
        {
            EventName = eventName;
            Capacity = capacity;
            SigleReader = sigleReader;
        }   
    }

定義通道容器

    //通道容器單列註入,在拓展類中初始化
    public class ChannelContainer : IChannelContainer
    {
        public List<EventDiscription> Events { get; private set; }

        private readonly IServiceCollection Services;

        public EventHandlerContainer(IServiceCollection services)
        {
            Events = new List<EventDiscription>();
            Services = services;         
            services.AddSingleton<IEventHandlerContainer>(this);
        }

        private bool Check(Type type)
        {
            var discription = Events.FirstOrDefault(p=>p.EtoType == type);

            return discription is null;
        }
        
        ///訂閱並且註入EventHandler
        public void TryAddChannle(Type eto,Type handler)
        {
            if(!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto, handler));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if(Services.Any(P=>P.ServiceType==handlertype))
            {
                return;
            }

            Services.AddTransient(handlertype, handler);
        }

        public void TryAddChannle<TEto, THandler>()
        {
            TryAddChannle(typeof(TEto),typeof(THandler));  
        }

        
        public void TryAddChannle(Type eto)
        {
            if (!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if (Services.Any(P => P.ServiceType == handlertype))
            {
                return;
            }
        }

        public void TryAddChannle<TEto>()
        {
            TryAddChannle(typeof(TEto));
        }

事件管理器

事件管理器通過線程安全字典管理事件通道和事件的觸發

可以看到在Subscribe 方法中消費者並不是在訂閱後立即執行的而是放到EventTrigger中的定義的非同步事件中去

消費者執行最後又.,NET提供的托管任務去執行

 public class EventHandlerManager : IEventHandlerManager,IDisposable 
    {
        private ConcurrentDictionary<string, Channel<string>> Channels;
        private bool IsDiposed = false;

        private readonly IServiceProvider ServiceProvider;

        private readonly CancellationToken _cancellation;

        private readonly IEventHandlerContainer _eventHandlerContainer;

        private readonly ILogger _logger;

        private ConcurrentDictionary<string,EventTrigger> EventTriggers;

        private bool IsInitConsumer = true;

        public EventHandlerManager( IServiceProvider serviceProvider
            , IEventHandlerContainer eventHandlerContainer
            , ILoggerFactory loggerFactory)
        {
            ServiceProvider = serviceProvider;
            _cancellation = CancellationToken.None;
            _eventHandlerContainer = eventHandlerContainer;
            Channels = new ConcurrentDictionary<string, Channel<string>>();
            EventTriggers = new ConcurrentDictionary<string, EventTrigger>();
            _logger = loggerFactory.CreateLogger<IEventHandlerManager>();
        }
        //初始化通信管道
        public async Task CreateChannles()
        {
            var eventDiscriptions = _eventHandlerContainer.Events;

            foreach(var item in eventDiscriptions)
            {
                var attribute = item.EtoType.GetCustomAttributes()
                                            .OfType<EventDiscriptorAttribute>()
                                            .FirstOrDefault();

                if (attribute is null)
                {
                    ThorwEventAttributeNullException.ThorwException();
                }

                var channel = Channels.GetValueOrDefault(attribute.EventName);

                if (channel is not null)
                {
                    return;
                }
                //創建無界通道模型,並且初始化容量大小,當無容量寫入後等待寫入
                channel = Channel.CreateBounded<string>(
                        new BoundedChannelOptions(attribute.Capacity)
                              {
                                SingleWriter = true,
                                SingleReader = false,
                                AllowSynchronousContinuations = false,
                                FullMode = BoundedChannelFullMode.Wait
                        });

                Channels.TryAdd(attribute.EventName, channel);

                _logger.LogInformation($"創建通信管道{item.EtoType}--{attribute.EventName}");
            }
            await Task.CompletedTask;
        }

        private Channel<string> Check(Type type)
        {
            var attribute = type .GetCustomAttributes()
                                   .OfType<EventDiscriptorAttribute>()
                                   .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            var channel = Channels.GetValueOrDefault(attribute.EventName);

            if(channel is null)
            {
                ThrowChannelNullException.ThrowException(attribute.EventName);
            } 

            return channel;
        }

        public void Dispose()
        {
            IsDiposed = true;
            IsInitConsumer = true;
            foreach(var trigger in EventTriggers.Values)
            {
                trigger.Dispose();
            }
            _cancellation.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// 生產者
        /// </summary>
        /// <typeparam name="TEto"></typeparam>
        /// <param name="eto"></param>
        /// <returns></returns>
        public async Task WriteAsync<TEto>(TEto eto) where TEto : class
        {
            var channel = Check(typeof(TEto));
            //由於創建的是有界通道,存在有界通道消息積累超過初始大小所以迴圈判斷是否可以寫入消息
            while ( await channel.Writer.WaitToWriteAsync(CancellationToken.None)) 
            {
                var data = JsonConvert.SerializeObject(eto);

                await channel.Writer.WriteAsync(data, _cancellation);
            }          
        }
        /// <summary>
        /// 消費者
        /// </summary>
        /// <returns></returns>
        public void Subscribe<TEto>() where TEto : class
        {
            var attribute = typeof(TEto).GetCustomAttributes()
           .OfType<EventDiscriptorAttribute>()
           .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            if (EventTriggers.Keys.Any(p => p == attribute.EventName))
            {
                return;
            }

            Func<Task> func = async () =>
            {
                var scope = ServiceProvider.CreateAsyncScope();

                var channel = Check(typeof(TEto));

                var handler = scope.ServiceProvider.GetRequiredService<IEventHandler<TEto>>();

                var reader = channel.Reader;

                try
                {
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        while (reader.TryRead(out string str))
                        {
                            var data = JsonConvert.DeserializeObject<TEto>(str);

                            _logger.LogInformation(str);

                            await handler.HandelrAsync(data);
                        }
                    }
                }
                catch (Exception e)
                {
                    _logger.LogInformation($"本地事件匯流排異常{e.Source}--{e.Message}--{e.Data}");
                    throw;
                }
            };

            var trigger = new EventTrigger();
            trigger.Recived(func);

            EventTriggers.TryAdd(attribute.EventName, trigger);
        }

        public Task Trigger()
        {
            //只允許初始化一次消費者
            if (IsInitConsumer)
            {
                foreach (var eventTrigger in EventTriggers)
                {
                    Task.Factory.StartNew(async () =>
                    {
                        await eventTrigger.Value.Trigger();
                    });
                }
            }
            IsInitConsumer = false;
            return Task.CompletedTask;  
        }
    }
}

EventTrigger 定義

    public class EventTrigger:IDisposable
    {
        public event Func<Task>? Event;

        public EventTrigger()
        {

        }

        public void Recived(Func<Task> func)
        {
            if (Event is not null)
            {
                return;
            }
            Event += func;
        }

        public Task Trigger()
        {
            if(Event is null)
            {
                return Task.CompletedTask;  
            }
            return Event();
        }

        public void Dispose()
        {
            if( Event is not null )
            {
                Event = null;
            }
        }
    }

托管任務執行EventHandlerManager Trigger()方法

    public class EventBusBackgroundService : BackgroundService
    {
        private readonly IEventHandlerManager _eventHandlerManager;
        public EventBusBackgroundService(IEventHandlerManager eventHandlerManager) 
        { 
            _eventHandlerManager = eventHandlerManager; 
        }  
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _eventHandlerManager.Trigger();
        }
    }

拓展類定義

    public  static class EventBusExtensions
    {
        //添加事件匯流排並且添加channle管道
        public static IServiceCollection AddEventBusAndChannles(this IServiceCollection services,Action<EventHandlerContainer> action)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();
            ///添加托管任務
            services.AddHostedService<EventBusBackgroundService>();

            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }

        //創建通信管道
        public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
        {
            var scope = serviceProvider.CreateAsyncScope(); 

            var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
            
            //初始化通信管道
            await eventhandlerManager.CreateChannles();

            action.Invoke(eventhandlerManager);
        }

        //添加本地事件匯流排
        public static IServiceCollection AddEventBus(this IServiceCollection services)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();

            services.AddHostedService<EventBusBackgroundService>();

            return services;
        }

        //添加通信管道
        public static IServiceCollection AddChannles(this IServiceCollection services, Action<EventHandlerContainer> action)
        {
            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }
    }
}

使用


    context.Services.AddEventBus();
    //添加通信管道
    context.Services.AddChannles(p =>
    {
        p.TryAddChannle<TestEto>();
    });
    //
    var scope = context.ServiceProvider.CreateScope();

    var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
    //初始化通信管道
    await eventhandlerManager.CreateChannles();
    //訂閱事件
    eventhandlerManager.Subscribe<TestEto>();
    //定義EventHandler
    public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
    {
        private ILogger _logger;
        public TestEventHandler(ILoggerFactory factory)
        {
            _logger = factory.CreateLogger<TestEventHandler>();
        }   
        public Task HandelrAsync(TestEto eto)
        {
            _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
            return Task.CompletedTask;
        }
    }
    
    //構造函數註入
    [HttpGet]
		public async Task TestLocalEventBus()
		{
			TestEto eto = null;

			for(var i = 0; i < 100; i++)
			{
				eto = new TestEto()
				{
					Name ="LocalEventBus" + i.ToString(),
					Description ="wyg"+i.ToString(),
				};
				await _localEventBus.PublichAsync(eto);
			}
		}

總結

作為一個才畢業一年的初級程式員的我來說這次的channel的事件匯流排的封裝還存在著許多不足

1.無法對消息進行持久化的管理

2.沒有對消息異常進行處理

3.沒有做到像abp那樣自動訂閱

當然還存在著一些我不知道問題,歡迎各位大佬提出問題指正

源碼鏈接

這裡提一嘴(有個小小的請求),有廣州的老哥所在的公司目前還招人的話,希望給小弟一個機會(找了快一個月工作了,確實有點難,聯繫方式vx:wenyg2001411)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題回溯 2023年Q2某日運營反饋一個問題,商品系統商家中心某批量工具模板無法下載,導致功能無法使用(因為模板是動態變化的) 商家中心報錯(JSON串): {"code":-1,"msg":"失敗"} 負責的同事看到失敗後立即與我展開討論(因為不是關鍵業務,所以不需要回滾,修複即可),我們發現新功 ...
  • OpenHarmony Meetup 常州站正火熱招募中! 誠邀充滿激情的開發者參與線下盛會~ 探索OpenHarmony前沿科技,暢談未來前景, 感受OpenHarmony生態構建之路的魅力! 線下參與,名額有限,僅限20位幸運者! 報名截止時間為9月26日24:00點,快快行動起來~ 參加Ope ...
  • 一. gcc 安裝 yum install gcc-c++ 安裝 nginx 需要先將官網下載的源碼進行編譯,編譯依賴 gcc 環境,如果沒有 gcc 環境,則需要安裝: cd /etc/yum.repos.d/ sed -i 's/mirrorlist/#mirrorlist/g' /etc/yu ...
  • 基於java高校獎學金管理系統設計與實現,可適用於大學獎學金管理系統,學生獎學金管理系統,學校獎學金,校園獎學金申請管理系統; ...
  • 除了繪製各類分析圖形(比如柱狀圖,折線圖,餅圖等等)以外,matplotlib 也可以在畫布上任意繪製各類幾何圖形。這對於電腦圖形學、幾何演算法和電腦輔助設計等領域非常重要。 matplitlib 中的 patches 類提供了豐富的幾何對象,本篇拋磚引玉,介紹其中幾種常用的幾何圖形繪製方法。 其 ...
  • 截至目前(2023年),Java8發佈至今已有9年,2018年9月25日,Oracle發佈了Java11,這是Java8之後的首個LTS版本。那麼從JDK8到JDK11,到底帶來了哪些特性呢?值得我們升級嗎?而且升級過程會遇到哪些問題呢?帶著這些問題,本篇文章將帶來完整的JDK8升級JDK11最全實... ...
  • 用Rust手把手編寫一個Proxy(代理), 動工 項目 ++wmproxy++ gitee 傳送門 github 傳送門 設計流程圖 flowchart LR A[客戶端] -->|Http| B[代理端] --> C[代理服務端] --> D[服務端] B -->|直達| D A -->|Htt ...
  • 在學習C#中的記錄類型時,對出現的Equals和ReferenceEquals得到的不同結果表示不理解,隨即進行相關資料查找。 值類型 == : 比較兩者的“內容”是否相同,即“值”是否一樣Equals:比較兩者的“內容”是否相同,即“值”是否一樣ReferenceEquals:返回false,因為 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...