【NServiceBus】什麼是Saga,Saga能做什麼

来源:https://www.cnblogs.com/sword-successful/archive/2019/11/25/11925790.html
-Advertisement-
Play Games

前言 Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經歷和英雄業績的長篇故事,對,這裡強調長篇故事。許多系統都存在長時間運行的業務流程,NServiceBus使用基於事件驅動的體繫結構將容錯性和可伸縮性融入這些業務處理過程中。 當然一個單一介面調用則算不上一個長時間運行的業務場景,那麼如果在給定 ...


前言

          Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經歷和英雄業績的長篇故事,對,這裡強調長篇故事。許多系統都存在長時間運行的業務流程,NServiceBus使用基於事件驅動的體繫結構將容錯性和可伸縮性融入這些業務處理過程中。
          當然一個單一介面調用則算不上一個長時間運行的業務場景,那麼如果在給定的用例中有兩個或多個調用,則應該考慮數據一致性的問題,這裡有可能第一個介面調用成功,第二次調用則可能失敗或者超時,Saga的設計以簡單而健壯的方式處理這樣的業務用例。

認識Saga

         先來通過一段代碼簡單認識一下Saga,在NServiceBus里,使用Saga的話則需要實現抽象類Saga,SqlSaga,這裡的T的是Saga業務實體,封裝數據,用來在長時間運行過程中封裝業務數據。

public class Saga:Saga<State>,
        IAmStartedByMessages<StartOrder>,
        IHandleMessages<CompleteOrder>
    {
        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
        {
            mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
            mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
        }

        public Task Handle(StartOrder message, IMessageHandlerContext context)
        {
            return Task.CompletedTask;
        }

        public Task Handle(CompleteOrder message, IMessageHandlerContext context)
        {
            MarkAsComplete();
            return Task.CompletedTask;
        }
    }

臨時狀態

     長時間運行則意味著有狀態,任何涉及多個網路調用的進程都需要一個臨時狀態,這個臨時狀態可以存儲在記憶體中,序列化在磁碟中,也可以存儲在分散式緩存中。在NServiceBus中我們定義實體,繼承抽象類ContainSagaData即可,預設情況下,所有公開訪問的屬性都會被持久化。

public class State:ContainSagaData
{
    public Guid OrderId { get; set; }
}

添加行為

      在NServiceBus里,處理消息的有兩種介面:IHandlerMessages、IAmStartedByMessages

開啟一個Saga

       在前面的代碼片段里,我們看到已經實現了介面IAmStartedByMessages,這個介面告訴NServiceBus,如果收到了StartOrder 消息,則創建一個Saga實例(Saga Instance),當然Saga長流程處理的實體至少有一個需要開啟Saga流程。

處理無序消息

       如果你的業務用例中確實存在無序消息的情況,則還需要業務流程正常輪轉,那麼則需要多個messaeg都要事先介面IAmStartedByMessages介面,也就是說多個message都可以創建Saga實例。

依賴可恢復性

      在處理無序消息和多個消息類型的時候,就存在消息丟失的可能,必須在你的Saga狀態完成以後,這個Saga實例又收到一條消息,但這時Saga狀態已經是完結狀態,這條消息則仍然需要處理,這裡則實現NServiceBus的IHandleSagaNotFound介面。

 public class SagaNotFoundHandler:IHandleSagaNotFound
 {
    public Task Handle(object message, IMessageProcessingContext context)
    {
        return context.Reply(new SagaNotFoundMessage());
    }
 }
  
 public class SagaNotFoundMessage
 {
        
 }

結束Saga

      當你的業務用例不再需要Saga實例時,則調用MarkComplete()來結束Saga實例。這個方法在前面的代碼片段中也可以看到,其實本質也就是設置Saga.Complete屬性,這是個bool值,你在業務用例中也可以用此值來判斷Saga流程是否結束。

namespace NServiceBus
{
    using System;
    using System.Threading.Tasks;
    using Extensibility;

    public abstract class Saga
    {
        /// <summary>
        /// The saga's typed data.
        /// </summary>
        public IContainSagaData Entity { get; set; }

        
        public bool Completed { get; private set; }

        internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);

       
        protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
        {
            return RequestTimeout(context, at, new TTimeoutMessageType());
        }

        
        protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
        {
            if (at.Kind == DateTimeKind.Unspecified)
            {
                throw new InvalidOperationException("Kind property of DateTime 'at' must be specified.");
            }

            VerifySagaCanHandleTimeout(timeoutMessage);

            var options = new SendOptions();

            options.DoNotDeliverBefore(at);
            options.RouteToThisEndpoint();

            SetTimeoutHeaders(options);

            return context.Send(timeoutMessage, options);
        }

        
        protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
        {
            return RequestTimeout(context, within, new TTimeoutMessageType());
        }

        
        protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
        {
            VerifySagaCanHandleTimeout(timeoutMessage);

            var sendOptions = new SendOptions();

            sendOptions.DelayDeliveryWith(within);
            sendOptions.RouteToThisEndpoint();

            SetTimeoutHeaders(sendOptions);

            return context.Send(timeoutMessage, sendOptions);
        }

        
        protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
        {
            if (string.IsNullOrEmpty(Entity.Originator))
            {
                throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
            }

            var options = new ReplyOptions();

            options.SetDestination(Entity.Originator);
            context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });

            
            options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
            {
                SagaTypeToUse = null,
                SagaIdToUse = null
            });

            return context.Reply(message, options);
        }

        //這個方法結束saga流程,標記Completed屬性
        protected void MarkAsComplete()
        {
            Completed = true;
        }

        void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
        {
            var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
            if (!canHandleTimeoutMessage)
            {
                var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
                throw new Exception(message);
            }
        }

        void SetTimeoutHeaders(ExtendableOptions options)
        {
            options.SetHeader(Headers.SagaId, Entity.Id.ToString());
            options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
            options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
        }
    }
}

    

Saga持久化

      本機開發環境我們使用LearningPersistence,但是投產的話則需要使用資料庫持久化,這裡我們基於MySQL,SQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence會生成幾種關係型資料庫的sql scripts,然後會根據你的斷言配置選擇所需資料庫,比如SQL Server、MySQL、PostgreSQL、Oracle。
     持久化Saga自動創建所需表結構,你只需手動配置即可,配置後編譯成功後項目執行目錄下會生成sql腳本,文件夾名稱是NServiceBus.Persistence.Sql,下麵會有Saga子目錄。


/* TableNameVariable */

set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'Saga');


/* Initialize */

drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
    'ERROR'
set
    message_text = message,
    mysql_errno = '45000';
end;

/* CreateTable */

set @createTable = concat('
    create table if not exists ', @tableNameQuoted, '(
        Id varchar(38) not null,
        Metadata json not null,
        Data json not null,
        PersistenceVersion varchar(23) not null,
        SagaTypeVersion varchar(23) not null,
        Concurrency int not null,
        primary key (Id)
    ) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;

/* AddProperty OrderId */

select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
      column_name = 'Correlation_OrderId' and
      table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* VerifyColumnType Guid */

set @column_type_OrderId = (
  select concat(column_type,' character set ', character_set_name)
  from information_schema.columns
  where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name = 'Correlation_OrderId'
);

set @query = IF(
    @column_type_OrderId <> 'varchar(38) character set ascii',
    'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
    'select \'Column Type OK\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* WriteCreateIndex OrderId */

select count(*)
into @exist
from information_schema.statistics
where
    table_schema = database() and
    index_name = 'Index_Correlation_OrderId' and
    table_name = @tableNameNonQuoted;

set @query = IF(
    @exist <= 0,
    concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');

prepare script from @query;
execute script;
deallocate prepare script;

/* PurgeObsoleteIndex */

select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    index_name like 'Index_Correlation_%' and
    index_name <> 'Index_Correlation_OrderId' and
    table_schema = database()
into @dropIndexQuery;
select if (
    @dropIndexQuery is not null,
    @dropIndexQuery,
    'select ''no index to delete'';')
    into @dropIndexQuery;

prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;

/* PurgeObsoleteProperties */

select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
    table_schema = database() and
    table_name = @tableNameNonQuoted and
    column_name like 'Correlation_%' and
    column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;

select if (
    @dropPropertiesQuery is not null,
    @dropPropertiesQuery,
    'select ''no property to delete'';')
    into @dropPropertiesQuery;

prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;

/* CompleteSagaScript */

生成的表結構:
QQ截圖20191125085237.png

持久化配置

      Saga持久化需要依賴NServiceBus.Persistence.Sql。引入後需要實現SqlSaga抽象類,抽象類需要重寫ConfigureMapping,配置Saga工作流程業務主鍵。

public class Saga:SqlSaga<State>,
        IAmStartedByMessages<StartOrder>
{
   protected override void ConfigureMapping(IMessagePropertyMapper mapper)
   {
      mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
   }

   protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);

   public Task Handle(StartOrder message, IMessageHandlerContext context)
   {
       Console.WriteLine($"Receive message with OrderId:{message.OrderId}");

       MarkAsComplete();
       return Task.CompletedTask;
    }
 }
    
 static async Task MainAsync()
 {
     Console.Title = "Client-UI";

     var configuration = new EndpointConfiguration("Client-UI");
     //這個方法開啟自動建表、自動創建RabbitMQ隊列
     configuration.EnableInstallers(); 
     configuration.UseSerialization<NewtonsoftSerializer>();
     configuration.UseTransport<LearningTransport>();

     string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306;AllowUserVariables=True;AutoEnlist=false";
     var persistence = configuration.UsePersistence<SqlPersistence>();
     persistence.SqlDialect<SqlDialect.MySql>();
     //配置mysql連接串
     persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));

     var instance = await Endpoint.Start(configuration).ConfigureAwait(false);

     var command = new StartOrder()
     {
         OrderId = Guid.NewGuid()
     };

     await instance.SendLocal(command).ConfigureAwait(false);

     Console.ReadKey();

     await instance.Stop().ConfigureAwait(false);
 }

     

