很長一段時間以來,我都在思考如何在ASP.NET Core的框架下,實現一套完整的事件驅動型架構。這個問題看上去有點大,其實主要目標是為了實現一個基於ASP.NET Core的微服務,它能夠非常簡單地訂閱來自於某個渠道的事件消息,並對接收到的消息進行處理,於此同時,它還能夠向該渠道發送事件消息,以便... ...
很長一段時間以來,我都在思考如何在ASP.NET Core的框架下,實現一套完整的事件驅動型架構。這個問題看上去有點大,其實主要目標是為了實現一個基於ASP.NET Core的微服務,它能夠非常簡單地訂閱來自於某個渠道的事件消息,並對接收到的消息進行處理,於此同時,它還能夠向該渠道發送事件消息,以便訂閱該事件消息的消費者能夠對消息數據做進一步處理。讓我們回顧一下微服務之間通信的幾種方式,分為同步和非同步兩種。同步通信最常見的就是RESTful API,而且非常簡單輕量,一個Request/Response迴環就結束了;非同步通信最常見的就是通過消息渠道,將載有特殊意義的數據的事件消息發送到消息渠道,而對某種類型消息感興趣的消費者,就可以獲取消息中所帶信息並執行相應操作,這也是我們比較熟知的事件驅動架構的一種表現形式。雖然事件驅動型架構看起來非常複雜,從微服務的實現來看顯得有些繁重,但它的應用範圍確實很廣,也為服務間通信提供了新的思路。瞭解DDD的朋友相信一定知道CQRS體繫結構模式,它就是一種事件驅動型架構。事實上,實現一套完整的、安全的、穩定的、正確的事件驅動架構並不簡單,由於非同步特性帶來的一致性問題會非常棘手,甚至需要藉助一些基礎結構層工具(比如關係型資料庫,不錯!只能是關係型資料庫)來解決一些特殊問題。本文就打算帶領大家一起探探路,基於ASP.NET Core Web API實現一個相對比較簡單的事件驅動架構,然後引出一些有待深入思考的問題,留在今後的文章中繼續討論。或許,本文所引入的源代碼無法直接用於生產環境,但我希望本文介紹的內容能夠給到讀者一些啟發,並能夠幫助解決實際中遇到的問題。
術語約定
本文會涉及一些相關的專業術語,在此先作約定:
- 事件:在某一特定時刻發生在某件事物上的一件事情,例如:在我撰寫本文的時候,電話鈴響了
- 消息:承載事件數據的實體。事件的序列化/反序列化和傳輸都以消息的形式進行
- 消息通信渠道:一種帶有消息路由功能的數據傳輸機制,用以在消息的派發器和訂閱器之間進行數據傳輸
註意:為了迎合描述的需要,在下文中可能會混用事件和消息兩個概念。
一個簡單的設計
先從簡單的設計開始,基本上事件驅動型架構會有事件消息(Events)、事件訂閱器(Event Subscriber)、事件派發器(Event Publisher)、事件處理器(Event Handler)以及事件匯流排(Event Bus)等主要組件,它們之間的關係大致如下:
首先,IEvent介面定義了事件消息(更確切地說,數據)的基本結構,幾乎所有的事件都會有一個唯一標識符(Id)和一個事件發生的時間(Timestamp),這個時間通常使用UTC時間作為標準。IEventHandler定義了事件處理器介面,顯而易見,它包含兩個方法:CanHandle方法,用以確定傳入的事件對象是否可被當前處理器所處理,以及Handle方法,它定義了事件的處理過程。IEvent和IEventHandler構成了事件處理的基本元素。
然後就是IEventSubscriber與IEventPublisher介面。前者表示實現該介面的類型為事件訂閱器,它負責事件處理器的註冊,並偵聽來自事件通信渠道上的消息,一旦所獲得的消息能夠被某個處理器處理,它就會指派該處理器對接收到的消息進行處理。因此,IEventSubscriber會保持著對事件處理器的引用;而對於實現了IEventPublisher介面的事件派發器而言,它的主要任務就是將事件消息發送到消息通信渠道,以便訂閱端能夠獲得消息併進行處理。
IEventBus介面表示消息通信渠道,也就是大家所熟知的消息匯流排的概念。它不僅具有消息訂閱的功能,而且還具有消息派發的能力,因此,它會同時繼承於IEventSubscriber和IEventPublisher介面。在上面的設計中,通過介面分離消息匯流排的訂閱器和派發器的角色是很有必要的,因為兩種角色的各自職責不一樣,這樣的設計同時滿足SOLID中的SRP和ISP兩個準則。
基於以上基礎模型,我們可以很快地將這個對象關係模型轉換為C#代碼:
public interface IEvent { Guid Id { get; } DateTime Timestamp { get; } } public interface IEventHandler { Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default); bool CanHandle(IEvent @event); } public interface IEventHandler<in T> : IEventHandler where T : IEvent { Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default); } public interface IEventPublisher : IDisposable { Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; } public interface IEventSubscriber : IDisposable { void Subscribe(); } public interface IEventBus : IEventPublisher, IEventSubscriber { }
短短30行代碼,就把我們的基本對象關係描述清楚了。對於上面的代碼我們需要註意以下幾點:
- 這段代碼使用了C# 7.1的新特性(default關鍵字)
- Publish以及Handle方法被替換為支持非同步調用的PublishAsync和HandleAsync方法,它們會返回Task對象,這樣可以方便使用C#中async/await的編程模型
- 由於我們的這個模型可以作為實現消息系統的通用模型,並且會需要用到ASP.NET Core的項目中,因此,建議將這些介面的定義放在一個獨立的NetStandard的Class Library中,方便今後重用和擴展
OK,介面定義好了。實現呢?下麵,我們實現一個非常簡單的消息匯流排:PassThroughEventBus。在今後的文章中,我還會介紹如何基於RabbitMQ和Azure Service Bus實現不一樣的消息匯流排。
PassThroughEventBus
顧名思義,PassThroughEventBus表示當有消息被派發到消息匯流排時,消息匯流排將不做任何處理與路由,而是直接將消息推送到訂閱方。在訂閱方的事件監聽函數中,會通過已經註冊的事件處理器對接收到的消息進行處理。整個過程並不會依賴於任何外部組件,不需要引用額外的開發庫,只是利用現有的.NET數據結構來模擬消息的派發和訂閱過程。因此,PassThroughEventBus不具備容錯和消息重發功能,不具備消息存儲和路由功能,我們先實現這樣一個簡單的消息匯流排,來體驗事件驅動型架構的設計過程。
我們可以使用.NET中的Queue或者ConcurrentQueue等基本數據結構來作為消息隊列的實現,與這些基本的數據結構相比,消息隊列本身有它自己的職責,它需要在消息被推送進隊列的同時通知調用方。當然,PassThroughEventBus不需要依賴於Queue或者ConcurrentQueue,它所要做的事情就是模擬一個消息隊列,當消息推送進來的時候,立刻通知訂閱方進行處理。同樣,為了分離職責,我們可以引入一個EventQueue的實現(如下),從而將消息推送和路由的職責(基礎結構層的職責)從消息匯流排中分離出來。
internal sealed class EventQueue { public event System.EventHandler<EventProcessedEventArgs> EventPushed; public EventQueue() { } public void Push(IEvent @event) { OnMessagePushed(new EventProcessedEventArgs(@event)); } private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e); }
EventQueue中最主要的方法就是Push方法,從上面的代碼可以看到,當EventQueue的Push方法被調用時,它將立刻觸發EventPushed事件,它是一個.NET事件,用以通知EventQueue對象的訂閱者,消息已經被派發。整個EventQueue的實現非常簡單,我們僅專註於事件的路由,完全沒有考慮任何額外的事情。
接下來,就是利用EventQueue來實現PassThroughEventBus。毫無懸念,PassThroughEventBus需要實現IEventBus介面,它的兩個基本操作分別是Publish和Subscribe。在Publish方法中,會將傳入的事件消息轉發到EventQueue上,而Subscribe方法則會訂閱EventQueue.EventPushed事件(.NET事件),而在EventPushed事件處理過程中,會從所有已註冊的事件處理器(Event Handlers)中找到能夠處理所接收到的事件,並對其進行處理。整個流程還是非常清晰的。以下便是PassThroughEventBus的實現代碼:
public sealed class PassThroughEventBus : IEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly IEnumerable<IEventHandler> eventHandlers; public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers) { this.eventHandlers = eventHandlers; } private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => (from eh in this.eventHandlers where eh.CanHandle(e.Event) select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event)); public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent => Task.Factory.StartNew(() => eventQueue.Push(@event)); public void Subscribe() => eventQueue.EventPushed += EventQueue_EventPushed; #region IDisposable Support private bool disposedValue = false; // To detect redundant calls void Dispose(bool disposing) { if (!disposedValue) { if (disposing) { this.eventQueue.EventPushed -= EventQueue_EventPushed; } disposedValue = true; } } public void Dispose() => Dispose(true); #endregion }
實現過程非常簡單,當然,從這些代碼也可以更清楚地瞭解到,PassThroughEventBus不做任何路由處理,更不會依賴於一個基礎結構設施(比如實現了AMQP的消息隊列),因此,不要指望能夠在生產環境中使用它。不過,目前來看,它對於我們接下來要討論的事情還是會很有幫助的,至少在我們引入基於RabbitMQ等實現的消息匯流排之前。
同樣地,請將PassThroughEventBus實現在另一個NetStandard的Class Library中,雖然它不需要額外的依賴,但它畢竟是眾多消息匯流排中的一種,將它從介面定義的程式集中剝離開來,好處有兩點:第一,保證了定義介面的程式集的純凈度,使得該程式集不需要依賴任何外部組件,並確保了該程式集的職責單一性,即為消息系統的實現提供基礎類庫;第二,將PassThroughEventBus置於獨立的程式集中,有利於調用方針對IEventBus進行技術選擇,比如,如果開發者選擇使用基於RabbitMQ的實現,那麼,只需要引用基於RabbitMQ實現IEventBus介面的程式集就可以了,而無需引用包含了PassThroughEventBus的程式集。這一點我覺得可以歸納為框架設計中“隔離依賴關係(Dependency Segregation)”的準則。
好了,基本組件都定義好了,接下來,讓我們一起基於ASP.NET Core Web API來做一個RESTful服務,並接入上面的消息匯流排機制,實現消息的派發和訂閱。
Customer RESTful API
我們仍然以客戶管理的RESTful API為例子,不過,我們不會過多地討論如何去實現管理客戶信息的RESTful服務,那並不是本文的重點。作為一個案例,我使用ASP.NET Core 2.0 Web API建立了這個服務,使用Visual Studio 2017 15.5做開發,併在CustomersController中使用Dapper來對客戶信息CRUD。後臺基於SQL Server 2017 Express Edition,使用SQL Server Management Studio能夠讓我方便地查看資料庫操作的結果。
RESTful API的實現
假設我們的客戶信息只包含客戶ID和名稱,下麵的CustomersController代碼展示了我們的RESTful服務是如何保存並讀取客戶信息的。當然,我已經將本文的代碼通過Github開源,開源協議為MIT,雖然商業友好,但畢竟是案例代碼沒有經過測試,所以請謹慎使用。本文源代碼的使用我會在文末介紹。
[Route("api/[controller]")] public class CustomersController : Controller { private readonly IConfiguration configuration; private readonly string connectionString; public CustomersController(IConfiguration configuration) { this.configuration = configuration; this.connectionString = configuration["mssql:connectionString"]; } // 獲取指定ID的客戶信息 [HttpGet("{id}")] public async Task<IActionResult> Get(Guid id) { const string sql = "SELECT [CustomerId] AS Id, [CustomerName] AS Name FROM [dbo].[Customers] WHERE [CustomerId]=@id"; using (var connection = new SqlConnection(connectionString)) { var customer = await connection.QueryFirstOrDefaultAsync<Model.Customer>(sql, new { id }); if (customer == null) { return NotFound(); } return Ok(customer); } } // 創建新的客戶信息 [HttpPost] public async Task<IActionResult> Create([FromBody] dynamic model) { var name = (string)model.Name; if (string.IsNullOrEmpty(name)) { return BadRequest(); } const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)"; using (var connection = new SqlConnection(connectionString)) { var customer = new Model.Customer(name); await connection.ExecuteAsync(sql, customer); return Created(Url.Action("Get", new { id = customer.Id }), customer.Id); } } }
代碼一如既往的簡單,Web API控制器通過Dapper簡單地實現了客戶信息的創建和返回。我們不妨測試一下,使用下麵的Invoke-RestMethod PowerShell指令,發送Post請求,通過上面的Create方法創建一個用戶:
可以看到,response中已經返回了新建客戶的ID號。接下來,繼續使用Invoke-RestMethod來獲取新建客戶的詳細信息:
OK,API調試完全沒有問題。下麵,我們將這個案例再擴充一下,我們希望這個API在完成客戶信息創建的同時,向外界發送一條“客戶信息已創建”的事件,並設置一個事件處理器,負責將該事件的詳細內容保存到資料庫中。
加入事件匯流排和消息處理機制
首先,我們在ASP.NET Core Web API項目上,添加對以上兩個程式集的引用,然後,按常規做法,在ConfigureServices方法中,將PassThroughEventBus添加到IoC容器中:
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddSingleton<IEventBus, PassThroughEventBus>(); }
在此,將事件匯流排註冊為單例(Singleton)服務,是因為它不保存狀態。理論上講,使用單例服務時,需要特別註意服務實例對象的生命周期管理,因為它的生命周期是整個應用程式級別,在程式運行的過程中,由其引用的對象資源將無法釋放,因此,當程式結束運行時,需要合理地將這些資源dispose掉。好在ASP.NET Core的依賴註入框架中已經幫我們處理過了,因此,對於上面的PassThroughEventBus單例註冊,我們不需要過多擔心,程式執行結束並正常退出時,依賴註入框架會自動幫我們dispose掉PassThroughEventBus的單例實例。那麼對於單例實例來說,我們是否只需要通過AddSingleton方法進行註冊就可以了,而無需關註它是否真的被dispose了呢?答案是否定的,有興趣的讀者可以參考微軟的官方文檔,在下一篇文章中我會對這部分內容做些介紹。
接下來,我們需要定義一個CustomerCreatedEvent對象,表示“客戶信息已經創建”這一事件信息,同時,再定義一個CustomerCreatedEventHandler事件處理器,用來處理從PassThroughEventBus接收到的事件消息。代碼如下,當然也很簡單:
public class CustomerCreatedEvent : IEvent { public CustomerCreatedEvent(string customerName) { this.Id = Guid.NewGuid(); this.Timestamp = DateTime.UtcNow; this.CustomerName = customerName; } public Guid Id { get; } public DateTime Timestamp { get; } public string CustomerName { get; } } public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent> { public bool CanHandle(IEvent @event) => @event.GetType().Equals(typeof(CustomerCreatedEvent)); public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default) { return Task.FromResult(true); public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default) => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false); }
兩者分別實現了我們最開始定義好的IEvent和IEventHandler介面。在CustomerCreatedEventHandler類的第一個HandleAsync重載方法中,我們暫且讓它簡單地返回一個true值,表示事件處理成功。下麵要做的事情就是,在客戶信息創建成功後,向事件匯流排發送CustomerCreatedEvent事件,以及在ASP.NET Core Web API程式啟動的時候,註冊CustomerCreatedEventHandler實例,並調用事件匯流排的Subscribe方法,使其開始偵聽事件的派發行為。
於是,CustomerController需要依賴IEventBus,並且在CustomerController.Create方法中,需要通過調用IEventBus的Publish方法將事件發送出去。現對CustomerController的實現做一些調整,調整後代碼如下:
[Route("api/[controller]")] public class CustomersController : Controller { private readonly IConfiguration configuration; private readonly string connectionString; private readonly IEventBus eventBus; public CustomersController(IConfiguration configuration, IEventBus eventBus) { this.configuration = configuration; this.connectionString = configuration["mssql:connectionString"]; this.eventBus = eventBus; } // 創建新的客戶信息 [HttpPost] public async Task<IActionResult> Create([FromBody] dynamic model) { var name = (string)model.Name; if (string.IsNullOrEmpty(name)) { return BadRequest(); } const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)"; using (var connection = new SqlConnection(connectionString)) { var customer = new Model.Customer(name); await connection.ExecuteAsync(sql, customer); await this.eventBus.PublishAsync(new CustomerCreatedEvent(name)); return Created(Url.Action("Get", new { id = customer.Id }), customer.Id); } } // Get方法暫且省略 }
然後,修改Startup.cs中的ConfigureServices方法,將CustomerCreatedEventHandler註冊進來:
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); services.AddTransient<IEventHandler, CustomerCreatedEventHandler>(); services.AddSingleton<IEventBus, PassThroughEventBus>(); }
並且調用Subscribe方法,開始偵聽消息匯流排:
public void Configure(IApplicationBuilder app, IHostingEnvironment env) { var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseMvc(); }
OK,現在讓我們在CustomerCreatedEventHandler的HandleAsync方法上設置個斷點,按下F5啟用Visual Studio 2017調試,然後重新使用Invoke-RestMethod命令發送一個Post請求,可以看到,HandleAsync方法上的斷點被命中,同時事件已被正確派發:
資料庫中的數據也被正確更新:
目前還差最後一小步,就是在HandleAsync中,將CustomerCreatedEvent對象的數據序列化並保存到資料庫中。當然這也不難,同樣可以考慮使用Dapper,或者直接使用ADO.NET,甚至使用比較重量級的Entity Framework Core,都可以實現。那就在此將這個問題留給感興趣的讀者朋友自己搞定啦。
小結
到這裡基本上本文的內容也就告一段落了,回顧一下,本文一開始就提出了一種相對簡單的消息系統和事件驅動型架構的設計模型,並實現了一個最簡單的事件匯流排:PassThroughEventBus。隨後,結合一個實際的ASP.NET Core Web API案例,瞭解了在RESTful API中實現事件消息派發和訂閱的過程,並實現了在事件處理器中,對獲得的事件消息進行處理。
然而,我們還有很多問題需要更深入地思考,比如:
- 如果事件處理器需要依賴基礎結構層組件,依賴關係如何管理?組件生命周期如何管理?
- 如何實現基於RabbitMQ或者Azure Service Bus的事件匯流排?
- 如果在資料庫更新成功後,事件發送失敗怎麼辦?
- 如何保證事件處理的順序?
等等。。。在接下來的文章中,我會儘力做更詳細的介紹。
源代碼的使用
本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo里,通過不同的release tag來區分針對不同章節的源代碼。本文的源代碼請參考chapter_1這個tag,如下:
接下來還將會有chapter_2、chapter_3等這些tag,對應本系列文章的第二部分、第三部分等等。敬請期待。