動手造輪子:實現一個簡單的 EventBus Intro EventBus 是一種事件發佈訂閱模式,通過 EventBus 我們可以很方便的實現解耦,將事件的發起和事件的處理的很好的分隔開來,很好的實現解耦。 微軟官方的示例項目 "EShopOnContainers" 也有在使用 EventBus ...
動手造輪子:實現一個簡單的 EventBus
Intro
EventBus 是一種事件發佈訂閱模式,通過 EventBus 我們可以很方便的實現解耦,將事件的發起和事件的處理的很好的分隔開來,很好的實現解耦。 微軟官方的示例項目 EShopOnContainers 也有在使用 EventBus 。
這裡的 EventBus 實現也是參考借鑒了微軟 eShopOnContainers 項目。
EventBus 處理流程:
微服務間使用 EventBus 實現系統間解耦:
藉助 EventBus 我們可以很好的實現組件之間,服務之間,系統之間的解耦以及相互通信的問題。
起初覺得 EventBus 和 MQ 其實差不多嘛,都是通過非同步處理來實現解耦合,高性能。後來看到了下麵這張圖才算明白為什麼要用 EventBus 以及 EventBus 和 MQ 之間的關係,EventBus 是抽象的,可以用MQ來實現 EventBus.
為什麼要使用 EventBus
- 解耦合(輕鬆的實現系統間解耦)
- 高性能可擴展(每一個事件都是簡單獨立且不可更改的對象,只需要保存新增的事件,不涉及其他的變更刪除操作)
- 系統審計(每一個事件都是不可變更的,每一個事件都是可追溯的)
- ...
EventBus 整體架構:
IEventBase
:所有的事件應該實現這個介面,這個介面定義了事件的唯一idEventId
和事件發生的事件EventAt
IEventHandler
:定義了一個Handle
方法來處理相應的事件
IEventStore
:所有的事件的處理存儲,保存事件的IEventHandler
,一般不會直接操作,通過 EventBus 的訂閱和取消訂閱來操作EventStore
IEventBus
:用來發佈/訂閱/取消訂閱事件,並將事件的某一個IEventHandler
保存到 EventStore 或從 EventStore 中移除
使用示例
來看一個使用示例,完整代碼示例:
internal class EventTest
{
public static void MainTest()
{
var eventBus = DependencyResolver.Current.ResolveService<IEventBus>();
eventBus.Subscribe<CounterEvent, CounterEventHandler1>();
eventBus.Subscribe<CounterEvent, CounterEventHandler2>();
eventBus.Subscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
eventBus.Publish(new CounterEvent { Counter = 1 });
eventBus.Unsubscribe<CounterEvent, CounterEventHandler1>();
eventBus.Unsubscribe<CounterEvent, DelegateEventHandler<CounterEvent>>();
eventBus.Publish(new CounterEvent { Counter = 2 });
}
}
internal class CounterEvent : EventBase
{
public int Counter { get; set; }
}
internal class CounterEventHandler1 : IEventHandler<CounterEvent>
{
public Task Handle(CounterEvent @event)
{
LogHelper.GetLogger<CounterEventHandler1>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
return Task.CompletedTask;
}
}
internal class CounterEventHandler2 : IEventHandler<CounterEvent>
{
public Task Handle(CounterEvent @event)
{
LogHelper.GetLogger<CounterEventHandler2>().Info($"Event Info: {@event.ToJson()}, Handler Type:{GetType().FullName}");
return Task.CompletedTask;
}
}
具體實現
EventStoreInMemory 是 IEventStore
將數據放在記憶體中的實現,使用了 ConcurrentDictionary 以及 HashSet 來儘可能的保證高效,具體實現代碼如下:
public class EventStoreInMemory : IEventStore
{
private readonly ConcurrentDictionary<string, HashSet<Type>> _eventHandlers = new ConcurrentDictionary<string, HashSet<Type>>();
public bool AddSubscription<TEvent, TEventHandler>()
where TEvent : IEventBase
where TEventHandler : IEventHandler<TEvent>
{
var eventKey = GetEventKey<TEvent>();
if (_eventHandlers.ContainsKey(eventKey))
{
return _eventHandlers[eventKey].Add(typeof(TEventHandler));
}
else
{
return _eventHandlers.TryAdd(eventKey, new HashSet<Type>()
{
typeof(TEventHandler)
});
}
}
public bool Clear()
{
_eventHandlers.Clear();
return true;
}
public ICollection<Type> GetEventHandlerTypes<TEvent>() where TEvent : IEventBase
{
if(_eventHandlers.Count == 0)
return new Type[0];
var eventKey = GetEventKey<TEvent>();
if (_eventHandlers.TryGetValue(eventKey, out var handlers))
{
return handlers;
}
return new Type[0];
}
public string GetEventKey<TEvent>()
{
return typeof(TEvent).FullName;
}
public bool HasSubscriptionsForEvent<TEvent>() where TEvent : IEventBase
{
if(_eventHandlers.Count == 0)
return false;
var eventKey = GetEventKey<TEvent>();
return _eventHandlers.ContainsKey(eventKey);
}
public bool RemoveSubscription<TEvent, TEventHandler>()
where TEvent : IEventBase
where TEventHandler : IEventHandler<TEvent>
{
if(_eventHandlers.Count == 0)
return false;
var eventKey = GetEventKey<TEvent>();
if (_eventHandlers.ContainsKey(eventKey))
{
return _eventHandlers[eventKey].Remove(typeof(TEventHandler));
}
return false;
}
}
EventBus 的實現,從上面可以看到 EventStore 保存的是 IEventHandler
對應的 Type,在 Publish 的時候根據 Type 從 IoC 容器中取得相應的 Handler 即可,如果沒有在 IoC 容器中找到對應的類型,則會嘗試創建一個類型實例,然後調用 IEventHandler
的 Handle
方法,代碼如下:
/// <summary>
/// EventBus in process
/// </summary>
public class EventBus : IEventBus
{
private static readonly ILogHelperLogger Logger = Helpers.LogHelper.GetLogger<EventBus>();
private readonly IEventStore _eventStore;
private readonly IServiceProvider _serviceProvider;
public EventBus(IEventStore eventStore, IServiceProvider serviceProvider = null)
{
_eventStore = eventStore;
_serviceProvider = serviceProvider ?? DependencyResolver.Current;
}
public bool Publish<TEvent>(TEvent @event) where TEvent : IEventBase
{
if (!_eventStore.HasSubscriptionsForEvent<TEvent>())
{
return false;
}
var handlers = _eventStore.GetEventHandlerTypes<TEvent>();
if (handlers.Count > 0)
{
var handlerTasks = new List<Task>();
foreach (var handlerType in handlers)
{
try
{
if (_serviceProvider.GetServiceOrCreateInstance(handlerType) is IEventHandler<TEvent> handler)
{
handlerTasks.Add(handler.Handle(@event));
}
}
catch (Exception ex)
{
Logger.Error(ex, $"handle event [{_eventStore.GetEventKey<TEvent>()}] error, eventHandlerType:{handlerType.FullName}");
}
}
handlerTasks.WhenAll().ConfigureAwait(false);
return true;
}
return false;
}
public bool Subscribe<TEvent, TEventHandler>()
where TEvent : IEventBase
where TEventHandler : IEventHandler<TEvent>
{
return _eventStore.AddSubscription<TEvent, TEventHandler>();
}
public bool Unsubscribe<TEvent, TEventHandler>()
where TEvent : IEventBase
where TEventHandler : IEventHandler<TEvent>
{
return _eventStore.RemoveSubscription<TEvent, TEventHandler>();
}
}
項目實例
來看一個實際的項目中的使用,在我的活動室預約項目中有一個公告的模塊,訪問公告詳情頁面,這個公告的訪問次數加1,把這個訪問次數加1改成了用 EventBus 來實現,實際項目代碼:https://github.com/WeihanLi/ActivityReservation/blob/67e2cb8e92876629a7af6dc051745dd8c7e9faeb/ActivityReservation/Startup.cs
- 定義 Event 以及 EventHandler
public class NoticeViewEvent : EventBase
{
public Guid NoticeId { get; set; }
// UserId
// IP
// ...
}
public class NoticeViewEventHandler : IEventHandler<NoticeViewEvent>
{
public async Task Handle(NoticeViewEvent @event)
{
await DependencyResolver.Current.TryInvokeServiceAsync<ReservationDbContext>(async dbContext =>
{
var notice = await dbContext.Notices.FindAsync(@event.NoticeId);
notice.NoticeVisitCount += 1;
await dbContext.SaveChangesAsync();
});
}
}
這裡的 Event 只定義了一個 NoticeId ,其實也可以把請求信息如IP/UA等信息加進去,在 EventHandler里處理以便日後數據分析。
- 註冊 EventBus 相關服務以及 EventHandlers
services.AddSingleton<IEventBus, EventBus>();
services.AddSingleton<IEventStore, EventStoreInMemory>();
//register EventHandlers
services.AddSingleton<NoticeViewEventHandler>();
- 訂閱事件
public void Configure(IApplicationBuilder app, IHostingEnvironment env, ILoggerFactory loggerFactory, IEventBus eventBus)
{
eventBus.Subscribe<NoticeViewEvent, NoticeViewEventHandler>();
// ...
}
- 發佈事件
eventBus.Publish(new NoticeViewEvent { NoticeId = notice.NoticeId });
Reference
- https://github.com/dotnet-architecture/eShopOnContainers
- https://docs.microsoft.com/zh-cn/previous-versions/msp-n-p/jj591559(v=pandp.10)
- https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/integration-event-based-microservice-communications
- https://docs.microsoft.com/en-us/dotnet/standard/microservices-architecture/multi-container-microservice-net-applications/subscribe-events
- https://github.com/sheng-jie/EventBus
- https://github.com/WeihanLi/ActivityReservation