閑來無聊在我的Biwen.QuickApi中實現一下極簡的事件匯流排,其實代碼還是蠻簡單的,對於初學者可能有些幫助 就貼出來,有什麼不足的地方也歡迎板磚交流~ 首先定義一個事件約定的空介面 public interface IEvent{} 然後定義事件訂閱者介面 public interface I ...
閒來無聊,在我的 Biwen.QuickApi 中實現了一個極簡的事件匯流排。其實代碼還是蠻簡單的,對於初學者可能有些幫助,就貼出來;有什麼不足的地方也歡迎拍磚交流~
首先定義一個事件約定的空介面
/// <summary>
/// Marker interface for events. It declares no members; it is used as the
/// generic constraint on <c>IEventSubscriber&lt;T&gt;</c> and on <c>PublishAsync&lt;T&gt;</c>.
/// </summary>
public interface IEvent
{
}
然後定義事件訂閱者介面
/// <summary>
/// A subscriber that handles events of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">The event type this subscriber handles.</typeparam>
public interface IEventSubscriber<T>
    where T : IEvent
{
    /// <summary>
    /// Handles a published event.
    /// </summary>
    /// <param name="event">The event instance being published.</param>
    /// <param name="ct">Cancellation token passed along by the publisher.</param>
    Task HandleAsync(T @event, CancellationToken ct);

    /// <summary>
    /// Execution order of this subscriber.
    /// </summary>
    int Order { get; }

    /// <summary>
    /// Whether an error raised by this subscriber should be thrown to the caller;
    /// throwing prevents the subsequent handlers from running.
    /// </summary>
    bool ThrowIfError { get; }
}
/// <summary>
/// Convenience base class for <see cref="IEventSubscriber{T}"/> implementations.
/// Supplies overridable defaults for <see cref="Order"/> and <see cref="ThrowIfError"/>,
/// leaving only <see cref="HandleAsync"/> to be implemented.
/// </summary>
/// <typeparam name="T">The event type this subscriber handles.</typeparam>
public abstract class EventSubscriber<T> : IEventSubscriber<T>
    where T : IEvent
{
    /// <summary>
    /// Execution order; 0 unless overridden.
    /// </summary>
    public virtual int Order
    {
        get => 0;
    }

    /// <summary>
    /// Exceptions are not thrown by default.
    /// </summary>
    public virtual bool ThrowIfError
    {
        get => false;
    }

    /// <summary>
    /// Handles a published event.
    /// </summary>
    /// <param name="event">The event instance being published.</param>
    /// <param name="ct">Cancellation token passed along by the publisher.</param>
    public abstract Task HandleAsync(T @event, CancellationToken ct);
}
接著就是發佈者
/// <summary>
/// Dispatches an event to every <see cref="IEventSubscriber{T}"/> resolved from the
/// supplied service provider, one subscriber at a time in ascending <c>Order</c>.
/// </summary>
internal class Publisher(IServiceProvider serviceProvider)
{
    /// <summary>
    /// Publishes <paramref name="event"/> to all subscribers registered for <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">The event type.</typeparam>
    /// <param name="event">The event to deliver.</param>
    /// <param name="ct">Cancellation token handed to each subscriber.</param>
    /// <remarks>
    /// An exception from a subscriber is rethrown (stopping the remaining subscribers) when
    /// that subscriber's <c>ThrowIfError</c> is true. Otherwise it is swallowed and the next
    /// subscriber runs, unless it is an <see cref="OperationCanceledException"/> raised while
    /// <paramref name="ct"/> is cancelled, which always propagates.
    /// </remarks>
    public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
    {
        // GetServices never returns null: with no registrations it yields an empty
        // sequence, so the loop below is simply skipped.
        var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();

        foreach (var handler in handlers.OrderBy(x => x.Order))
        {
            try
            {
                await handler.HandleAsync(@event, ct);
            }
            catch (Exception ex) when (!handler.ThrowIfError && !IsCallerCancellation(ex, ct))
            {
                // The subscriber opted out of propagating errors; continue with the next one.
                // TODO: surface the swallowed exception (e.g. log it).
            }
        }
    }

    // True when the exception reports cancellation and the caller's token is cancelled.
    // Such exceptions must not be treated as a subscriber error and swallowed.
    private static bool IsCallerCancellation(Exception ex, CancellationToken ct)
        => ex is OperationCanceledException && ct.IsCancellationRequested;
}
到此發佈訂閱的基本代碼也就寫完了。接下來就是註冊發佈者和所有的訂閱者了。
核心代碼如下:
// Open generic definition of the subscriber interface; compared against
// GetGenericTypeDefinition() when looking for subscriber implementations.
static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);

