引言大家好像對分析源碼厭倦了,說實在我也會厭倦,不過不看是無法分析其後面的東西,從易到難是一個必要的過程。今天說下EventBus,前幾天園裡的大神已經把其解刨,我今天就藉著大神的肩膀,分析下在eShop項目中EventBus的實現。最近發覺轉發文章不寫出處的,特此加上鏈接:http://inday... ...
引言
大家好像對分析源碼厭倦了,說實在我也會厭倦,不過不看是無法分析其後面的東西,從易到難是一個必要的過程。
今天說下EventBus,前幾天園裡的大神已經把其解刨,我今天就藉著大神的肩膀,分析下在eShop項目中EventBus的實現。
最近發覺轉發文章不寫出處的,特此加上鏈接:http://inday.cnblogs.com
解析源碼
我們知道使用EventBus是為瞭解除Publisher和Subscriber之間的依賴性,這樣我們的Publisher就不需要知道有多少Subscribers,只需要通過EventBus進行註冊管理就好了,在eShop項目中,有一個這樣的介面IEventBus(eShopOnContainers\src\BuildingBlocks\EventBus\EventBus\Abstractions)
public interface IEventBus { void Subscribe<T, TH>(Func<TH> handler) where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; void Publish(IntegrationEvent @event); }
我們可以看到這個介面定義了EventBus所需的一些操作, 對比大神的EventBus,相關功能都是一致的,我們看下它的實現類:EventBusRabbitMQ,從名字上可以看出,這是一個通過RabbitMQ來進行管理的EventBus,我們可以看到它使用了IEventBusSubscriptionsManager進行訂閱存儲,也就是大神文中的:
private readonly ConcurrentDictionary<Type, List<Type>> _eventAndHandlerMapping;
微軟在Demo中把其提取出了介面,把一些常用方法給提煉了出來,但是核心還是Dictionary<string, List<Delegate>>, 使用Dictionary進行Map映射。通過Subscribe和UnSubscribe進行訂閱和取消,使用Publish方法進行發佈操作。
public void Subscribe<T, TH>(Func<TH> handler) where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = typeof(T).Name; var containsKey = _subsManager.HasSubscriptionsForEvent<T>(); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } _subsManager.AddSubscription<T, TH>(handler); }
我們看到在訂閱的時候,EventBus會檢查下在Map中是否有相應的註冊,如果沒有的話首先回去RabbitMQ中創建一個新的channel進行綁定,隨後在Map中進行註冊映射。
UnSubscribe則直接從Map中取消映射,通過OnEventRemoved事件判斷Map下此映射的subscriber是否為空,為空則從RabbitMQ中關閉channel。
在RabbitMQ的構造方法中,我們看到這樣一個創建:CreateConsumerChannel(),這裡創建了一個EventingBasicConsumer,當Queue中有新的消息時會通過ProcessEvent執行Map中註冊的handler(subscribers),看圖可能更清晰些:
在ProcessEvent方法中,回去Map中找尋subscribers,然後通過動態反射進行執行:
private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handlers = _subsManager.GetHandlersForEvent(eventName); foreach (var handlerfactory in handlers) { var handler = handlerfactory.DynamicInvoke(); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } }
微軟通過簡單的代碼解耦了Publisher和Subscribers之間的依賴關係,我們引用大神的總結:
應用
在catalog.api中,微軟出現了EventBus,我在上一篇中也提到了,這是我的一個疑惑,因為在catalog中並沒有訂閱操作,直接執行了Publish操作,原先以為是一個空操作,後來看了Basket.Api我才知道為何微軟要用RabbitMQ。
使用RabbitMQ,我們不僅是從類之間的解耦,更可以跨項目,跨語言,跨平臺的解耦,publisher僅僅需要把消息體(IntegrationEvent)傳送到RabbitMQ,Consumer從Queue中獲取消息體,然後推送到Subscribers執行相應的操作。我們看下Basket.Api.Startup.cs:
protected virtual void ConfigureEventBus(IApplicationBuilder app) { var catalogPriceHandler = app.ApplicationServices .GetService<IIntegrationEventHandler<ProductPriceChangedIntegrationEvent>>(); var orderStartedHandler = app.ApplicationServices .GetService<IIntegrationEventHandler<OrderStartedIntegrationEvent>>(); var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler> (() => app.ApplicationServices.GetRequiredService<ProductPriceChangedIntegrationEventHandler>()); eventBus.Subscribe<OrderStartedIntegrationEvent, OrderStartedIntegrationEventHandler> (() => app.ApplicationServices.GetRequiredService<OrderStartedIntegrationEventHandler>()); }
在這個方法里,我們看到了Subscribe操作,想想之前的提問有點搞笑,不過研究明白了也不錯,對吧!
總結
今天我們看了EventBus在Demo中的應用,總結一下。
1、EventBus可以很好的解耦訂閱者和發佈者之間的依賴
2、使用RabbitMQ能夠跨項目、跨平臺、跨語言的解耦訂閱者和發佈者
雖然在Demo中我們看到對訂閱者的管理是通過Dictionary記憶體的方式,所以我們的Subscribe僅僅只在Basket.Api中看到,但微軟是通過IEventBusSubscriptionsManager介面定義的,我們可以通過自己的需求來進行定製,可以做成分散式的,比如使用memcached。
寫在最後
每個月到下旬就會比較忙,所以文章發佈會比較慢,但我也會堅持學習完eShop的,為了學習,我建了個群,大家可以進來一起學習,有什麼建議和問題都可以進來哦。
eShop雖好,但不建議大家放到生產環境,畢竟是一個Demo,而且目前還是ALPHA版本,用來學習是一個很好的教材,這就是一個大雜燴,學習中你會學到很多新的東西,大家如果看好core的發展,可以一起研究下。
QQ群:376248054