前言 當你在處理非同步消息時,每個單獨的消息處理程式都是一個單獨的handler,每個handler之間互不影響。這時如果一個消息依賴另一個消息的狀態呢? 這時業務邏輯怎麼處理? 借用我們上篇文章的業務場景,如果在Ship項目里需要發送一個ShipOrder Command。這個ShipOrder需要 ...
前言
當你在處理非同步消息時,每個單獨的消息處理程式都是一個單獨的handler,每個handler之間互不影響。這時如果一個消息依賴另一個消息的狀態呢? 這時業務邏輯怎麼處理?
借用我們上篇文章的業務場景,如果在Ship項目里需要發送一個ShipOrder Command。這個ShipOrder需要依賴Sales.OrderPlaced和Bill.OrderBilled Command的狀態,目前我們的兩個單獨的Message Handler都沒有保持任何的狀態欄位,所以這時如果我們需要完成這個業務模型,就需要跟蹤他們的狀態。
什麼是Saga
這個就是本篇文章要提的saga,定義在NServiceBus框架里,他的本質是一個消息驅動模型里的狀態機,或者也可以理解為一系列消息處理程式用來共用狀態的業務模型。我理解在消息隊列里如果我們要保證消息一致性通常會自己創建一張Event表,這裡saga維持狀態的角色有點像我們這裡的Event表。
好的,回到正題上,如果我們需要在Shipping Service里發送一個ShipOrder,發送他之前需要確定OrderPlaced和OrderBilled的狀態,確保這兩個消息都收到以後才能發送ShipOrder。
如何使用Saga
當然,我暫且理解Saga的目的是為了處理在長時間運行的任務里保證數據一致性這樣的一個角色。
Saga狀態
saga狀態主要是告訴NServiceBus在處理數據一致性的判斷邏輯,這裡需要繼承抽象類ContainSagaData,在我們這個業務場景中則主要是判斷OrderPlaced和OrderBilled消息是否已經接收到並處理。
public class ShippingPolicyData:ContainSagaData
{
public string OrderId { get; set; }
public bool IsOrderPlaced { get; set; }
public bool IsOrderBilled { get; set; }
}
Saga如何工作
有了狀態以後,我們還需要一個“handler”來告訴NServiceBus,在這個handler里主要用來處理消息數據一致性,我看了官方文檔後,他們建議我們這裡的handler角色使用Policy尾碼命名,當然我覺的也可以用Saga尾碼命名,比如ShippingPolicy或者ShippingSaga。
同時這裡我們這個handler覺色還要繼承Saga
public class ShipPolicy:Saga<ShippingPolicyData>,
IAmStartedByMessages<OrderPlaced>,
IAmStartedByMessages<OrderBilled> //都可以創建Saga實例
{
private static ILog log = LogManager.GetLogger<ShipPolicy>();
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper)
{
mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId);
}
public Task Handle(OrderPlaced message, IMessageHandlerContext context)
{
log.Info("OrderPlaced message received ");
this.Data.IsOrderPlaced = true;
return ProcessOrder(context);
}
public Task Handle(OrderBilled message, IMessageHandlerContext context)
{
log.Info("OrderBilled message received");
this.Data.IsOrderBilled = true;
return ProcessOrder(context);
}
private async Task ProcessOrder(IMessageHandlerContext context)
{
if (Data.IsOrderBilled && Data.IsOrderPlaced)
{
await context.SendLocal(new ShipOrder()
{
OrderId = Data.OrderId
});
MarkAsComplete();
}
}
}
這個類里你會發現還實現了介面IAmStartedByMessages
發送ShipOrder Command
到這裡也就是我們的OrderPlaced和OrderBIlled消息都收到了,業務邏輯符合要求,可以發送ShipOrder消息了,也就是用戶創建了訂單,付了款,可以發貨了。
新建ShipOrder類
public class ShipOrder:ICommand
{
public string OrderId { get; set; }
}
新建ShipOrderHandler
public class ShipOrderHandler:IHandleMessages<ShipOrder>
{
private static ILog log = LogManager.GetLogger<ShipOrderHandler>();
public Task Handle(ShipOrder message, IMessageHandlerContext context)
{
log.Info($"Order [{message.OrderId}] - Successfully shipped");
return Task.CompletedTask;
}
}
運行Shipping項目,看到下圖,則說明程式運行成功,我們這個業務場景里OrderPlaced消息肯定先接受到,OrderBilled消息後接受到。
參考鏈接
https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/