【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性

来源:https://www.cnblogs.com/gucaocao/archive/2022/09/13/16688691.html
-Advertisement-
Play Games

【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性 簡介 github https://github.com/dotnet-shashlik/shashlik.eventbus 各位爺高興了給個star唄。 分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行 ...


【Shashlik.EventBus】.NET 事件匯流排,分散式事務最終一致性

簡介

github https://github.com/dotnet-shashlik/shashlik.eventbus

各位爺高興了給個star唄。

分散式事務、CAP定理、事件匯流排,在當前微服務、分散式、集群大行其道的架構前提下,是不可逃避的幾個關鍵字,在此不會過多闡述相關的理論知識。Shashlik.EventBus就是一個基於.NET6的開源事件匯流排解決方案,同時也是分散式事務最終一致性、延遲事件解決方案。Shashlik.EventBus採用的是非同步確保的思路(本地消息表),將消息數據與業務數據在同一事務中進行提交或回滾,以此來保證消息數據的可靠性。其設計目標是高性能、簡單、易用、易擴展,為拋棄歷史包袱,僅支持NET6,採用最寬鬆的 MIT 開源協議。

原理如下圖:

image

如圖所示,消息數據需要和業務數據在同一的事務中進行提交或者回滾,最後Shashlik.EventBus會檢查消息數據是否已提交,如果已提交才會執行真正的消息發送。所以要求事務的隔離級別最低為讀已提交(RC)

關於消息冪等

Shashlik.EventBus不能保證業務消息的冪等性,為了保證消息的可靠傳輸,EventBus以及消息中間件對消息QOS處理等級必須為at least once (至少到達一次),一般消息中間件都需要開啟消息持久化避免消息丟失。簡而言之就是一個事件處理類可能處理多次同一個事件,事件消息的冪等性應該由業務方進行處理。比如用戶訂單付款完成為一個事件,付款完成後需要修改訂單狀態為待發貨,也就是在付款完成事件處理類中可能收到多次這個訂單的付款完成事件,那麼業務的冪等性處理就可以使用鎖,判斷訂單狀態,如果訂單狀態已經為待發貨,則直接返回並忽略本次事件響應。

延遲事件

Shashlik.EventBus支持基於本地的延遲事件機制,考慮到不是所有的消息中間件都支持延遲功能,且為了最大程度保證消息的可靠性,最後採用了System.Timers.Timer來執行延遲功能。

延遲事件同樣適用於分散式事務最終一致性,但如果延遲事件處理類處理異常由重試器介入處理後,那麼最終的延遲執行時間和期望的延遲時間就會產生較大的差異,是否忽略這裡的時間差需要由具體的業務來決定。比如訂單30分鐘未付款需要關閉訂單,30分鐘後關閉訂單出現了異常,最後由重試器到了40分鐘後才關閉,也不影響訂單,那麼認為這個時間差可以容忍。又比如雙11啦,發佈一個延遲事件,晚上12點叫醒我起來買買買,只有1分鐘時間,過了就買不到了,那麼這種情況可以在事件處理類中,自行根據當前時間、事件發送時間、延遲執行時間等要素,自行決定業務如何處理。

延遲事件和普通事件在事件定義和事件處理類聲明和處理時沒有任何區別,僅僅是在發佈事件時需要指定延遲時間。

上代碼

