CQRS是Command Query Responsibility Segregation的縮寫,一般稱作命令查詢職責分離。從字面意思理解,就是將命令(寫入)和查詢(讀取)的責任劃分到不同的模型中。 對比一下常用的 CRUD 模式(創建-讀取-更新-刪除),通常我們會讓用戶界面與負責所有四種操作的數... ...
CQRS和中介者模式
MediatR庫主要是為了幫助開發者快速實現兩種軟體架構模式:CQRS和Mediator。這兩種架構模式看上去似乎差不多,但還是有很多區別的。
CQRS
CQRS是Command Query Responsibility Segregation的縮寫,一般稱作命令查詢職責分離。從字面意思理解,就是將命令(寫入)和查詢(讀取)的責任劃分到不同的模型中。
對比一下常用的 CRUD 模式(創建-讀取-更新-刪除),通常我們會讓用戶界面與負責所有四種操作的數據存儲交互。而 CQRS 則將這些操作分成兩種模式,一種用於查詢(又稱 "R"),另一種用於命令(又稱 "CUD")。
如圖所示,應用程式只是將查詢和命令模型分開。CQRS並沒有對分離的方式做出具體的規定。可以是應用程式裡面的一個類或者第三方類庫,也可以是通過不同的伺服器進行物理上的隔離。具體如何實現取決於應用程式的實際情況。總而言之,CQRS的核心就是將讀和寫分開。
看到這裡是不是有種似曾相識的感覺?沒錯,CQRS的設計理念和資料庫的讀寫分離一毛一樣。
CQRS看上去似乎很棒,但它也是一把雙刃劍,和軟體開發實踐中的其他東西一樣,需要進行一些平衡和取捨,包括:
- 管理單獨的系統(如果應用程式層被拆分)
- 數據過時(如果資料庫層被拆分)
- 管理多個組件的複雜性
是否使用CQRS模式最終取決於我們的特定用例。良好的開發實踐鼓勵我們“保持簡單”(KISS),因此僅在需要時使用這些模式,否則就是過度設計了。
Mediator 模式
Mediator模式只是定義了一個對象,它封裝了對象之間的交互方式。兩個或多個對象之間不再直接相互依賴,而是通過一個 "中介 "進行交互,"中介 "負責將這些交互發送給另一方。
如上圖所示,SomeService 向Mediator發送消息,然後Mediator調用多個服務來處理該消息。任何Handler組件之間沒有直接依賴關係。
中介模式之所以有用,與控制反轉(Inversion of Control)等模式一樣。它可以實現 "松耦合",因為依賴關係圖最小化,因此代碼更簡單、更容易測試。換句話說,一個組件考慮的因素越少,它就越容易開發和演進。
我們在上圖中看到了服務之間沒有直接依賴關係,消息的生產者不知道是那些Handler在處理它。這與消息代理在“發佈/訂閱”模式中的工作方式非常相似。如果我們想添加另一個處理程式,直接條件就可以了,不必修改生產者。
如何使用MediatR?
我們可以將 MediatR 視為“進程內”中介器實現方案,這有助於我們構建 CQRS 系統。用戶界面和數據存儲之間的所有通信都通過 MediatR 進行。
這裡我們需要註意”進程內“這三個字,這是一個非常重要的限制條件,意味著您無法使用MediatR實現跨進程消息通信。如果我們想跨兩個系統分離命令和查詢,更好的方法是使用消息代理,例如 Kafka 、RabbitMQ或 Azure 服務匯流排等等。推薦學習一下MassTransit這個庫。
在ASP.NET Core API項目中配置MediatR
項目設置
首先,讓我們打開Visual Studio並創建一個新的 ASP.NET Core Web API應用程式。我們將它命名為CqrsMediatrExample。
安裝依賴包
PM> install-package MediatR
如果是v12之前的版本,則需要再安裝MediatR.Extensions.Microsoft.DependencyInjection
。
註冊依賴
打開Program.cs
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
我們必須為構造函數提供預設配置。
現在,MediatR 已配置完畢,隨時可用。
在我們進入控制器創建之前,我們將修改文件:launchSettings.json
{
"profiles": {
"https": {
"commandName": "Project",
"dotnetRunMessages": true,
"launchBrowser": false,
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
添加控制器
現在我們已經安裝了所有內容,讓我們設置一個新的控制器,它將向 MediatR 發送消息。
在“Controllers”文件夾中,讓我們添加一個名稱為 ProductsController.cs
的控制器
然後我們得到以下類:
[Route("api/[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
}
接下來,讓我們通過構造函數註入一個IMediatR
實例:
[Route("api/[controller]")]
[ApiController]
public class ProductsController : ControllerBase
{
private readonly IMediator _mediator;
public ProductsController(IMediator mediator)
{
_mediator = mediator;
}
}
IMediatR 介面允許我們向 MediatR 發送消息,然後由 MediatR 向相關處理程式派發消息。因為我們已經安裝了依賴註入軟體包,所以實例會自動解析。
從 MediatR 9.0 版開始,IMediator 介面被拆分為ISender 和 IPublisher兩個介面。因此,儘管我們仍然可以使用 IMediator 介面向處理程式發送請求,但是更嚴謹一些的做法是分別使用ISender和IPublisher分別發送不同類型的消息。
public interface ISender
{
Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default);
Task<object?> Send(object request, CancellationToken cancellationToken = default);
}
public interface IPublisher
{
Task Publish(object notification, CancellationToken cancellationToken = default);
Task Publish<TNotification>(TNotification notification, CancellationToken cancellationToken = default)
where TNotification : INotification;
}
public interface IMediator : ISender, IPublisher
{
}
數據存儲
通常,我們希望與真實的資料庫進行交互。但在本文中,讓我們創建一個包含此責任的Fake class,並簡單地與一些 Product 實體進行交互。
但在這樣做之前,我們必須創建一個簡單的類:Product
public class Product
{
public int Id { get; set; }
public string Name { get; set; }
}
接下來我們添加一個新的類,命名為FakeDataStore
:
public class FakeDataStore
{
private static List<Product> _products;
public FakeDataStore()
{
_products = new List<Product>
{
new Product { Id = 1, Name = "Test Product 1" },
new Product { Id = 2, Name = "Test Product 2" },
new Product { Id = 3, Name = "Test Product 3" }
};
}
public async Task AddProduct(Product product)
{
_products.Add(product);
await Task.CompletedTask;
}
public async Task<IEnumerable<Product>> GetAllProducts() => await Task.FromResult(_products);
}
然後,我們需要在Program.cs
將FakeDataStore配置到依賴註入:
builder.Services.AddSingleton<FakeDataStore>();
分離命令和查詢
本文畢竟是關於 CQRS 的,因此讓我們為此目的創建三個新文件夾:Commands、Queries和Handlers。我們將通過這三個文件夾將模型進行物理上的分隔。
使用MediatR發送請求
MediatR請求是非常簡單的請求-響應樣式消息,其中單個請求由單個處理程式同步處理(這裡的同步並不是編程意義上的同步,而是從業務或者流程的角度觸發,即發送請求後持續等待流程處理完成並且返回結果,需要和C#的async/await區別開)。這裡我們做一個簡單的例子來示範查詢或者更新資料庫。
MediatR 中有兩種類型的請求。一個有返回值,另一個沒有返回值。通常,這對應於讀取/查詢(返回值)和寫入/命令(通常不返回值)。
獲取產品(Query)
由於這是一個查詢,讓我們添加一個調用到 “Queries” 文件夾的類,並實現它:GetProductsQuery
public record GetProductsQuery() : IRequest<IEnumerable<Product>>;
這裡我們創建一個名為GetProductsQuery
的record對象並且繼承IRequest<IEnumerable<Product>>
介面,表示此查詢將返回一個Product
集合。
然後,在 Handlers 文件夾中,我們將創建一個新的Handler類來處理我們的查詢:
public class GetProductsHandler : IRequestHandler<GetProductsQuery, IEnumerable<Product>>
{
private readonly FakeDataStore _fakeDataStore;
public GetProductsHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task<IEnumerable<Product>> Handle(GetProductsQuery request,
CancellationToken cancellationToken) => await _fakeDataStore.GetAllProducts();
}
稍微分解以下,GetProductsHandler
要繼承IRequestHandler<GetProductsQuery, IEnumerable<Product>>
,表示GetProductsHandler
可以處理GetProductsQuery
查詢請求,並且返回一個產品列表,具體的查詢邏輯在Handle
方法中實現。
調用請求
要調用查詢請求,只需要在ProductsController
中添加一個GetProducts
的Action。
[HttpGet]
public async Task<ActionResult> GetProducts()
{
var products = await _mediator.Send(new GetProductsQuery());
return Ok(products);
}
Too simple對不對?
來測試一下吧。
首先在IDE或者控制臺中運行我們的項目。然後打開Postman並創建一個請求:
MediatR發送命令
我們在“Commands”文件夾中添加一個名為AddProductCommand
的record,並且繼承IRequest
介面。
public record AddProductCommand(Product Product) : IRequest;
因為我們不需要返回值,所以只需要繼承IRequest
,不需要添加泛型參數,AddProductCommand
將會自動擁有一個名為Product的屬性。
註意:因為我們僅僅是為了簡單且快速地示範MediatR的使用,所以此處直接使用的領域實體作為參數,在實際使用中,應當使用DTO等對象從公共Api中隱藏領域實體。
接下來,我們要在“Handlers”文件夾中添加AddProductCommand
的Handler。
public class AddProductHandler : IRequestHandler<AddProductCommand>
{
private readonly FakeDataStore _fakeDataStore;
public AddProductHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task Handle(AddProductCommand request, CancellationToken cancellationToken)
{
await _fakeDataStore.AddProduct(request.Product);
return;
}
}
調用請求
同樣的,我們在ProductsController
中添加一個名為AddProduct
的Action來發送Command。
[HttpPost]
public async Task<ActionResult> AddProduct([FromBody]Product product)
{
await _mediator.Send(new AddProductCommand(product));
return StatusCode(201);
}
與上一個方法類似,只不過這次我們不需要返回任何值。
運行項目,並向Postman中添加一個新的請求:
執行完成後再次運行之前的查詢請求:
新添加的數據已經出現在列表中,證明我們的代碼已經按照預期執行了。
使用返回值的命令
我們的Post操作目前返回的是201狀態碼,並沒有包含其他的信息。然而在實際應用中,客戶端可能需要更多的信息,例如新添加的產品的Id等。
在此之前我們需要添加一個根據Id獲取產品的功能。
- 在“Queries”文件夾里添加一個名為
GetProductByIdQuery
的record:
public record GetProductByIdQuery(int Id) : IRequest<Product>;
- 修改
FakeDataStore
使其支持根據Id查詢產品信息:
public async Task<Product> GetProductById(int id) =>
await Task.FromResult(_products.Single(p => p.Id == id));
- 添加一個新的Handler用於處理
GetProductByIdQuery
:
public class GetProductByIdHandler : IRequestHandler<GetProductByIdQuery, Product>
{
private readonly FakeDataStore _fakeDataStore;
public GetProductByIdHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task<Product> Handle(GetProductByIdQuery request, CancellationToken cancellationToken) =>
await _fakeDataStore.GetProductById(request.Id);
}
- 在Controller中添加新的Get介面:
[HttpGet("{id:int}", Name = "GetProductById")]
public async Task<ActionResult> GetProductById(int id)
{
var product = await _mediator.Send(new GetProductByIdQuery(id));
return Ok(product);
}
好了,我們在Postman中添加一個新的請求,並測試一下:
修改命令和Handler
如果Request需要返回操作結果,只需要將Command的介面增加一個泛型參數,參數的類型為需要返回的值的類型。
public record AddProductCommand(Product Product) : IRequest<Product>;
Handler也需要做一些調整:
public class AddProductHandler : IRequestHandler<AddProductCommand, Product>
{
private readonly FakeDataStore _fakeDataStore;
public AddProductHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task<Product> Handle(AddProductCommand request, CancellationToken cancellationToken)
{
await _fakeDataStore.AddProduct(request.Product);
return request.Product;
}
}
這是一個簡化到極致的例子,目的僅僅是為了演示如何使用。
最後需要修改的是Controller的Action方法:
[HttpPost]
public async Task<ActionResult> AddProduct([FromBody]Product product)
{
var productToReturn = await _mediator.Send(new AddProductCommand(product));
return CreatedAtRoute("GetProductById", new { id = productToReturn.Id }, productToReturn);
}
完成所有這些更改後,我們可以發送 post 請求,但這一次,我們將在響應正文中看到一個新創建的產品,並且在Header中,還會看到一個叫Location
的Key,它的Value是一個連接,可以用來獲取該新產品的信息:
好了,基本的新增和查詢操作就到這裡了,修改和刪除可以按照這個套路舉一反三。
MediatR通知
我們註意到,Request有且只能有一個Handler來處理,但是如果我們需要有多個Handler怎麼辦呢?這時候就需要用到通知了,通知的使用場景通常是在一個事件發生後,需要有多個響應。例如我們添加了產品後,需要:
- 發送郵件通知
- 作廢緩存
為了演示通知的使用,我們需要修改AddProductCommand
,在完成Product添加操作後發送一個通知出來。
發送電子郵件和使緩存失效超出了本文的範圍,但為了演示通知的行為,讓我們簡單地更新我們的Fake數據來表示已處理某些內容。
打開FakeDataStore
並添加一個方法:
public async Task EventOccured(Product product, string evt)
{
_products.Single(p => p.Id == product.Id).Name = $"{product.Name} evt: {evt}";
await Task.CompletedTask;
}
創建通知和處理程式
讓我們定義一條通知消息,用於封裝我們要定義的事件。
首先,讓我們添加一個名為“Notifications”的新文件夾,在該文件夾中添加一個名為ProductAddedNotification
的record。
public record ProductAddedNotification(Product Product) : INotification;
這個record繼承了INotification
,並且擁有一個Product屬性。
現在,我們為通知創建兩個處理程式:
public class EmailHandler : INotificationHandler<ProductAddedNotification>
{
private readonly FakeDataStore _fakeDataStore;
public EmailHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task Handle(ProductAddedNotification notification, CancellationToken cancellationToken)
{
await _fakeDataStore.EventOccured(notification.Product, "Email sent");
await Task.CompletedTask;
}
}
public class CacheInvalidationHandler : INotificationHandler<ProductAddedNotification>
{
private readonly FakeDataStore _fakeDataStore;
public CacheInvalidationHandler(FakeDataStore fakeDataStore) => _fakeDataStore = fakeDataStore;
public async Task Handle(ProductAddedNotification notification, CancellationToken cancellationToken)
{
await _fakeDataStore.EventOccured(notification.Product, "Cache Invalidated");
await Task.CompletedTask;
}
}
這兩個類做了同樣的兩件事:
- 實現
INotificationHandler<ProductAddedNotification>
介面表示它可以處理ProductAddedNotification
通知。 - 在 FakeDataStore 上調用
EventOccured
方法。
在實際用例中,這些將以不同的方式實現,並且可能會採用一些外部依賴,但在這裡我們只是嘗試演示通知的行為。
觸發通知
接下來,我們需要實際觸發通知。
打開ProductsController
並且修改AddProduct
方法:
[HttpPost]
public async Task<ActionResult> AddProduct([FromBody]Product product)
{
var productToReturn = await _mediator.Send(new AddProductCommand(product));
await _mediator.Publish(new ProductAddedNotification(productToReturn));
return CreatedAtRoute("GetProductById", new { id = productToReturn.Id }, productToReturn);
}
除了要發送AddProductCommand
請求外,還需要向MediatR發送ProductAddedNotification
通知,但是這次需要使用Publish方法,而不是Send。
我們也可以把通知的發送放到
AddProductCommand
命令Handler裡面。
測試通知
運行項目,先在Postman中運行GetProducts
請求。
接下來運行AddProduct
請求,調用成功之後重新運行GetProducts
請求。
正如預期的那樣,當我們添加新產品時,兩個事件都會觸發並編輯名稱。雖然這是一個簡單且略顯粗糙的例子,但這裡的關鍵要點是,我們可以通過MediatR觸發一個事件並使用不同的Handler多次處理它,而生產者不知道任何不同。
如果我們想擴展我們的工作流程來執行額外的任務,我們可以簡單地添加一個新的處理程式。我們不需要修改通知本身或所述通知的發佈,這再次觸及了早期的可擴展性和關註點分離。
構建MediatR行為
通常,當我們構建應用程式時,我們有許多跨領域問題。其中包括授權、驗證和日誌記錄。我們可以利用 Behavior,而不是在整個處理程式中重覆此邏輯。MediatR的Behavior與ASP.NET Core中間件非常相似,它們接受請求,執行某些操作,然後(可選)傳遞請求。
創建Behavior
首先我們在項目下新建一個名為“Behaviors”的文件夾。然後在文件夾中添加一個類,命名為LoggingBehavior
:
public class LoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
where TRequest : IRequest<TResponse>
{
private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
=> _logger = logger;
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
_logger.LogInformation($"Handling {typeof(TRequest).Name}");
var response = await next();
_logger.LogInformation($"Handled {typeof(TResponse).Name}");
return response;
}
}
解釋一下這段代碼:
LoggingBehavior
包含兩個泛型參數TRequest
和TResponse
,繼承了IPipelineBehavior<TRequest, TResponse>
介面。從泛型參數可以看出,這個Behavior可以處理任何請求。LoggingBehavior
實現了Handle方法,在調用next()
委托之前和之後進行日誌記錄 。
註冊Behavior
打開Program.cs
,增加一行代碼:
builder.Services.AddSingleton(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
測試Behavior
運行項目,打開Postman並執行任意一個請求,查看控制台輸出:
OK,看到這個界面說明LoggingBehavior已經正常工作了。
我們在沒有修改任何業務代碼的情況下,輕鬆地用AOP的方式實現了日誌記錄。
結論
我們用了兩篇文章介紹如何使用 MediatR 在核心 ASP.NET 實現 CQRS 和中介器模式。我們已經完成了請求和通知,以及如何處理行為的橫切問題。
MediatR 為需要從簡單的單體架構演變為更成熟的應用程式提供了一個很好的起點,它允許我們分離讀取和寫入關註點,並最大限度地減少代碼之間的依賴關係。
這為我們採取其他幾個可能的步驟提供了有利條件:
- 使用不同的資料庫進行讀取(也許可以通過擴展我們的 ProductAddedNotification 來添加第二個處理程式,將數據寫入新的資料庫,然後修改 GetProductsQuery 以從該資料庫讀取數據)
- 將我們的讀取/寫入分離到不同的應用程式中(修改 ProductAddedNotification 以發佈到 Kafka/服務匯流排,然後讓第二個應用程式從消息匯流排中讀取)。
現在,我們的應用程式已經處於一個很好的狀態,可以在需要時採取上述步驟,而不會在短期內使事情過於複雜。
點關註,不迷路。
如果您喜歡這篇文章,請不要忘記點贊、關註、轉發,謝謝!如果您有任何高見,歡迎在評論區留言討論……