【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性

来源:https://www.cnblogs.com/gucaocao/archive/2022/09/13/16688691.html
-Advertisement-
Play Games

【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性 簡介 github https://github.com/dotnet-shashlik/shashlik.eventbus 各位爺高興了給個star唄。 分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行 ...


【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性

簡介

github https://github.com/dotnet-shashlik/shashlik.eventbus

各位爺高興了給個star唄。

分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行其道的架構前提下,是不可逃避的幾個關鍵字,在此不會過多闡述相關的理論知識。Shashlik.EventBus就是一個基於.NET6的開源事件匯流排解決方案,同時也是分散式事務最終一致性、延遲事件解決方案。Shashlik.EventBus採用的是非同步確保的思路(本地消息表),將消息數據與業務數據在同一事務中進行提交或回滾,以此來保證消息數據的可靠性。其設計目標是高性能、簡單、易用、易擴展,為拋棄歷史包袱,僅支持NET6,採用最寬鬆的 MIT 開源協議。

原理如下圖:

image

如圖所示,消息數據需要和業務數據在同一的事務中進行提交或者回滾,最後Shashlik.EventBus會檢查消息數據是否已提交,如果已提交才會執行真正的消息發送。所以要求事務的隔離級別最低為讀已提交(RC)

關於消息冪等

Shashlik.EventBus不能保證業務消息的冪等性,為了保證消息的可靠傳輸,EventBus以及消息中間件對消息QOS處理等級必須為at least once (至少到達一次),一般消息中間件都需要開啟消息持久化避免消息丟失。簡而言之就是一個事件處理類可能處理多次同一個事件,事件消息的冪等性應該由業務方進行處理。比如用戶訂單付款完成為一個事件,付款完成後需要修改訂單狀態為待發貨,也就是在付款完成事件處理類中可能收到多次這個訂單的付款完成事件,那麼業務的冪等性處理就可以使用鎖,判斷訂單狀態,如果訂單狀態已經為待發貨,則直接返回並忽略本次事件響應。

延遲事件

Shashlik.EventBus支持基於本地的延遲事件機制,考慮到不是所有的消息中間件都支持延遲功能,且為了最大程度保證消息的可靠性,最後採用了System.Timers.Timer來執行延遲功能。

延遲事件同樣適用於分散式事務最終一致性,但如果延遲事件處理類處理異常由重試器介入處理後,那麼最終的延遲執行時間和期望的延遲時間就會產生較大的差異,是否忽略這裡的時間差需要由具體的業務來決定。比如訂單30分鐘未付款需要關閉訂單,30分鐘後關閉訂單出現了異常,最後由重試器到了40分鐘後才關閉,也不影響訂單,那麼認為這個時間差可以容忍。又比如雙11啦,發佈一個延遲事件,晚上12點叫醒我起來買買買,只有1分鐘時間,過了就買不到了,那麼這種情況可以在事件處理類中,自行根據當前時間、事件發送時間、延遲執行時間等要素,自行決定業務如何處理。

延遲事件和普通事件在事件定義和事件處理類聲明和處理時沒有任何區別,僅僅是在發佈事件時需要指定延遲時間。

上代碼