需求:一個新用戶註冊以後有以下需求:1. 發送歡迎註冊簡訊;2. 發放新用戶優惠券;3. 30分鐘後推送新用戶優惠活動信息。

  1. 服務配置,這裡以MySql + RabbitMQ為例:
  services.AddEventBus(r =>
      {
          // 這些都是預設配置,可以直接services.AddEventBus()
          // 運行環境,註冊到MQ的事件名稱和事件處理名稱會帶上此尾碼
          r.Environment = "Production";
          // 最大失敗重試次數,預設60次
          r.RetryFailedMax = 60;
          // 消息重試間隔,預設2分鐘
          r.RetryInterval = 60 * 2;
          // 單次重試消息數量限制,預設100
          r.RetryLimitCount = 100;
          // 成功的消息過期時間,預設3天,失敗的消息永不過期,必須處理
          r.SucceedExpireHour = 24 * 3;            
          // 消息處理失敗後,重試器介入時間,預設5分鐘後
          r.StartRetryAfter = 60 * 5;            
          // 事務提交超時時間,單位秒,預設60秒
          r.TransactionCommitTimeout = 60;
          // 重試器執行時消息鎖定時長
          r.LockTime = 110;
      })
      // 使用ef DbContext mysql
      .AddMySql<DemoDbContext>()
      // 配置RabbitMQ
      .AddRabbitMQ(r =>
      {
          r.Host = "localhost";
          r.UserName = "rabbit";
          r.Password = "123123";
      });
  1. 定義事件

  // 新用戶註冊完成事件,實現介面IEvent
  public class NewUserEvent : IEvent
  {
      public string Id { get;set; }
      public string Name { get; set; }
  }
  
  // 定義新用戶註冊延遲活動推送事件
  public class NewUserPromotionEvent : IEvent
  {
      public string Id { get;set; }
      public string Name { get; set; }
      public string PromotionId { get; set; }
  }

  1. 發佈事件

  public class UserManager
  {
      public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
      {
          EventPublisher = eventPublisher;
          DbContext = dbContext;
      }
  
      private IEventPublisher EventPublisher { get; }
      private DemoDbContext DbContext { get; }
  
      public async Task CreateUserAsync(UserInput input)
      {
          // 開啟本地事務
          using var tran = await DbContext.DataBase.BeginTransactionAsync();
          try
          {
              // 創建用戶邏輯處理...
  
              // 發佈新用戶事件
              // 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
              await EventPublisher.PublishAsync(new NewUserEvent{
                  Id = user.Id,
                  Name = input.Name
              }, DbContext.GetTransactionContext());
  
              // 發佈延遲事件
              // 通過ef擴展,直接使用DbContext發佈事件,自動使用當前上下文事務
              await DbContext.PublishEventAsync(new NewUserPromotionEvent{
                  Id = user.Id,
                  Name = input.Name,
                  PromotionId = "1"
              }, DatetimeOffset.Now.AddMinutes(30));
  
              // 提交本地事務
              await tran.CommitAsync();
          }catch(Exception ex)
          {
              // 回滾事務,消息數據也將回滾不會發佈
              await tran.RollbackAsync();
          }
      }
  }

  1. 定義事件處理類
    
  // 一個事件可以有多個處理類,可以分佈在不同的微服務中
  // 用於發送簡訊的事件處理類
  public class NewUserEventForSmsHandler : IEventHandler<NewUserEvent>
  {
      public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
      {
          // 發送簡訊...
      }
  }


  // 用於發放消費券的事件處理類
  public class NewUserEventForCouponsHandler : IEventHandler<NewUserEvent>
  {
      public async Task Execute(NewUserEvent @event, IDictionary<string, string> items)
      {
          // 業務處理...
      }
  }

  // 用於新用戶延遲活動的事件處理類,將在指定時間執行
  public class NewUserPromotionEventHandler : IEventHandler<NewUserPromotionEvent>
  {
      public async Task Execute(NewUserPromotionEvent @event, IDictionary<string, string> items)
      {
          // 業務處理...
      }
  }    

預設的,發佈、聲明到消息中間件的事件、事件處理器名稱生產規則為{Type.Name}.{Options.Environment},在分散式架構下需要,您需要瞭解這個預設規則,這點不同於CAP框架必須顯示聲明,當然Shashlik.EventBus也可以使用EventBusNameAttribute特性來顯示聲明,詳細說明請上github查看wiki文檔

XA事務支持(TransactionScope)

雖然儘可能的不要使用TransactionScope,但在某些場景仍然是需要的,Shashlik.EventBus對其提供了事務支持,可以通過XaTransactionContext.Current獲取當前環境的事務上下文,發佈事件如下:


  public class UserManager
  {
      public UserManager(IEventPublisher eventPublisher, DemoDbContext dbContext)
      {
          EventPublisher = eventPublisher;
          DbContext = dbContext;
      }
  
      private IEventPublisher EventPublisher { get; }
      private DemoDbContext DbContext { get; }
  
      public async Task CreateUserAsync(UserInput input)
      {
          // 開啟事務
          using var scope = new TransactionScope();
          try
          {
              // 創建用戶邏輯處理...
  
              // 發佈新用戶事件
              // 通過註入IEventPublisher發佈事件,需要傳入事務上下文數據
              await EventPublisher.PublishAsync(new NewUserEvent{
                  Id = user.Id,
                  Name = input.Name
              // 使用 XaTransactionContext.Current
              }, XaTransactionContext.Current);
  
              // 提交事務
              await scope.Complete();
          }catch(Exception ex)
          {
              // 回滾事務,消息數據也將回滾不會發佈
              await tran.RollbackAsync();
          }
      }
  }

