基於channel的非同步事件匯流排

来源:https://www.cnblogs.com/wygbjd/archive/2023/09/19/17715763.html
-Advertisement-
Play Games

生成者/消費者概念編程模型 通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List。 主要區別在於,此集合管理同步,並通過工廠創建選項提供各種消耗模型。 ...


生成者/消費者概念編程模型

通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List。 主要區別在於,此集合管理同步,並通過工廠創建選項提供各種消耗模型。 這些選項控制通道的行為,例如允許它們存儲的元素數,以及達到該限制時會發生什麼情況,或者通道是由多個生成者還是多個使用者同時訪問

channel簡介

channel提供了用於在生成者和使用者之間以非同步方式傳遞數據的一組同步數據結構。

channel(管道)提供了有界通道和無界通道

無界通道

該通道可以同時供任意數量的讀取器和編寫器使用。 或者,可以通過提供 UnboundedChannelOptions 實例在創建無限制通道時指定非預設行為。 該通道的容量不受限制,並且所有寫入均以同步方式執行

有界通道

創建有界通道時,該通道將綁定到最大容量。 達到邊界時,預設行為是通道非同步阻止生成者,直到空間可用。 可以通過在創建通道時指定選項來配置此行為。 可以使用任何大於零的容量值創建有界通道

模式行為

BoundedChannelFullMode.Wait

這是預設值。 WriteAsync調用 以等待空間可用以完成寫入操作。 調用 以 TryWrite 立即返回 false 。

BoundedChannelFullMode.DropNewest

刪除並忽略通道中的最新項,以便為要寫入的項留出空間。

BoundedChannelFullMode.DropOldest

刪除並忽略通道中的最舊項,以便為要寫入的項留出空間。
BoundedChannelFullMode.DropWrite 刪除要寫入的項。

Channel.Writer API

生成者功能在 Channel<TWrite,TRead>.Writer 上公開。 下表詳細介紹了生成者 API 和預期行為:

ChannelWriter.Complete

將通道標記為已完成,這意味著不再向該通道寫入更多項。

ChannelWriter.TryComplete

嘗試將通道標記為已完成,這意味著不會向通道寫入更多數據。

ChannelWriter.TryWrite

嘗試將指定的項寫入到通道。 當與無界通道一起使用時,除非通道的編寫器通過 ChannelWriter.Complete 或 ChannelWriter.TryComplete 發出完成信號,否則這將始終返回 true。

ChannelWriter.WaitToWriteAsync

返回一個 ValueTask ,當有空間可以寫入項時完成。
ChannelWriter.WriteAsync 以非同步方式將項寫入到通道

Channel.Reader API

ChannelReader.ReadAllAsync

創建允許從通道中讀取所有數據的 IAsyncEnumerable

ChannelReader.ReadAsync

以非同步方式從通道中讀取項。

ChannelReader.TryPeek

嘗試從通道中查看項。

ChannelReader.TryRead

嘗試從通道中讀取項。

ChannelReader.WaitToReadAsync

返回在 ValueTask 數據可供讀取時完成的 。

channel的具體使用

https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels

基於channel實現事件匯流排