需求:一個新用戶註冊以後有以下需求:1. 發送歡迎註冊簡訊;2. 發放新用戶優惠券;3. 30分鐘後推送新用戶優惠活動信息。

  1. 服務配置,這裡以MySql + RabbitMQ為例:
  services.AddEventBus(r =>
      {
          // 這些都是預設配置,可以直接services.AddEventBus()
          // 運行環境,註冊到MQ的事件名稱和事件處理名稱會帶上此尾碼
          r.Environment = "Production";
          // 最大失敗重試次數,預設60次
          r.RetryFailedMax = 60;
          // 消息重試間隔,預設2分鐘
          r.RetryInterval = 60 * 2;
          // 單次重試消息數量限制,預設100
          r.RetryLimitCount = 100;
          // 成功的消息過期時間,預設3天,失敗的消息永不過期,必須處理
          r.SucceedExpireHour = 24 * 3;            
          // 消息處理失敗後,重試器介入時間,預設5分鐘後
          r.StartRetryAfter = 60 * 5;            
          // 事務提交超時時間,單位秒,預設60秒
          r.TransactionCommitTimeout = 60;
          // 重試器執行時消息鎖定時長
          r.LockTime = 110;
      })
      // 使用ef DbContext mysql
      .AddMySql<DemoDbContext>()
      // 配置RabbitMQ
      .AddRabbitMQ(r =>
      {
          r.Host = "localhost";
          r.UserName = "rabbit";
          r.Password = "123123";
      });
  1. 定義事件

  // 新用戶註冊完成事件,實現介面IEvent
  public class NewUserEvent : IEvent
  {
      public string Id { get;set; }
      public string Name { get; set; }
  }
  
  // 定義新用戶註冊延遲活動推送事件
  public class NewUserPromotionEvent : IEvent
  {
      public string Id { get;set; }
      public string Name { get; set; }
      public string PromotionId { get; set; }
  }

  1. 發佈事件

  public class UserManager
  {
      public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
      {
          EventPublisher = eventPublisher;
          DbContext = dbContext;
      }
  
      private IEventPublisher EventPublisher { get; }
      private DemoDbContext DbContext { get; }
  
      public async Task CreateUserAsync(UserInput input)
      {
          // 開啟本地事務
          using var tran = await DbContext.DataBase.BeginTransactionAsync();
          try
          {
              // 創建用戶邏輯處理...
  
              // 發佈新用戶事件
              // 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
              await EventPublisher.PublishAsync(new NewUserEvent{
                  Id = user.Id,
                  Name = input.Name
              }, DbContext.GetTransactionContext());
  
              // 發佈延遲事件
              // 通過ef擴展,直接使用DbContext發佈事件,自動使用當前上下文事務
              await DbContext.PublishEventAsync(new NewUserPromotionEvent{
                  Id = user.Id,
                  Name = input.Name,
                  PromotionId = "1"
              }, DatetimeOffset.Now.AddMinutes(30));
  
              // 提交本地事務
              await tran.CommitAsync();
          }catch(Exception ex)
          {
              // 回滾事務,消息數據也將回滾不會發佈
              await tran.RollbackAsync();
          }
      }
  }

  1. 定義事件處理類
    
  // 一個事件可以有多個處理類,可以分佈在不同的微服務中
  // 用於發送簡訊的事件處理類
  public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>
  {
      public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
      {
          // 發送簡訊...
      }
  }


  // 用於發放消費券的事件處理類
  public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>
  {
      public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
      {
          // 業務處理...
      }
  }

  // 用於新用戶延遲活動的事件處理類,將在指定時間執行
  public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>
  {
      public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items)
      {
          // 業務處理...
      }
  }    

預設的,發佈、聲明到消息中間件的事件、事件處理器名稱生產規則為{Type.Name}.{Options.Environment},在分散式架構下需要,您需要瞭解這個預設規則,這點不同於CAP框架必須顯示聲明,當然Shashlik.EventBus也可以使用EventBusNameAttribute特性來顯示聲明,詳細說明請上github查看wiki文檔

XA事務支持(TransactionScope)

雖然儘可能的不要使用TransactionScope,但在某些場景仍然是需要的,Shashlik.EventBus對其提供了事務支持,可以通過XaTransactionContext.Current獲取當前環境的事務上下文,發佈事件如下:


  public class UserManager
  {
      public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
      {
          EventPublisher = eventPublisher;
          DbContext = dbContext;
      }
  
      private IEventPublisher EventPublisher { get; }
      private DemoDbContext DbContext { get; }
  
      public async Task CreateUserAsync(UserInput input)
      {
          // 開啟事務
          using var scope = new TransactionScope();
          try
          {
              // 創建用戶邏輯處理...
  
              // 發佈新用戶事件
              // 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
              await EventPublisher.PublishAsync(new NewUserEvent{
                  Id = user.Id,
                  Name = input.Name
              // 使用 XaTransactionContext.Current
              }, XaTransactionContext.Current);
  
              // 提交事務
              await scope.Complete();
          }catch(Exception ex)
          {
              // 回滾事務,消息數據也將回滾不會發佈
              await tran.RollbackAsync();
          }
      }
  }