// Lock object.
static readonly object _lock = new object();
/// <summary>
/// Checks whether <paramref name="type"/> implements an interface constructed from the
/// generic interface definition <paramref name="baseInterface"/>.
/// </summary>
/// <param name="type">The type to inspect; <c>null</c> yields <c>false</c>.</param>
/// <param name="baseInterface">The generic interface definition to look for, e.g. <c>typeof(IEnumerable&lt;&gt;)</c>; <c>null</c> yields <c>false</c>.</param>
/// <returns><c>true</c> if any interface returned by <c>type.GetInterfaces()</c> is generic and has <paramref name="baseInterface"/> as its generic type definition.</returns>
static bool IsToGenericInterface(this Type type, Type baseInterface)
{
    if (type == null || baseInterface == null)
    {
        return false;
    }

    foreach (var candidate in type.GetInterfaces())
    {
        if (candidate.IsGenericType && candidate.GetGenericTypeDefinition() == baseInterface)
        {
            return true;
        }
    }

    return false;
}
// Backing cache for EventHandlers; stays null until the property is first read.
static IEnumerable<Type> _eventHanlers = null!;

/// <summary>
/// The concrete, public classes among <c>ASS.InAllRequiredAssemblies</c> that implement
/// <c>IEventSubscriber&lt;T&gt;</c>. The scan runs once, on first access, and its result is cached.
/// </summary>
static IEnumerable<Type> EventHandlers
{
    get
    {
        lock (_lock)
        {
            // ToArray() materializes the query. Caching the bare Where(...) would store a
            // deferred query that repeats the whole type scan on every enumeration.
            return _eventHanlers ??= ASS.InAllRequiredAssemblies
                .Where(x => !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber))
                .ToArray();
        }
    }
}
// Register the event subscribers.
// The discovered subscriber types are exposed by the EventHandlers property.
foreach (var subscriberType in EventHandlers)
{
    // One subscriber class may subscribe to several events, so it is registered once
    // for each constructed IEventSubscriber<T> interface it implements.
    var subscriberInterfaces = subscriberType.GetInterfaces()
        .Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber)
        .ToArray();

    foreach (var subscriberInterface in subscriberInterfaces)
    {
        services.AddScoped(subscriberInterface, subscriberType);
    }
}

// Register the Publisher.
services.AddScoped<Publisher>();
至此發佈訂閱的代碼也就完成了!
現在我們將發佈訂閱封裝到QuickApi中使用:
/// <summary>
/// Abstraction for publishing events.
/// </summary>
internal interface IPublisher
{
    /// <summary>
    /// Publishes an event.
    /// </summary>
    /// <typeparam name="T">The event type; must implement <see cref="IEvent"/>.</typeparam>
    /// <param name="event">The event to publish.</param>
    /// <param name="cancellationToken">Token used to cancel the publish operation.</param>
    /// <returns>A task representing the publish operation.</returns>
    Task PublishAsync<T>(T @event, CancellationToken cancellationToken)
        where T : IEvent;
}
然後讓 IQuickApi 介面繼承 IPublisher,並由 BaseQuickApi 實現 PublishAsync:
/// <summary>
/// QuickApi contract. It combines <c>IHandlerBuilder</c>, <c>IQuickApiMiddlewareHandler</c>,
/// <c>IAntiforgeryApi</c> and <c>IPublisher</c>, and adds <see cref="ExecuteAsync"/>.
/// </summary>
/// <typeparam name="Req">Type of the request passed to <see cref="ExecuteAsync"/>.</typeparam>
/// <typeparam name="Rsp">Type of the result produced by <see cref="ExecuteAsync"/>.</typeparam>
internal interface IQuickApi<Req, Rsp> :
    IHandlerBuilder,
    IQuickApiMiddlewareHandler,
    IAntiforgeryApi,
    IPublisher
{
    /// <summary>
    /// Executes the API for the given request.
    /// </summary>
    /// <param name="request">The request to process.</param>
    /// <returns>The response, produced asynchronously.</returns>
    ValueTask<Rsp> ExecuteAsync(Req request);
}
// BaseQuickApi.PublishAsync
/// <summary>
/// Publishes <paramref name="event"/> through a <c>Publisher</c> resolved from a new
/// service scope created on <c>ServiceRegistration.ServiceProvider</c>.
/// </summary>
/// <typeparam name="T">The event type.</typeparam>
/// <param name="event">The event to publish.</param>
/// <param name="cancellationToken">Token forwarded to the publisher.</param>
public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
{
    // An async scope is disposed with DisposeAsync, so scoped services (such as the
    // subscribers) that implement only IAsyncDisposable are released correctly.
    // A synchronous scope Dispose throws for such services.
    await using var scope = ServiceRegistration.ServiceProvider.CreateAsyncScope();
    var publisher = scope.ServiceProvider.GetRequiredService<Publisher>();
    await publisher.PublishAsync(@event, cancellationToken);
}
至此功能完成,接下來我們測試一下:
using Biwen.QuickApi.Events;
using Microsoft.AspNetCore.Mvc;
namespace Biwen.QuickApi.DemoWeb.Apis
{
/// <summary>
/// Demo type that is both a request (<c>BaseRequest&lt;MyEvent&gt;</c>) and an event
/// (<c>IEvent</c>), so the incoming request object can be published as-is.
/// </summary>
public class MyEvent :
    BaseRequest<MyEvent>,
    IEvent
{
    /// <summary>
    /// Message carried by the event; marked <c>[FromQuery]</c>.
    /// </summary>
    [FromQuery]
    public string? Message { get; set; }
}
/// <summary>
/// Demo subscriber for <see cref="MyEvent"/> that logs the event message.
/// It keeps the default <c>Order</c> and <c>ThrowIfError</c> of <c>EventSubscriber&lt;T&gt;</c>.
/// </summary>
public class MyEventHandler : EventSubscriber<MyEvent>
{
    private readonly ILogger<MyEventHandler> _logger;