EventDiscriptorAttribute 特性定義

    [AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
    public class EventDiscriptorAttribute:Attribute
    {
       /// <summary>
       /// 事件2名稱
       /// </summary>
       public string EventName { get; private set; }
       /// <summary>
       /// channel 容量設置
       /// </summary>
       public int Capacity { get; private set; }  
       /// <summary>
       /// 是否維持一個生產者多個消費者模型
       /// </summary>
       public bool SigleReader { get; private set; }

       public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
        {
            EventName = eventName;
            Capacity = capacity;
            SigleReader = sigleReader;
        }   
    }

定義通道容器

    //通道容器單列註入,在拓展類中初始化
    public class ChannelContainer : IChannelContainer
    {
        public List<EventDiscription> Events { get; private set; }

        private readonly IServiceCollection Services;

        public EventHandlerContainer(IServiceCollection services)
        {
            Events = new List<EventDiscription>();
            Services = services;         
            services.AddSingleton<IEventHandlerContainer>(this);
        }

        private bool Check(Type type)
        {
            var discription = Events.FirstOrDefault(p=>p.EtoType == type);

            return discription is null;
        }
        
        ///訂閱並且註入EventHandler
        public void TryAddChannle(Type eto,Type handler)
        {
            if(!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto, handler));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if(Services.Any(P=>P.ServiceType==handlertype))
            {
                return;
            }

            Services.AddTransient(handlertype, handler);
        }

        public void TryAddChannle<TEto, THandler>()
        {
            TryAddChannle(typeof(TEto),typeof(THandler));  
        }

        
        public void TryAddChannle(Type eto)
        {
            if (!Check(eto))
            {
                return;
            }

            Events.Add(new EventDiscription(eto));

            var handlerbaseType = typeof(IEventHandler<>);

            var handlertype = handlerbaseType.MakeGenericType(eto);

            if (Services.Any(P => P.ServiceType == handlertype))
            {
                return;
            }
        }

        public void TryAddChannle<TEto>()
        {
            TryAddChannle(typeof(TEto));
        }

事件管理器

事件管理器通過線程安全字典管理事件通道和事件的觸發

可以看到在Subscribe 方法中消費者並不是在訂閱後立即執行的而是放到EventTrigger中的定義的非同步事件中去

消費者執行最後又.,NET提供的托管任務去執行

 public class EventHandlerManager : IEventHandlerManager,IDisposable 
    {
        private ConcurrentDictionary<string, Channel<string>> Channels;
        private bool IsDiposed = false;

        private readonly IServiceProvider ServiceProvider;

        private readonly CancellationToken _cancellation;

        private readonly IEventHandlerContainer _eventHandlerContainer;

        private readonly ILogger _logger;

        private ConcurrentDictionary<string,EventTrigger> EventTriggers;

        private bool IsInitConsumer = true;

        public EventHandlerManager( IServiceProvider serviceProvider
            , IEventHandlerContainer eventHandlerContainer
            , ILoggerFactory loggerFactory)
        {
            ServiceProvider = serviceProvider;
            _cancellation = CancellationToken.None;
            _eventHandlerContainer = eventHandlerContainer;
            Channels = new ConcurrentDictionary<string, Channel<string>>();
            EventTriggers = new ConcurrentDictionary<string, EventTrigger>();
            _logger = loggerFactory.CreateLogger<IEventHandlerManager>();
        }
        //初始化通信管道
        public async Task CreateChannles()
        {
            var eventDiscriptions = _eventHandlerContainer.Events;

            foreach(var item in eventDiscriptions)
            {
                var attribute = item.EtoType.GetCustomAttributes()
                                            .OfType<EventDiscriptorAttribute>()
                                            .FirstOrDefault();

                if (attribute is null)
                {
                    ThorwEventAttributeNullException.ThorwException();
                }

                var channel = Channels.GetValueOrDefault(attribute.EventName);

                if (channel is not null)
                {
                    return;
                }
                //創建無界通道模型,並且初始化容量大小,當無容量寫入後等待寫入
                channel = Channel.CreateBounded<string>(
                        new BoundedChannelOptions(attribute.Capacity)
                              {
                                SingleWriter = true,
                                SingleReader = false,
                                AllowSynchronousContinuations = false,
                                FullMode = BoundedChannelFullMode.Wait
                        });

                Channels.TryAdd(attribute.EventName, channel);

                _logger.LogInformation($"創建通信管道{item.EtoType}--{attribute.EventName}");
            }
            await Task.CompletedTask;
        }

        private Channel<string> Check(Type type)
        {
            var attribute = type .GetCustomAttributes()
                                   .OfType<EventDiscriptorAttribute>()
                                   .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            var channel = Channels.GetValueOrDefault(attribute.EventName);

            if(channel is null)
            {
                ThrowChannelNullException.ThrowException(attribute.EventName);
            } 

            return channel;
        }

        public void Dispose()
        {
            IsDiposed = true;
            IsInitConsumer = true;
            foreach(var trigger in EventTriggers.Values)
            {
                trigger.Dispose();
            }
            _cancellation.ThrowIfCancellationRequested();
        }

        /// <summary>
        /// 生產者
        /// </summary>
        /// <typeparam name="TEto"></typeparam>
        /// <param name="eto"></param>
        /// <returns></returns>
        public async Task WriteAsync<TEto>(TEto eto) where TEto : class
        {
            var channel = Check(typeof(TEto));
            //由於創建的是有界通道,存在有界通道消息積累超過初始大小所以迴圈判斷是否可以寫入消息
            while ( await channel.Writer.WaitToWriteAsync(CancellationToken.None)) 
            {
                var data = JsonConvert.SerializeObject(eto);

                await channel.Writer.WriteAsync(data, _cancellation);
            }          
        }
        /// <summary>
        /// 消費者
        /// </summary>
        /// <returns></returns>
        public void Subscribe<TEto>() where TEto : class
        {
            var attribute = typeof(TEto).GetCustomAttributes()
           .OfType<EventDiscriptorAttribute>()
           .FirstOrDefault();

            if (attribute is null)
            {
                ThorwEventAttributeNullException.ThorwException();
            }

            if (EventTriggers.Keys.Any(p => p == attribute.EventName))
            {
                return;
            }

            Func<Task> func = async () =>
            {
                var scope = ServiceProvider.CreateAsyncScope();

                var channel = Check(typeof(TEto));

                var handler = scope.ServiceProvider.GetRequiredService<IEventHandler<TEto>>();

                var reader = channel.Reader;

                try
                {
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        while (reader.TryRead(out string str))
                        {
                            var data = JsonConvert.DeserializeObject<TEto>(str);

                            _logger.LogInformation(str);

                            await handler.HandelrAsync(data);
                        }
                    }
                }
                catch (Exception e)
                {
                    _logger.LogInformation($"本地事件匯流排異常{e.Source}--{e.Message}--{e.Data}");
                    throw;
                }
            };

            var trigger = new EventTrigger();
            trigger.Recived(func);

            EventTriggers.TryAdd(attribute.EventName, trigger);
        }

        public Task Trigger()
        {
            //只允許初始化一次消費者
            if (IsInitConsumer)
            {
                foreach (var eventTrigger in EventTriggers)
                {
                    Task.Factory.StartNew(async () =>
                    {
                        await eventTrigger.Value.Trigger();
                    });
                }
            }
            IsInitConsumer = false;
            return Task.CompletedTask;  
        }
    }
}

