生成者/消費者概念編程模型 通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List。 主要區別在於,此集合管理同步,並通過工廠創建選項提供各種消耗模型。 ...
生成者/消費者概念編程模型
通道是生成者/使用者概念編程模型的實現。 在此編程模型中,生成者非同步生成數據,使用者非同步使用該數據。 換句話說,此模型將數據從一方移交給另一方。 嘗試將通道視為任何其他常見的泛型集合類型,例如 List
channel簡介
channel提供了用於在生成者和使用者之間以非同步方式傳遞數據的一組同步數據結構。
channel(管道)提供了有界通道和無界通道
無界通道
該通道可以同時供任意數量的讀取器和編寫器使用。 或者,可以通過提供 UnboundedChannelOptions 實例在創建無限制通道時指定非預設行為。 該通道的容量不受限制,並且所有寫入均以同步方式執行
有界通道
創建有界通道時,該通道將綁定到最大容量。 達到邊界時,預設行為是通道非同步阻止生成者,直到空間可用。 可以通過在創建通道時指定選項來配置此行為。 可以使用任何大於零的容量值創建有界通道
模式行為
BoundedChannelFullMode.Wait
這是預設值。 WriteAsync調用 以等待空間可用以完成寫入操作。 調用 以 TryWrite 立即返回 false 。
BoundedChannelFullMode.DropNewest
刪除並忽略通道中的最新項,以便為要寫入的項留出空間。
BoundedChannelFullMode.DropOldest
刪除並忽略通道中的最舊項,以便為要寫入的項留出空間。
BoundedChannelFullMode.DropWrite 刪除要寫入的項。
Channel.Writer API
生成者功能在 Channel<TWrite,TRead>.Writer 上公開。 下表詳細介紹了生成者 API 和預期行為:
ChannelWriter.Complete
將通道標記為已完成,這意味著不再向該通道寫入更多項。
ChannelWriter.TryComplete
嘗試將通道標記為已完成,這意味著不會向通道寫入更多數據。
ChannelWriter.TryWrite
嘗試將指定的項寫入到通道。 當與無界通道一起使用時,除非通道的編寫器通過 ChannelWriter
ChannelWriter.WaitToWriteAsync
返回一個 ValueTask
ChannelWriter
Channel.Reader API
ChannelReader.ReadAllAsync
創建允許從通道中讀取所有數據的 IAsyncEnumerable
ChannelReader.ReadAsync
以非同步方式從通道中讀取項。
ChannelReader.TryPeek
嘗試從通道中查看項。
ChannelReader.TryRead
嘗試從通道中讀取項。
ChannelReader.WaitToReadAsync
返回在 ValueTask
channel的具體使用
https://learn.microsoft.com/zh-cn/dotnet/core/extensions/channels
基於channel實現事件匯流排
EventDiscriptorAttribute 特性定義
[AttributeUsage(AttributeTargets.Class,AllowMultiple = false,Inherited = false)]
public class EventDiscriptorAttribute:Attribute
{
/// <summary>
/// 事件2名稱
/// </summary>
public string EventName { get; private set; }
/// <summary>
/// channel 容量設置
/// </summary>
public int Capacity { get; private set; }
/// <summary>
/// 是否維持一個生產者多個消費者模型
/// </summary>
public bool SigleReader { get; private set; }
public EventDiscriptorAttribute(string eventName, int capacity = 1000, bool sigleReader = true)
{
EventName = eventName;
Capacity = capacity;
SigleReader = sigleReader;
}
}
定義通道容器
//通道容器單列註入,在拓展類中初始化
public class ChannelContainer : IChannelContainer
{
public List<EventDiscription> Events { get; private set; }
private readonly IServiceCollection Services;
public EventHandlerContainer(IServiceCollection services)
{
Events = new List<EventDiscription>();
Services = services;
services.AddSingleton<IEventHandlerContainer>(this);
}
private bool Check(Type type)
{
var discription = Events.FirstOrDefault(p=>p.EtoType == type);
return discription is null;
}
///訂閱並且註入EventHandler
public void TryAddChannle(Type eto,Type handler)
{
if(!Check(eto))
{
return;
}
Events.Add(new EventDiscription(eto, handler));
var handlerbaseType = typeof(IEventHandler<>);
var handlertype = handlerbaseType.MakeGenericType(eto);
if(Services.Any(P=>P.ServiceType==handlertype))
{
return;
}
Services.AddTransient(handlertype, handler);
}
public void TryAddChannle<TEto, THandler>()
{
TryAddChannle(typeof(TEto),typeof(THandler));
}
public void TryAddChannle(Type eto)
{
if (!Check(eto))
{
return;
}
Events.Add(new EventDiscription(eto));
var handlerbaseType = typeof(IEventHandler<>);
var handlertype = handlerbaseType.MakeGenericType(eto);
if (Services.Any(P => P.ServiceType == handlertype))
{
return;
}
}
public void TryAddChannle<TEto>()
{
TryAddChannle(typeof(TEto));
}
事件管理器
事件管理器通過線程安全字典管理事件通道和事件的觸發
可以看到在Subscribe 方法中消費者並不是在訂閱後立即執行的而是放到EventTrigger中的定義的非同步事件中去
消費者執行最後又.,NET提供的托管任務去執行
public class EventHandlerManager : IEventHandlerManager,IDisposable
{
private ConcurrentDictionary<string, Channel<string>> Channels;
private bool IsDiposed = false;
private readonly IServiceProvider ServiceProvider;
private readonly CancellationToken _cancellation;
private readonly IEventHandlerContainer _eventHandlerContainer;
private readonly ILogger _logger;
private ConcurrentDictionary<string,EventTrigger> EventTriggers;
private bool IsInitConsumer = true;
public EventHandlerManager( IServiceProvider serviceProvider
, IEventHandlerContainer eventHandlerContainer
, ILoggerFactory loggerFactory)
{
ServiceProvider = serviceProvider;
_cancellation = CancellationToken.None;
_eventHandlerContainer = eventHandlerContainer;
Channels = new ConcurrentDictionary<string, Channel<string>>();
EventTriggers = new ConcurrentDictionary<string, EventTrigger>();
_logger = loggerFactory.CreateLogger<IEventHandlerManager>();
}
//初始化通信管道
public async Task CreateChannles()
{
var eventDiscriptions = _eventHandlerContainer.Events;
foreach(var item in eventDiscriptions)
{
var attribute = item.EtoType.GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();
if (attribute is null)
{
ThorwEventAttributeNullException.ThorwException();
}
var channel = Channels.GetValueOrDefault(attribute.EventName);
if (channel is not null)
{
return;
}
//創建無界通道模型,並且初始化容量大小,當無容量寫入後等待寫入
channel = Channel.CreateBounded<string>(
new BoundedChannelOptions(attribute.Capacity)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.Wait
});
Channels.TryAdd(attribute.EventName, channel);
_logger.LogInformation($"創建通信管道{item.EtoType}--{attribute.EventName}");
}
await Task.CompletedTask;
}
private Channel<string> Check(Type type)
{
var attribute = type .GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();
if (attribute is null)
{
ThorwEventAttributeNullException.ThorwException();
}
var channel = Channels.GetValueOrDefault(attribute.EventName);
if(channel is null)
{
ThrowChannelNullException.ThrowException(attribute.EventName);
}
return channel;
}
public void Dispose()
{
IsDiposed = true;
IsInitConsumer = true;
foreach(var trigger in EventTriggers.Values)
{
trigger.Dispose();
}
_cancellation.ThrowIfCancellationRequested();
}
/// <summary>
/// 生產者
/// </summary>
/// <typeparam name="TEto"></typeparam>
/// <param name="eto"></param>
/// <returns></returns>
public async Task WriteAsync<TEto>(TEto eto) where TEto : class
{
var channel = Check(typeof(TEto));
//由於創建的是有界通道,存在有界通道消息積累超過初始大小所以迴圈判斷是否可以寫入消息
while ( await channel.Writer.WaitToWriteAsync(CancellationToken.None))
{
var data = JsonConvert.SerializeObject(eto);
await channel.Writer.WriteAsync(data, _cancellation);
}
}
/// <summary>
/// 消費者
/// </summary>
/// <returns></returns>
public void Subscribe<TEto>() where TEto : class
{
var attribute = typeof(TEto).GetCustomAttributes()
.OfType<EventDiscriptorAttribute>()
.FirstOrDefault();
if (attribute is null)
{
ThorwEventAttributeNullException.ThorwException();
}
if (EventTriggers.Keys.Any(p => p == attribute.EventName))
{
return;
}
Func<Task> func = async () =>
{
var scope = ServiceProvider.CreateAsyncScope();
var channel = Check(typeof(TEto));
var handler = scope.ServiceProvider.GetRequiredService<IEventHandler<TEto>>();
var reader = channel.Reader;
try
{
while (await channel.Reader.WaitToReadAsync())
{
while (reader.TryRead(out string str))
{
var data = JsonConvert.DeserializeObject<TEto>(str);
_logger.LogInformation(str);
await handler.HandelrAsync(data);
}
}
}
catch (Exception e)
{
_logger.LogInformation($"本地事件匯流排異常{e.Source}--{e.Message}--{e.Data}");
throw;
}
};
var trigger = new EventTrigger();
trigger.Recived(func);
EventTriggers.TryAdd(attribute.EventName, trigger);
}
public Task Trigger()
{
//只允許初始化一次消費者
if (IsInitConsumer)
{
foreach (var eventTrigger in EventTriggers)
{
Task.Factory.StartNew(async () =>
{
await eventTrigger.Value.Trigger();
});
}
}
IsInitConsumer = false;
return Task.CompletedTask;
}
}
}
EventTrigger 定義
public class EventTrigger:IDisposable
{
public event Func<Task>? Event;
public EventTrigger()
{
}
public void Recived(Func<Task> func)
{
if (Event is not null)
{
return;
}
Event += func;
}
public Task Trigger()
{
if(Event is null)
{
return Task.CompletedTask;
}
return Event();
}
public void Dispose()
{
if( Event is not null )
{
Event = null;
}
}
}
托管任務執行EventHandlerManager Trigger()方法
public class EventBusBackgroundService : BackgroundService
{
private readonly IEventHandlerManager _eventHandlerManager;
public EventBusBackgroundService(IEventHandlerManager eventHandlerManager)
{
_eventHandlerManager = eventHandlerManager;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _eventHandlerManager.Trigger();
}
}
拓展類定義
public static class EventBusExtensions
{
//添加事件匯流排並且添加channle管道
public static IServiceCollection AddEventBusAndChannles(this IServiceCollection services,Action<EventHandlerContainer> action)
{
services.AddSingleton<IEventHandlerManager, EventHandlerManager>();
services.AddTransient<ILocalEventBus, LocalEventBus>();
///添加托管任務
services.AddHostedService<EventBusBackgroundService>();
EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);
action.Invoke(eventHandlerContainer);
return services;
}
//創建通信管道
public static async Task InitChannles(this IServiceProvider serviceProvider,Action<IEventHandlerManager> action)
{
var scope = serviceProvider.CreateAsyncScope();
var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
//初始化通信管道
await eventhandlerManager.CreateChannles();
action.Invoke(eventhandlerManager);
}
//添加本地事件匯流排
public static IServiceCollection AddEventBus(this IServiceCollection services)
{
services.AddSingleton<IEventHandlerManager, EventHandlerManager>();
services.AddTransient<ILocalEventBus, LocalEventBus>();
services.AddHostedService<EventBusBackgroundService>();
return services;
}
//添加通信管道
public static IServiceCollection AddChannles(this IServiceCollection services, Action<EventHandlerContainer> action)
{
EventHandlerContainer eventHandlerContainer = new EventHandlerContainer(services);
action.Invoke(eventHandlerContainer);
return services;
}
}
}
使用
context.Services.AddEventBus();
//添加通信管道
context.Services.AddChannles(p =>
{
p.TryAddChannle<TestEto>();
});
//
var scope = context.ServiceProvider.CreateScope();
var eventhandlerManager = scope.ServiceProvider.GetRequiredService<IEventHandlerManager>();
//初始化通信管道
await eventhandlerManager.CreateChannles();
//訂閱事件
eventhandlerManager.Subscribe<TestEto>();
//定義EventHandler
public class TestEventHandler : IEventHandler<TestEto>,ITransientInjection
{
private ILogger _logger;
public TestEventHandler(ILoggerFactory factory)
{
_logger = factory.CreateLogger<TestEventHandler>();
}
public Task HandelrAsync(TestEto eto)
{
_logger.LogInformation($"{typeof(TestEto).Name}--{eto.Name}--{eto.Description}");
return Task.CompletedTask;
}
}
//構造函數註入
[HttpGet]
public async Task TestLocalEventBus()
{
TestEto eto = null;
for(var i = 0; i < 100; i++)
{
eto = new TestEto()
{
Name ="LocalEventBus" + i.ToString(),
Description ="wyg"+i.ToString(),
};
await _localEventBus.PublichAsync(eto);
}
}
總結
作為一個才畢業一年的初級程式員的我來說這次的channel的事件匯流排的封裝還存在著許多不足
1.無法對消息進行持久化的管理
2.沒有對消息異常進行處理
3.沒有做到像abp那樣自動訂閱
當然還存在著一些我不知道問題,歡迎各位大佬提出問題指正
這裡提一嘴(有個小小的請求),有廣州的老哥所在的公司目前還招人的話,希望給小弟一個機會(找了快一個月工作了,確實有點難,聯繫方式vx:wenyg2001411)