擴展

如果預設實現不能滿足你的需求,可以自行實現可擴展介面,並註冊即可。

  • IMsgIdGenerator:消息Id生成器,是指傳輸的全局唯一id,不是指存儲的id。預設guid
  • IEventPublisher:事件發佈處理器。
  • IMessageSerializer:消息序列化、反序列化處理類。預設Newtonsoft.Json
  • IReceivedMessageRetryProvider:已接收消息重試器。
  • IPublishedMessageRetryProvider:已發佈消息重試器。
  • IEventHandlerInvoker: 事件處理執行器
  • IEventNameRuler:事件名稱規則生成(對應消息隊列topic/route)。
  • IEventHandlerNameRuler:事件處理名稱規則生成(對應消息隊列queue/group)。
  • IEventHandlerFindProvider:事件處理類查找器
  • IExpiredMessageProvider:已過期消息刪除處理器。
  • IMessageListener:消息監聽處理器。
  • IRetryProvider:重試執行器。
  • IPublishHandler:消息發佈處理器。
  • IReceivedHandler:消息接收處理器。
  • IMessageStorageInitializer:存儲介質初始化。
  • IMessageStorage:消息存儲、讀取等操作。

例:


  // 替換預設的IMsgIdGenerator
  service.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();
  service.AddEventBus()
      .AddMemoryQueue()
      .AddMemoryStorage();

後續計劃

  • 功能
  • 消息中間件支持
  • 存儲支持

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 嗨嘍,大家好呀~這裡是愛看美女的茜茜吶 又到了學Python時刻~ 今天實現一下人臉識別。 先問大家一個問題什麼是百度Aip模塊? 百度AI平臺提供了很多的API介面供開發者快速的調用運用在項目中本文寫的是使用百度AI的線上介面SDK模塊(baidu-aip)進行實現人臉識別 除了人臉識別,其 ...
  • 本文詳細闡述了knn演算法,從開始介紹什麼事knn,到講解knn演算法的原理再到最後以實際例子來運用knn演算法的步驟,實際例子的代碼講解也十分詳細 ...
  • 為了在jupyter中使用pyTorch的虛擬環境,來記錄一下怎麼操作一、conda命令的使用因為使用的是jupyter,所有就使用Anaconda Prompt來創建虛擬環境(也可使用virtualenv,不過沒試過) conda create -n 環境名 # 創建的環境在預設路徑下,C盤位置不 ...
  • 摘要:本文主要講述如何進行圖像量化處理和採樣處理及局部馬賽克特效。 本文分享自華為雲社區《[Python圖像處理] 二十.圖像量化處理和採樣處理及局部馬賽克特效》,作者: eastmount。 本文主要講述如何進行圖像量化處理和採樣處理及局部馬賽克特效。 一.圖像量化處理 圖像通常是自然界景物的客觀 ...
  • 1.推導式套路 除了最簡單的列表推導式和生成器表達式,其實還有字典推導式、集合推導式等等。 下麵是一個以列表推導式為例的推導式詳細格式,同樣適用於其他推導式。 variable = [out_exp_res for out_exp in input_list if out_exp == 2] out ...
  • 當面試官問你,“什麼是令牌桶限流演算法”! 你知道要怎麼回答,才能獲得面試官的青睞嗎? 大家好,我是Mic,一個工作了14年的Java程式員。 關於這個問題,面試官想考察哪些緯度?我們又該怎麼回答呢? 問題解析 限流策略,是在高併發流量下保護系統穩定性的一種策略。 所以這個問題,主要是互聯網公司會去考 ...
  • 一、argparse簡介 argparse 是 python 自帶的命令行參數解析包,可以用來方便的服務命令行參數,使用之前需要先導入包 import argparse 二、簡單案例 簡單使用,創建一個名為test.py的文件 # 導入 argparse 模塊 import argparse # 創 ...
  • 在實際業務中,當後臺數據發生變化,客戶端能夠實時的收到通知,而不是由用戶主動的進行頁面刷新才能查看,這將是一個非常人性化的設計 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...