    public MyEventHandler(ILogger<MyEventHandler> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Logs the message of the received event.
    /// </summary>
    /// <param name="event">The published event.</param>
    /// <param name="ct">Cancellation token (unused: the work completes synchronously).</param>
    public override Task HandleAsync(MyEvent @event, CancellationToken ct)
    {
        // Use a constant message template with an argument rather than an interpolated
        // string, so the template stays static and the value is logged as a structured field.
        _logger.LogInformation("msg 2 : {Message}", @event.Message);
        return Task.CompletedTask;
    }
}
/// <summary>
/// Demo subscriber that runs earlier: its <see cref="Order"/> is -1, which is below
/// the default order of 0.
/// </summary>
public class MyEventHandler2 : EventSubscriber<MyEvent>
{
    private readonly ILogger<MyEventHandler2> _logger;

    public MyEventHandler2(ILogger<MyEventHandler2> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Logs the message of the received event.
    /// </summary>
    /// <param name="event">The published event.</param>
    /// <param name="ct">Cancellation token (unused: the work completes synchronously).</param>
    public override Task HandleAsync(MyEvent @event, CancellationToken ct)
    {
        // Use a constant message template with an argument rather than an interpolated
        // string, so the template stays static and the value is logged as a structured field.
        _logger.LogInformation("msg 1 : {Message}", @event.Message);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Execution order of this handler.
    /// </summary>
    public override int Order => -1;
}
/// <summary>
/// Demo subscriber that always throws. Its <see cref="Order"/> is -2, and
/// <see cref="ThrowIfError"/> is false, so its exception is not rethrown by the publisher.
/// </summary>
public class MyEventHandler3 : EventSubscriber<MyEvent>
{
    private readonly ILogger<MyEventHandler3> _logger;

    public MyEventHandler3(ILogger<MyEventHandler3> logger) => _logger = logger;

    /// <summary>
    /// Execution order of this handler.
    /// </summary>
    public override int Order
    {
        get => -2;
    }

    /// <summary>
    /// Errors from this handler are not propagated to the caller.
    /// </summary>
    public override bool ThrowIfError
    {
        get => false;
    }

    /// <summary>
    /// Always fails, to demonstrate error handling in the publisher.
    /// </summary>
    /// <param name="event">The published event (unused).</param>
    /// <param name="ct">Cancellation token (unused).</param>
    public override Task HandleAsync(MyEvent @event, CancellationToken ct)
        => throw new Exception("error");
}
/// <summary>
/// Demo API declared with <c>[QuickApi("event")]</c> that publishes the incoming
/// <see cref="MyEvent"/> request as an event.
/// </summary>
[QuickApi("event")]
public class EventApi : BaseQuickApi<MyEvent>
{
    /// <summary>
    /// Publishes <paramref name="request"/> and returns a content response.
    /// </summary>
    /// <param name="request">The request, which is also the event to publish.</param>
    public override async ValueTask<IResultResponse> ExecuteAsync(MyEvent request)
    {
        // Publish first; the response is built only after publishing has completed.
        await PublishAsync(request);

        var response = IResultResponse.Content("send event");
        return response;
    }
}
}
最後我們運行項目測試一下功能:
curl -X 'GET' \
'http://localhost:5101/quick/event?Message=hello%20world' \
-H 'accept: */*'
源代碼我發佈到了GitHub,歡迎star! https://github.com/vipwan/Biwen.QuickApi