EventTrigger 定義

    public class EventTrigger:IDisposable
    {
        public event Func<Task>? Event;

        public EventTrigger()
        {

        }

        public void Recived(Func<Task> func)
        {
            if (Event is not null)
            {
                return;
            }
            Event += func;
        }

        public Task Trigger()
        {
            if(Event is null)
            {
                return Task.CompletedTask;  
            }
            return Event();
        }

        public void Dispose()
        {
            if( Event is not null )
            {
                Event = null;
            }
        }
    }

托管任務執行EventHandlerManager Trigger()方法

    public class EventBusBackgroundService : BackgroundService
    {
        private readonly IEventHandlerManager _eventHandlerManager;
        public EventBusBackgroundService(IEventHandlerManager eventHandlerManager) 
        { 
            _eventHandlerManager = eventHandlerManager; 
        }  
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await _eventHandlerManager.Trigger();
        }
    }

拓展類定義

    public  static class EventBusExtensions
    {
        //添加事件匯流排並且添加channle管道
        public static IServiceCollection AddEventBusAndChannles(this IServiceCollection services,Action<EventHandlerContainer> action)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();
            ///添加托管任務
            services.AddHostedService<EventBusBackgroundService>();

            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }

        //創建通信管道
        public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
        {
            var scope = serviceProvider.CreateAsyncScope(); 

            var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
            
            //初始化通信管道
            await eventhandlerManager.CreateChannles();

            action.Invoke(eventhandlerManager);
        }

        //添加本地事件匯流排
        public static IServiceCollection AddEventBus(this IServiceCollection services)
        {
            services.AddSingleton<IEventHandlerManager, EventHandlerManager>();

            services.AddTransient<ILocalEventBus, LocalEventBus>();

            services.AddHostedService<EventBusBackgroundService>();

            return services;
        }

        //添加通信管道
        public static IServiceCollection AddChannles(this IServiceCollection services, Action<EventHandlerContainer> action)
        {
            EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);

            action.Invoke(eventHandlerContainer);

            return services;
        }
    }
}