Saga Timeouts

     在消息驅動類型的環境中,雖然傳遞的無連接特性可以防止線上等待過程中消耗資源,但是畢竟等待時間需要有一個上線。在NServiceBus里已經提供了Timeout方法,我們只需訂閱即可,可以在你的Handle方法中根據需要訂閱Timeout,可參考如下代碼:

public class Saga:Saga<State>,
        IAmStartedByMessages<StartOrder>,
        IHandleMessages<CompleteOrder>,
        IHandleTimeouts<TimeOutMessage>
    {
        
        public Task Handle(StartOrder message, IMessageHandlerContext context)
        {
            var model=new TimeOutMessage();
            
            //訂閱超時消息
            return RequestTimeout(context,TimeSpan.FromMinutes(10));
        }

        public Task Handle(CompleteOrder message, IMessageHandlerContext context)
        {
            MarkAsComplete();
            return Task.CompletedTask;
        }

        protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);


        public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
        {
            //處理超時消息
        }

        protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
        {
            mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
            mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
        }
    }
//從Timeout的源碼看,這個方法是通過設置SendOptions,然後再把當前這個消息發送給自己來實現 
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
 {
     VerifySagaCanHandleTimeout(timeoutMessage);
     var sendOptions = new SendOptions();
     sendOptions.DelayDeliveryWith(within);
     sendOptions.RouteToThisEndpoint();
     SetTimeoutHeaders(sendOptions);

     return context.Send(timeoutMessage, sendOptions);
 }

總結

       NServiceBus因為是商業產品,對分散式消息系統所涉及到的東西都做了實現,包括分散式事務(Outbox)、DTC都有,還有心跳檢測,監控都有,全而大,目前我們用到的也只是NServiceBus里很小的一部分功能。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 場景 指定一個路徑,根據這個父級路徑獲取此目錄下所有目錄的名稱、全路徑、創建日期等信息。 註: 博客主頁: https://blog.csdn.net/badao_liumang_qizhi 關註公眾號 霸道的程式猿 獲取編程相關電子書、教程推送與免費下載。 實現 System.IO.Directo ...
  • 場景 指定一個文件路徑,獲取當前路徑下所有文件,並篩選出以指定內容開頭和結尾的文件。 註: 博客主頁: https://blog.csdn.net/badao_liumang_qizhi 關註公眾號 霸道的程式猿 獲取編程相關電子書、教程推送與免費下載。 實現 首先指定首碼和尾碼名變數。 strin ...
  • using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Net.Sockets; using Sy... ...
  • 本文介紹了C#中時間和時間戳的轉換方法,以及DateTimeOffset的簡單使用。 ...
  • PDF是當今最流行的文檔格式之一,各種應用程式將其用作最終輸出。由於支持多種數據類型和可移植性,因此它是創建和共用內容的首選格式。作為對開發文檔管理應用程式感興趣的.NET應用程式開發人員,可能希望嵌入處理功能,以讀取PDF文檔並將其轉換為其他文件格式,例如HTML。 Aspose.PDF for ...
  • Aspose.Cells for .NET是Excel電子錶格編程API,可加快電子錶格管理和處理任務,同時支持構建具有生成,修改,轉換,呈現和列印電子錶格功能的跨平臺應用程式。 將Excel電子錶格轉換為圖像格式始終是熱門話題。有時,您聲稱此過程花費的時間太長。其他人則抱怨該過程卡在了較大的文件上 ...
  • object m = Type.Missing; const int MENU_ITEM_TYPE = 1; const int NEW_MENU = 18; CommandBarControl oNewMenu = ExcelGlobals.Application.CommandBars["Wor ...
  • 1. 前言 之前用PointLight做了一個番茄鐘,效果還不錯,具體可見這篇文章: "[UWP]使用PointLight並實現動畫效果" 後來試玩了Win2D,這次就用Win2D實現文字的鏤空效果,配合PointLight做一個內斂不張揚的番茄鐘。 實現鏤空文字的核心思想是使用CanvasGeom ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...