【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性 簡介 github https://github.com/dotnet-shashlik/shashlik.eventbus 各位爺高興了給個star唄。 分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行 ...
【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性
簡介
github https://github.com/dotnet-shashlik/shashlik.eventbus
各位爺高興了給個star唄。
分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行其道的架構前提下,是不可逃避的幾個關鍵字,在此不會過多闡述相關的理論知識。Shashlik.EventBus
就是一個基於.NET6的開源事件匯流排解決方案,同時也是分散式事務最終一致性、延遲事件解決方案。Shashlik.EventBus
採用的是非同步確保的思路(本地消息表),將消息數據與業務數據在同一事務中進行提交或回滾,以此來保證消息數據的可靠性。其設計目標是高性能、簡單、易用、易擴展,為拋棄歷史包袱,僅支持NET6,採用最寬鬆的 MIT 開源協議。
原理如下圖:
如圖所示,消息數據需要和業務數據在同一的事務中進行提交或者回滾,最後Shashlik.EventBus
會檢查消息數據是否已提交,如果已提交才會執行真正的消息發送。所以要求事務的隔離級別最低為讀已提交(RC)。
關於消息冪等
Shashlik.EventBus
不能保證業務消息的冪等性,為了保證消息的可靠傳輸,EventBus以及消息中間件對消息QOS處理等級必須為at least once
(至少到達一次),一般消息中間件都需要開啟消息持久化避免消息丟失。簡而言之就是一個事件處理類可能處理多次同一個事件,事件消息的冪等性應該由業務方進行處理。比如用戶訂單付款完成為一個事件,付款完成後需要修改訂單狀態為待發貨,也就是在付款完成事件處理類中可能收到多次這個訂單的付款完成事件,那麼業務的冪等性處理就可以使用鎖,判斷訂單狀態,如果訂單狀態已經為待發貨,則直接返回並忽略本次事件響應。
延遲事件
Shashlik.EventBus
支持基於本地的延遲事件機制,考慮到不是所有的消息中間件都支持延遲功能,且為了最大程度保證消息的可靠性,最後採用了System.Timers.Timer
來執行延遲功能。
延遲事件同樣適用於分散式事務最終一致性,但如果延遲事件處理類處理異常由重試器介入處理後,那麼最終的延遲執行時間和期望的延遲時間就會產生較大的差異,是否忽略這裡的時間差需要由具體的業務來決定。比如訂單30分鐘未付款需要關閉訂單,30分鐘後關閉訂單出現了異常,最後由重試器到了40分鐘後才關閉,也不影響訂單,那麼認為這個時間差可以容忍。又比如雙11啦,發佈一個延遲事件,晚上12點叫醒我起來買買買,只有1分鐘時間,過了就買不到了,那麼這種情況可以在事件處理類中,自行根據當前時間、事件發送時間、延遲執行時間等要素,自行決定業務如何處理。
延遲事件和普通事件在事件定義和事件處理類聲明和處理時沒有任何區別,僅僅是在發佈事件時需要指定延遲時間。
上代碼
需求:一個新用戶註冊以後有以下需求:1. 發送歡迎註冊簡訊;2. 發放新用戶優惠券;3. 30分鐘後推送新用戶優惠活動信息。
- 服務配置,這裡以
MySql
+RabbitMQ
為例:
services.AddEventBus(r =>
{
// 這些都是預設配置,可以直接services.AddEventBus()
// 運行環境,註冊到MQ的事件名稱和事件處理名稱會帶上此尾碼
r.Environment = "Production";
// 最大失敗重試次數,預設60次
r.RetryFailedMax = 60;
// 消息重試間隔,預設2分鐘
r.RetryInterval = 60 * 2;
// 單次重試消息數量限制,預設100
r.RetryLimitCount = 100;
// 成功的消息過期時間,預設3天,失敗的消息永不過期,必須處理
r.SucceedExpireHour = 24 * 3;
// 消息處理失敗後,重試器介入時間,預設5分鐘後
r.StartRetryAfter = 60 * 5;
// 事務提交超時時間,單位秒,預設60秒
r.TransactionCommitTimeout = 60;
// 重試器執行時消息鎖定時長
r.LockTime = 110;
})
// 使用ef DbContext mysql
.AddMySql<DemoDbContext>()
// 配置RabbitMQ
.AddRabbitMQ(r =>
{
r.Host = "localhost";
r.UserName = "rabbit";
r.Password = "123123";
});
- 定義事件
// 新用戶註冊完成事件,實現介面IEvent
public class NewUserEvent : IEvent
{
public string Id { get;set; }
public string Name { get; set; }
}
// 定義新用戶註冊延遲活動推送事件
public class NewUserPromotionEvent : IEvent
{
public string Id { get;set; }
public string Name { get; set; }
public string PromotionId { get; set; }
}
- 發佈事件
public class UserManager
{
public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
{
EventPublisher = eventPublisher;
DbContext = dbContext;
}
private IEventPublisher EventPublisher { get; }
private DemoDbContext DbContext { get; }
public async Task CreateUserAsync(UserInput input)
{
// 開啟本地事務
using var tran = await DbContext.DataBase.BeginTransactionAsync();
try
{
// 創建用戶邏輯處理...
// 發佈新用戶事件
// 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
await EventPublisher.PublishAsync(new NewUserEvent{
Id = user.Id,
Name = input.Name
}, DbContext.GetTransactionContext());
// 發佈延遲事件
// 通過ef擴展,直接使用DbContext發佈事件,自動使用當前上下文事務
await DbContext.PublishEventAsync(new NewUserPromotionEvent{
Id = user.Id,
Name = input.Name,
PromotionId = "1"
}, DatetimeOffset.Now.AddMinutes(30));
// 提交本地事務
await tran.CommitAsync();
}catch(Exception ex)
{
// 回滾事務,消息數據也將回滾不會發佈
await tran.RollbackAsync();
}
}
}
- 定義事件處理類
// 一個事件可以有多個處理類,可以分佈在不同的微服務中
// 用於發送簡訊的事件處理類
public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>
{
public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
{
// 發送簡訊...
}
}
// 用於發放消費券的事件處理類
public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>
{
public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
{
// 業務處理...
}
}
// 用於新用戶延遲活動的事件處理類,將在指定時間執行
public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>
{
public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items)
{
// 業務處理...
}
}
預設的,發佈、聲明到消息中間件的事件、事件處理器名稱生產規則為{Type.Name}.{Options.Environment}
,在分散式架構下需要,您需要瞭解這個預設規則,這點不同於CAP
框架必須顯示聲明,當然Shashlik.EventBus
也可以使用EventBusNameAttribute
特性來顯示聲明,詳細說明請上github查看wiki文檔。
XA事務支持(TransactionScope)
雖然儘可能的不要使用TransactionScope
,但在某些場景仍然是需要的,Shashlik.EventBus
對其提供了事務支持,可以通過XaTransactionContext.Current
獲取當前環境的事務上下文,發佈事件如下:
public class UserManager
{
public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
{
EventPublisher = eventPublisher;
DbContext = dbContext;
}
private IEventPublisher EventPublisher { get; }
private DemoDbContext DbContext { get; }
public async Task CreateUserAsync(UserInput input)
{
// 開啟事務
using var scope = new TransactionScope();
try
{
// 創建用戶邏輯處理...
// 發佈新用戶事件
// 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
await EventPublisher.PublishAsync(new NewUserEvent{
Id = user.Id,
Name = input.Name
// 使用 XaTransactionContext.Current
}, XaTransactionContext.Current);
// 提交事務
await scope.Complete();
}catch(Exception ex)
{
// 回滾事務,消息數據也將回滾不會發佈
await tran.RollbackAsync();
}
}
}
擴展
如果預設實現不能滿足你的需求,可以自行實現可擴展介面,並註冊即可。
IMsgIdGenerator
:消息Id生成器,是指傳輸的全局唯一id,不是指存儲的id。預設guidIEventPublisher
:事件發佈處理器。IMessageSerializer
:消息序列化、反序列化處理類。預設Newtonsoft.Json
。IReceivedMessageRetryProvider
:已接收消息重試器。IPublishedMessageRetryProvider
:已發佈消息重試器。IEventHandlerInvoker
: 事件處理執行器IEventNameRuler
:事件名稱規則生成(對應消息隊列topic/route)。IEventHandlerNameRuler
:事件處理名稱規則生成(對應消息隊列queue/group)。IEventHandlerFindProvider
:事件處理類查找器IExpiredMessageProvider
:已過期消息刪除處理器。IMessageListener
:消息監聽處理器。IRetryProvider
:重試執行器。IPublishHandler
:消息發佈處理器。IReceivedHandler
:消息接收處理器。IMessageStorageInitializer
:存儲介質初始化。IMessageStorage
:消息存儲、讀取等操作。
例:
// 替換預設的IMsgIdGenerator
service.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();
service.AddEventBus()
.AddMemoryQueue()
.AddMemoryStorage();
後續計劃
- 功能
- 消息中間件支持
- 存儲支持