使用


    context.Services.AddEventBus();
    //添加通信管道
    context.Services.AddChannles(p =>
    {
        p.TryAddChannle<TestEto>();
    });
    //
    var scope = context.ServiceProvider.CreateScope();

    var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
    //初始化通信管道
    await eventhandlerManager.CreateChannles();
    //訂閱事件
    eventhandlerManager.Subscribe<TestEto>();
    //定義EventHandler
    public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
    {
        private ILogger _logger;
        public TestEventHandler(ILoggerFactory factory)
        {
            _logger = factory.CreateLogger<TestEventHandler>();
        }   
        public Task HandelrAsync(TestEto eto)
        {
            _logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
            return Task.CompletedTask;
        }
    }
    
    //構造函數註入
    [HttpGet]
		public async Task TestLocalEventBus()
		{
			TestEto eto = null;

			for(var i = 0; i < 100; i++)
			{
				eto = new TestEto()
				{
					Name ="LocalEventBus" + i.ToString(),
					Description ="wyg"+i.ToString(),
				};
				await _localEventBus.PublichAsync(eto);
			}
		}

總結

作為一個才畢業一年的初級程式員的我來說這次的channel的事件匯流排的封裝還存在著許多不足

1.無法對消息進行持久化的管理

2.沒有對消息異常進行處理

3.沒有做到像abp那樣自動訂閱

當然還存在著一些我不知道問題,歡迎各位大佬提出問題指正

源碼鏈接

這裡提一嘴(有個小小的請求),有廣州的老哥所在的公司目前還招人的話,希望給小弟一個機會(找了快一個月工作了,確實有點難,聯繫方式vx:wenyg2001411)


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 問題回溯 2023年Q2某日運營反饋一個問題,商品系統商家中心某批量工具模板無法下載,導致功能無法使用(因為模板是動態變化的) 商家中心報錯(JSON串): {"code":-1,"msg":"失敗"} 負責的同事看到失敗後立即與我展開討論(因為不是關鍵業務,所以不需要回滾,修複即可),我們發現新功 ...
  • OpenHarmony Meetup 常州站正火熱招募中! 誠邀充滿激情的開發者參與線下盛會~ 探索OpenHarmony前沿科技,暢談未來前景, 感受OpenHarmony生態構建之路的魅力! 線下參與,名額有限,僅限20位幸運者! 報名截止時間為9月26日24:00點,快快行動起來~ 參加Ope ...
  • 一. gcc 安裝 yum install gcc-c++ 安裝 nginx 需要先將官網下載的源碼進行編譯,編譯依賴 gcc 環境,如果沒有 gcc 環境,則需要安裝: cd /etc/yum.repos.d/ sed -i 's/mirrorlist/#mirrorlist/g' /etc/yu ...
  • 基於java高校獎學金管理系統設計與實現,可適用於大學獎學金管理系統,學生獎學金管理系統,學校獎學金,校園獎學金申請管理系統; ...
  • 除了繪製各類分析圖形(比如柱狀圖,折線圖,餅圖等等)以外,matplotlib 也可以在畫布上任意繪製各類幾何圖形。這對於電腦圖形學、幾何演算法和電腦輔助設計等領域非常重要。 matplitlib 中的 patches 類提供了豐富的幾何對象,本篇拋磚引玉,介紹其中幾種常用的幾何圖形繪製方法。 其 ...
  • 截至目前(2023年),Java8發佈至今已有9年,2018年9月25日,Oracle發佈了Java11,這是Java8之後的首個LTS版本。那麼從JDK8到JDK11,到底帶來了哪些特性呢?值得我們升級嗎?而且升級過程會遇到哪些問題呢?帶著這些問題,本篇文章將帶來完整的JDK8升級JDK11最全實... ...
  • 用Rust手把手編寫一個Proxy(代理), 動工 項目 ++wmproxy++ gitee 傳送門 github 傳送門 設計流程圖 flowchart LR A[客戶端] -->|Http| B[代理端] --> C[代理服務端] --> D[服務端] B -->|直達| D A -->|Htt ...
  • 在學習C#中的記錄類型時,對出現的Equals和ReferenceEquals得到的不同結果表示不理解,隨即進行相關資料查找。 值類型 == : 比較兩者的“內容”是否相同,即“值”是否一樣Equals:比較兩者的“內容”是否相同,即“值”是否一樣ReferenceEquals:返回false,因為 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...