擴展

如果預設實現不能滿足你的需求,可以自行實現可擴展介面,並註冊即可。

  • IMsgIdGenerator:消息Id生成器,是指傳輸的全局唯一id,不是指存儲的id。預設guid
  • IEventPublisher:事件發佈處理器。
  • IMessageSerializer:消息序列化、反序列化處理類。預設Newtonsoft.Json
  • IReceivedMessageRetryProvider:已接收消息重試器。
  • IPublishedMessageRetryProvider:已發佈消息重試器。
  • IEventHandlerInvoker: 事件處理執行器
  • IEventNameRuler:事件名稱規則生成(對應消息隊列topic/route)。
  • IEventHandlerNameRuler:事件處理名稱規則生成(對應消息隊列queue/group)。
  • IEventHandlerFindProvider:事件處理類查找器
  • IExpiredMessageProvider:已過期消息刪除處理器。
  • IMessageListener:消息監聽處理器。
  • IRetryProvider:重試執行器。
  • IPublishHandler:消息發佈處理器。
  • IReceivedHandler:消息接收處理器。
  • IMessageStorageInitializer:存儲介質初始化。
  • IMessageStorage:消息存儲、讀取等操作。

例:


  // 替換預設的IMsgIdGenerator
  service.AddSingleton<IMsgIdGenerator, CustomMsgIdGenerator>();
  service.AddEventBus()
      .AddMemoryQueue()
      .AddMemoryStorage();

後續計劃

  • 功能
  • 消息中間件支持
  • 存儲支持

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言 嗨嘍,大家好呀~這裡是愛看美女的茜茜吶 又到了學Python時刻~ 今天實現一下人臉識別。 先問大家一個問題什麼是百度Aip模塊? 百度AI平臺提供了很多的API介面供開發者快速的調用運用在項目中本文寫的是使用百度AI的線上介面SDK模塊(baidu-aip)進行實現人臉識別 除了人臉識別,其 ...
  • 本文詳細闡述了knn演算法,從開始介紹什麼事knn,到講解knn演算法的原理再到最後以實際例子來運用knn演算法的步驟,實際例子的代碼講解也十分詳細 ...
  • 為了在jupyter中使用pyTorch的虛擬環境,來記錄一下怎麼操作一、conda命令的使用因為使用的是jupyter,所有就使用Anaconda Prompt來創建虛擬環境(也可使用virtualenv,不過沒試過) conda create -n 環境名 # 創建的環境在預設路徑下,C盤位置不 ...
  • 摘要:本文主要講述如何進行圖像量化處理和採樣處理及局部馬賽克特效。 本文分享自華為雲社區《[Python圖像處理] 二十.圖像量化處理和採樣處理及局部馬賽克特效》,作者: eastmount。 本文主要講述如何進行圖像量化處理和採樣處理及局部馬賽克特效。 一.圖像量化處理 圖像通常是自然界景物的客觀 ...
  • 1.推導式套路 除了最簡單的列表推導式和生成器表達式,其實還有字典推導式、集合推導式等等。 下麵是一個以列表推導式為例的推導式詳細格式,同樣適用於其他推導式。 variable = [out_exp_res for out_exp in input_list if out_exp == 2] out ...
  • 當面試官問你,“什麼是令牌桶限流演算法”! 你知道要怎麼回答,才能獲得面試官的青睞嗎? 大家好,我是Mic,一個工作了14年的Java程式員。 關於這個問題,面試官想考察哪些緯度?我們又該怎麼回答呢? 問題解析 限流策略,是在高併發流量下保護系統穩定性的一種策略。 所以這個問題,主要是互聯網公司會去考 ...
  • 一、argparse簡介 argparse 是 python 自帶的命令行參數解析包,可以用來方便的服務命令行參數,使用之前需要先導入包 import argparse 二、簡單案例 簡單使用,創建一個名為test.py的文件 # 導入 argparse 模塊 import argparse # 創 ...
  • 在實際業務中,當後臺數據發生變化,客戶端能夠實時的收到通知,而不是由用戶主動的進行頁面刷新才能查看,這將是一個非常人性化的設計 ...
