前言 Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經歷和英雄業績的長篇故事,對,這裡強調長篇故事。許多系統都存在長時間運行的業務流程,NServiceBus使用基於事件驅動的體繫結構將容錯性和可伸縮性融入這些業務處理過程中。 當然一個單一介面調用則算不上一個長時間運行的業務場景,那麼如果在給定 ...
前言
Saga單詞翻譯過來是指尤指古代挪威或冰島講述冒險經歷和英雄業績的長篇故事,對,這裡強調長篇故事。許多系統都存在長時間運行的業務流程,NServiceBus使用基於事件驅動的體繫結構將容錯性和可伸縮性融入這些業務處理過程中。
當然一個單一介面調用則算不上一個長時間運行的業務場景,那麼如果在給定的用例中有兩個或多個調用,則應該考慮數據一致性的問題,這裡有可能第一個介面調用成功,第二次調用則可能失敗或者超時,Saga的設計以簡單而健壯的方式處理這樣的業務用例。
認識Saga
先來通過一段代碼簡單認識一下Saga,在NServiceBus里,使用Saga的話則需要實現抽象類Saga
public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
return Task.CompletedTask;
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}
}
臨時狀態
長時間運行則意味著有狀態,任何涉及多個網路調用的進程都需要一個臨時狀態,這個臨時狀態可以存儲在記憶體中,序列化在磁碟中,也可以存儲在分散式緩存中。在NServiceBus中我們定義實體,繼承抽象類ContainSagaData即可,預設情況下,所有公開訪問的屬性都會被持久化。
public class State:ContainSagaData
{
public Guid OrderId { get; set; }
}
添加行為
在NServiceBus里,處理消息的有兩種介面:IHandlerMessages
開啟一個Saga
在前面的代碼片段里,我們看到已經實現了介面IAmStartedByMessages
處理無序消息
如果你的業務用例中確實存在無序消息的情況,則還需要業務流程正常輪轉,那麼則需要多個messaeg都要事先介面IAmStartedByMessages介面,也就是說多個message都可以創建Saga實例。
依賴可恢復性
在處理無序消息和多個消息類型的時候,就存在消息丟失的可能,必須在你的Saga狀態完成以後,這個Saga實例又收到一條消息,但這時Saga狀態已經是完結狀態,這條消息則仍然需要處理,這裡則實現NServiceBus的IHandleSagaNotFound介面。
public class SagaNotFoundHandler:IHandleSagaNotFound
{
public Task Handle(object message, IMessageProcessingContext context)
{
return context.Reply(new SagaNotFoundMessage());
}
}
public class SagaNotFoundMessage
{
}
結束Saga
當你的業務用例不再需要Saga實例時,則調用MarkComplete()來結束Saga實例。這個方法在前面的代碼片段中也可以看到,其實本質也就是設置Saga.Complete屬性,這是個bool值,你在業務用例中也可以用此值來判斷Saga流程是否結束。
namespace NServiceBus
{
using System;
using System.Threading.Tasks;
using Extensibility;
public abstract class Saga
{
/// <summary>
/// The saga's typed data.
/// </summary>
public IContainSagaData Entity { get; set; }
public bool Completed { get; private set; }
internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration);
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new()
{
return RequestTimeout(context, at, new TTimeoutMessageType());
}
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage)
{
if (at.Kind == DateTimeKind.Unspecified)
{
throw new InvalidOperationException("Kind property of DateTime 'at' must be specified.");
}
VerifySagaCanHandleTimeout(timeoutMessage);
var options = new SendOptions();
options.DoNotDeliverBefore(at);
options.RouteToThisEndpoint();
SetTimeoutHeaders(options);
return context.Send(timeoutMessage, options);
}
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new()
{
return RequestTimeout(context, within, new TTimeoutMessageType());
}
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);
var sendOptions = new SendOptions();
sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();
SetTimeoutHeaders(sendOptions);
return context.Send(timeoutMessage, sendOptions);
}
protected Task ReplyToOriginator(IMessageHandlerContext context, object message)
{
if (string.IsNullOrEmpty(Entity.Originator))
{
throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint.");
}
var options = new ReplyOptions();
options.SetDestination(Entity.Originator);
context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId });
options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State
{
SagaTypeToUse = null,
SagaIdToUse = null
});
return context.Reply(message, options);
}
//這個方法結束saga流程,標記Completed屬性
protected void MarkAsComplete()
{
Completed = true;
}
void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage)
{
var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>;
if (!canHandleTimeoutMessage)
{
var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'";
throw new Exception(message);
}
}
void SetTimeoutHeaders(ExtendableOptions options)
{
options.SetHeader(Headers.SagaId, Entity.Id.ToString());
options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString);
options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName);
}
}
}
Saga持久化
本機開發環境我們使用LearningPersistence,但是投產的話則需要使用資料庫持久化,這裡我們基於MySQL,SQL持久化需要引入NServiceBus.Persistence.Sql。SQL Persistence會生成幾種關係型資料庫的sql scripts,然後會根據你的斷言配置選擇所需資料庫,比如SQL Server、MySQL、PostgreSQL、Oracle。
持久化Saga自動創建所需表結構,你只需手動配置即可,配置後編譯成功後項目執行目錄下會生成sql腳本,文件夾名稱是NServiceBus.Persistence.Sql,下麵會有Saga子目錄。
/* TableNameVariable */
set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`');
set @tableNameNonQuoted = concat(@tablePrefix, 'Saga');
/* Initialize */
drop procedure if exists sqlpersistence_raiseerror;
create procedure sqlpersistence_raiseerror(message varchar(256))
begin
signal sqlstate
'ERROR'
set
message_text = message,
mysql_errno = '45000';
end;
/* CreateTable */
set @createTable = concat('
create table if not exists ', @tableNameQuoted, '(
Id varchar(38) not null,
Metadata json not null,
Data json not null,
PersistenceVersion varchar(23) not null,
SagaTypeVersion varchar(23) not null,
Concurrency int not null,
primary key (Id)
) default charset=ascii;
');
prepare script from @createTable;
execute script;
deallocate prepare script;
/* AddProperty OrderId */
select count(*)
into @exist
from information_schema.columns
where table_schema = database() and
column_name = 'Correlation_OrderId' and
table_name = @tableNameNonQuoted;
set @query = IF(
@exist <= 0,
concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* VerifyColumnType Guid */
set @column_type_OrderId = (
select concat(column_type,' character set ', character_set_name)
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name = 'Correlation_OrderId'
);
set @query = IF(
@column_type_OrderId <> 'varchar(38) character set ascii',
'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));',
'select \'Column Type OK\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* WriteCreateIndex OrderId */
select count(*)
into @exist
from information_schema.statistics
where
table_schema = database() and
index_name = 'Index_Correlation_OrderId' and
table_name = @tableNameNonQuoted;
set @query = IF(
@exist <= 0,
concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status');
prepare script from @query;
execute script;
deallocate prepare script;
/* PurgeObsoleteIndex */
select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';')
from information_schema.statistics
where
table_schema = database() and
table_name = @tableNameNonQuoted and
index_name like 'Index_Correlation_%' and
index_name <> 'Index_Correlation_OrderId' and
table_schema = database()
into @dropIndexQuery;
select if (
@dropIndexQuery is not null,
@dropIndexQuery,
'select ''no index to delete'';')
into @dropIndexQuery;
prepare script from @dropIndexQuery;
execute script;
deallocate prepare script;
/* PurgeObsoleteProperties */
select concat('alter table ', table_name, ' drop column ', column_name, ';')
from information_schema.columns
where
table_schema = database() and
table_name = @tableNameNonQuoted and
column_name like 'Correlation_%' and
column_name <> 'Correlation_OrderId'
into @dropPropertiesQuery;
select if (
@dropPropertiesQuery is not null,
@dropPropertiesQuery,
'select ''no property to delete'';')
into @dropPropertiesQuery;
prepare script from @dropPropertiesQuery;
execute script;
deallocate prepare script;
/* CompleteSagaScript */
生成的表結構:
持久化配置
Saga持久化需要依賴NServiceBus.Persistence.Sql。引入後需要實現SqlSaga抽象類,抽象類需要重寫ConfigureMapping,配置Saga工作流程業務主鍵。
public class Saga:SqlSaga<State>,
IAmStartedByMessages<StartOrder>
{
protected override void ConfigureMapping(IMessagePropertyMapper mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId);
}
protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
Console.WriteLine($"Receive message with OrderId:{message.OrderId}");
MarkAsComplete();
return Task.CompletedTask;
}
}
static async Task MainAsync()
{
Console.Title = "Client-UI";
var configuration = new EndpointConfiguration("Client-UI");
//這個方法開啟自動建表、自動創建RabbitMQ隊列
configuration.EnableInstallers();
configuration.UseSerialization<NewtonsoftSerializer>();
configuration.UseTransport<LearningTransport>();
string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306;AllowUserVariables=True;AutoEnlist=false";
var persistence = configuration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MySql>();
//配置mysql連接串
persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString));
var instance = await Endpoint.Start(configuration).ConfigureAwait(false);
var command = new StartOrder()
{
OrderId = Guid.NewGuid()
};
await instance.SendLocal(command).ConfigureAwait(false);
Console.ReadKey();
await instance.Stop().ConfigureAwait(false);
}
Saga Timeouts
在消息驅動類型的環境中,雖然傳遞的無連接特性可以防止線上等待過程中消耗資源,但是畢竟等待時間需要有一個上線。在NServiceBus里已經提供了Timeout方法,我們只需訂閱即可,可以在你的Handle方法中根據需要訂閱Timeout,可參考如下代碼:
public class Saga:Saga<State>,
IAmStartedByMessages<StartOrder>,
IHandleMessages<CompleteOrder>,
IHandleTimeouts<TimeOutMessage>
{
public Task Handle(StartOrder message, IMessageHandlerContext context)
{
var model=new TimeOutMessage();
//訂閱超時消息
return RequestTimeout(context,TimeSpan.FromMinutes(10));
}
public Task Handle(CompleteOrder message, IMessageHandlerContext context)
{
MarkAsComplete();
return Task.CompletedTask;
}
protected override string CorrelationPropertyName => nameof(StartOrder.OrderId);
public Task Timeout(TimeOutMessage state, IMessageHandlerContext context)
{
//處理超時消息
}
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper)
{
mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId);
}
}
//從Timeout的源碼看,這個方法是通過設置SendOptions,然後再把當前這個消息發送給自己來實現
protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage)
{
VerifySagaCanHandleTimeout(timeoutMessage);
var sendOptions = new SendOptions();
sendOptions.DelayDeliveryWith(within);
sendOptions.RouteToThisEndpoint();
SetTimeoutHeaders(sendOptions);
return context.Send(timeoutMessage, sendOptions);
}
總結
NServiceBus因為是商業產品,對分散式消息系統所涉及到的東西都做了實現,包括分散式事務(Outbox)、DTC都有,還有心跳檢測,監控都有,全而大,目前我們用到的也只是NServiceBus里很小的一部分功能。