一周排行
    -Advertisement-
    Play Games
  • Timer是什麼 Timer 是一種用於創建定期粒度行為的機制。 與標準的 .NET System.Threading.Timer 類相似,Orleans 的 Timer 允許在一段時間後執行特定的操作,或者在特定的時間間隔內重覆執行操作。 它在分散式系統中具有重要作用,特別是在處理需要周期性執行的 ...
  • 前言 相信很多做WPF開發的小伙伴都遇到過表格類的需求,雖然現有的Grid控制項也能實現,但是使用起來的體驗感並不好,比如要實現一個Excel中的表格效果,估計你能想到的第一個方法就是套Border控制項,用這種方法你需要控制每個Border的邊框,並且在一堆Bordr中找到Grid.Row,Grid. ...
  • .NET C#程式啟動閃退,目錄導致的問題 這是第2次踩這個坑了,很小的編程細節,容易忽略,所以寫個博客,分享給大家。 1.第一次坑:是windows 系統把程式運行成服務,找不到配置文件,原因是以服務運行它的工作目錄是在C:\Windows\System32 2.本次坑:WPF桌面程式通過註冊表設 ...
  • 在分散式系統中,數據的持久化是至關重要的一環。 Orleans 7 引入了強大的持久化功能,使得在分散式環境下管理數據變得更加輕鬆和可靠。 本文將介紹什麼是 Orleans 7 的持久化,如何設置它以及相應的代碼示例。 什麼是 Orleans 7 的持久化? Orleans 7 的持久化是指將 Or ...
  • 前言 .NET Feature Management 是一個用於管理應用程式功能的庫,它可以幫助開發人員在應用程式中輕鬆地添加、移除和管理功能。使用 Feature Management,開發人員可以根據不同用戶、環境或其他條件來動態地控制應用程式中的功能。這使得開發人員可以更靈活地管理應用程式的功 ...
  • 在 WPF 應用程式中,拖放操作是實現用戶交互的重要組成部分。通過拖放操作,用戶可以輕鬆地將數據從一個位置移動到另一個位置,或者將控制項從一個容器移動到另一個容器。然而,WPF 中預設的拖放操作可能並不是那麼好用。為瞭解決這個問題,我們可以自定義一個 Panel 來實現更簡單的拖拽操作。 自定義 Pa ...
  • 在實際使用中,由於涉及到不同編程語言之間互相調用,導致C++ 中的OpenCV與C#中的OpenCvSharp 圖像數據在不同編程語言之間難以有效傳遞。在本文中我們將結合OpenCvSharp源碼實現原理,探究兩種數據之間的通信方式。 ...
  • 一、前言 這是一篇搭建許可權管理系統的系列文章。 隨著網路的發展,信息安全對應任何企業來說都越發的重要,而本系列文章將和大家一起一步一步搭建一個全新的許可權管理系統。 說明:由於搭建一個全新的項目過於繁瑣,所有作者將挑選核心代碼和核心思路進行分享。 二、技術選擇 三、開始設計 1、自主搭建vue前端和. ...
  • Csharper中的表達式樹 這節課來瞭解一下表示式樹是什麼? 在C#中,表達式樹是一種數據結構,它可以表示一些代碼塊,如Lambda表達式或查詢表達式。表達式樹使你能夠查看和操作數據,就像你可以查看和操作代碼一樣。它們通常用於創建動態查詢和解析表達式。 一、認識表達式樹 為什麼要這樣說?它和委托有 ...
  • 在使用Django等框架來操作MySQL時,實際上底層還是通過Python來操作的,首先需要安裝一個驅動程式,在Python3中,驅動程式有多種選擇,比如有pymysql以及mysqlclient等。使用pip命令安裝mysqlclient失敗應如何解決? 安裝的python版本說明 機器同時安裝了 ...