0.簡介 在 Abp 框架內部實現了工作單元,在這裡講解一下,什麼是工作單元? Unit Of Work(工作單元)模式用來維護一個由已經被業務事物修改(增加、刪除或更新)的業務對象組成的列表。Unit Of Work模式負責協調這些修改的持久化工作以及所有標記的併發問題。在數據訪問層中採用Unit ...
0.簡介
在 Abp 框架內部實現了工作單元,在這裡講解一下,什麼是工作單元?
Unit Of Work(工作單元)模式用來維護一個由已經被業務事物修改(增加、刪除或更新)的業務對象組成的列表。Unit Of Work模式負責協調這些修改的持久化工作以及所有標記的併發問題。在數據訪問層中採用Unit Of Work模式帶來的好處是能夠確保數據完整性。如果在持久化一系列業務對象(他們屬於同一個事物)的過程中出現問題,那麼應該將所有的修改回滾,以確保數據始終處於有效狀態。
而在 Abp 的內部則是結合 Castle 的 Dynamic Proxy 攔截 UnitOfwork Attribute 來進行動態代理註入,實現了當執行標註了 [UnitOfwork]
方法時能夠通過 UnitOfworkManager
來進行事務控制。
其大概流程如下:
1 啟動流程
首先我們來看一下 Abp 內部是什麼時候註入 UOW 相關的代碼的,翻閱源碼,在 AbpBootstrapper
內部我們就可以看到 Abp 作者為 UOW 寫了一個攔截器,並且在 Abp 框架初始化的時候就通過 AddInterceptorRegistrars()
方法來監聽 IocManager
的組件註冊事件,當觸發事件的時候就來判斷是否滿足條件,如果滿足則將攔截器與該類型進行一個綁定。
public class AbpBootstrapper : IDisposable
{
private AbpBootstrapper([NotNull] Type startupModule, [CanBeNull] Action<AbpBootstrapperOptions> optionsAction = null)
{
// 其他代碼
if (!options.DisableAllInterceptors)
{
// 添加攔截器
AddInterceptorRegistrars();
}
}
private void AddInterceptorRegistrars()
{
ValidationInterceptorRegistrar.Initialize(IocManager);
AuditingInterceptorRegistrar.Initialize(IocManager);
EntityHistoryInterceptorRegistrar.Initialize(IocManager);
UnitOfWorkRegistrar.Initialize(IocManager);
AuthorizationInterceptorRegistrar.Initialize(IocManager);
}
}
internal static class UnitOfWorkRegistrar
{
public static void Initialize(IIocManager iocManager)
{
// 監聽組件註冊事件
iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
{
var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
// 按 UOW 特性註冊
HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
// 按規約註冊
HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
};
}
private static void HandleTypesWithUnitOfWorkAttribute(TypeInfo implementationType, IHandler handler)
{
if (IsUnitOfWorkType(implementationType) || AnyMethodHasUnitOfWork(implementationType))
{
handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
}
}
private static void HandleConventionalUnitOfWorkTypes(IIocManager iocManager, TypeInfo implementationType, IHandler handler)
{
if (!iocManager.IsRegistered<IUnitOfWorkDefaultOptions>())
{
return;
}
var uowOptions = iocManager.Resolve<IUnitOfWorkDefaultOptions>();
if (uowOptions.IsConventionalUowClass(implementationType.AsType()))
{
handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
}
}
private static bool IsUnitOfWorkType(TypeInfo implementationType)
{
return UnitOfWorkHelper.HasUnitOfWorkAttribute(implementationType);
}
private static bool AnyMethodHasUnitOfWork(TypeInfo implementationType)
{
return implementationType
.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
.Any(UnitOfWorkHelper.HasUnitOfWorkAttribute);
}
}
可以看到在這個 Registrar 裡面他擁有兩種註冊方式,第一種很簡單,就是判斷註冊的組件類型是否擁有 UOW 標簽,第二種則是通過規約來註入攔截器。
Abp 預設針對倉儲與應用服務會自動將攔截器掛載到這兩個類型以及他的所有子類的。這裡的 UnitOfWorkDefaultOptionsExtensions.IsConventionalUowClass()
方法就是用來判斷傳入的 Type 是否屬於規約的 Type。
public static bool IsConventionalUowClass(this IUnitOfWorkDefaultOptions unitOfWorkDefaultOptions, Type type)
{
return unitOfWorkDefaultOptions.ConventionalUowSelectors.Any(selector => selector(type));
}
又牽扯到了一個 IUnitOfWorkDefaultOptions
,看一下他的預設實現 UnitOfWorkDefaultOptions
就會發現這樣的代碼:
public UnitOfWorkDefaultOptions()
{
_filters = new List<DataFilterConfiguration>();
IsTransactional = true;
Scope = TransactionScopeOption.Required;
IsTransactionScopeAvailable = true;
// 預設類型
ConventionalUowSelectors = new List<Func<Type, bool>>
{
type => typeof(IRepository).IsAssignableFrom(type) ||
typeof(IApplicationService).IsAssignableFrom(type)
};
}
2. 實現原理
2.1 工作單元攔截器
在上一步我們通過兩種註入方式將攔截器註入到需要應用工作單元特性的類型裡面,那麼我們程式在執行的時候就會使用 Dyncmic Proxy 來攔截包裹這些方法。
下麵我們就來看一下剛剛註入的攔截器:
internal class UnitOfWorkInterceptor : IInterceptor
{
// ...
// 其他代碼
public void Intercept(IInvocation invocation)
{
MethodInfo method;
try
{
method = invocation.MethodInvocationTarget;
}
catch
{
method = invocation.GetConcreteMethod();
}
// 判斷當前進入的方法是否帶有 UnitOfWork 特性
var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
{
// 沒有則直接執行該方法
invocation.Proceed();
return;
}
PerformUow(invocation, unitOfWorkAttr.CreateOptions());
}
// ...
// 其他代碼
}
攔截器內部方法很簡單,如果是 UOW 方法則執行 PerformUow()
即可,在該方法內部則對方法類型進行了不同的判斷,同步與非同步的處理方法是不一樣的。
// ...
// 其他代碼
private void PerformUow(IInvocation invocation, UnitOfWorkOptions options)
{
// 判斷方法是同步還是非同步方法,不同則執行不同的處理操作
if (invocation.Method.IsAsync())
{
PerformAsyncUow(invocation, options);
}
else
{
PerformSyncUow(invocation, options);
}
}
// ...
// 其他代碼
那麼我們就先來看一下同步方法:
// ...
// 其他代碼
private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
using (var uow = _unitOfWorkManager.Begin(options))
{
// 繼續執行
invocation.Proceed();
uow.Complete();
}
}
// ...
// 其他代碼
同步方法針對 UOW 的操作十分簡單,直接使用 UnitOfWorkManager.Begin()
方法開啟一個事務,然後在內部執行原有方法的代碼,執行完成之後調用 Complete()
完成此次調用。
假如我擁有兩個應用服務類,他們都擁有 UnitOfWork
特性,然後我再一個 A 方法調用他們兩個 B 類的 Run()
方法,而 B類的內部也調用了C 的 Run()
方法,大體如下:
public class A
{
private readonly B B;
public A(B b)
{
B = b;
}
public void TestMethod()
{
B.Run();
}
}
internal class B
{
private readonly C C;
public B(C c)
{
C = c;
}
[UnitOfWork]
public void Run()
{
// 資料庫操作
C.Run();
Console.WriteLine("B 的 Run 方法被調用.");
}
}
internal class C
{
[UnitOfWork]
public void Run()
{
Console.WriteLine("C 的 Run 方法被調用.");
}
}
然後在攔截器內部的執行過程就類似於下麵這種:
internal class UnitOfWorkInterceptor
{
public void TestMethod()
{
using (var uow = _unitOfWorkManager.Begin(options))
{
using(var uow2 = _unitOfWorkManager.Begin(options))
{
// C 方法的代碼
Console.WriteLine("C 的 Run 方法被調用.");
uow2.Complete();
}
// B 方法的代碼
Console.WriteLine("B 的 Run 方法被調用.");
uow.Complete();
}
}
}
兩個工作單元之間的調用會被嵌套在一個 using 語句塊之中,一旦任何代碼拋出了異常,都會導致最外層的 uow.Complete()
不會被執行,而 Complete()
方法沒有執行,則會導致 uow 對象被釋放的時候,uow.Dispose()
內部檢測到 Complete()
沒有被調用,Abp 框架也會自己拋出異常。
所以 Abp 巧妙結合 Castle Dynamic 實現了 UOW 模式。
下麵我們繼續看一下他是如何處理非同步 UOW 方法的。
private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
var uow = _unitOfWorkManager.Begin(options);
try
{
invocation.Proceed();
}
catch
{
uow.Dispose();
throw;
}
// 如果是無返回值的非同步方法
if (invocation.Method.ReturnType == typeof(Task))
{
invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
(Task) invocation.ReturnValue,
async () => await uow.CompleteAsync(),
exception => uow.Dispose()
);
}
// 有返回值的非同步方法處理
else
{
invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
invocation.Method.ReturnType.GenericTypeArguments[0],
invocation.ReturnValue,
async () => await uow.CompleteAsync(),
exception => uow.Dispose()
);
}
}
相比而言,針對攔截到的非同步方法處理起來更加複雜一點,但是總體思路仍然是一樣的,將這些工作單元的方法一層層地嵌套起來,依次執行就是核心。而在上面代碼裡面,一樣的首先使用 UnitOfManager.Begin()
獲得了一個新的工作單元之後,繼續執行原有的操作,下麵則主要是通過內部的 InternalAsyncHelper
封裝的兩個輔助方法來確保等待原有任務執行完成之後,再執行 CompleteAsync()
方法。
我們可以來看一下這個內部類的實現:
// 非同步無返回值處理
public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
Exception exception = null;
try
{
// 等待原有任務執行完成
await actualReturnValue;
// 執行 CompleteAsync() 表示本工作單元已經順利執行
await postAction();
}
// 捕獲異常
catch (Exception ex)
{
exception = ex;
throw;
}
finally
{
// 不論是否拋出異常,都調用之前傳入的 uow.Dispose() 方法
finalAction(exception);
}
}
// 原理基本同上,只是多了一個返回值
public static async Task<T> AwaitTaskWithPostActionAndFinallyAndGetResult<T>(Task<T> actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
Exception exception = null;
try
{
var result = await actualReturnValue;
await postAction();
return result;
}
catch (Exception ex)
{
exception = ex;
throw;
}
finally
{
finalAction(exception);
}
}
// 非同步有返回值處理
public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction)
{
// 這裡通過反射獲取到 AwaitTaskWithPostActionAndFinallyAndGetResult 方法,並調用。
return typeof (InternalAsyncHelper)
.GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
.MakeGenericMethod(taskReturnType)
.Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
並不複雜,以上即是攔截器所做的操作。
2.2 工作單元管理器
通過上文我們可以看到一個工作單元是通過 IUnitOfWorkManager.Begin()
拿到的,那 IUnitOfWorkManager
又是個什麼東西呢?
根據字面意思我們大概知道應該類似於管理 UOW 的東西,它其實只有兩個作用。第一,獲取當前處於激活狀態的工作單元,什麼叫激活狀態我們後面再講。第二個作用就是我們之前看到的,可以通過 Begin()
方法來創建一個新的工作單元。
IUnitOfWorkManager
在 Abp 框架初始化的時候就被註入了,其預設實現為 UnitOfWorkManager
,其核心方法就是 Begin()
方法。
public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
{
// 如果沒有傳入 UOW 參數,則填充一個預設的參數
options.FillDefaultsForNonProvidedOptions(_defaultOptions);
// 獲取當前的外部工作單元
var outerUow = _currentUnitOfWorkProvider.Current;
// 如果已經存在有外部工作單元,則直接構建一個內部工作單元
if (options.Scope == TransactionScopeOption.Required && outerUow != null)
{
return new InnerUnitOfWorkCompleteHandle();
}
// 不存在外部工作單元,則從 IOC 容器當中獲取一個新的出來
var uow = _iocResolver.Resolve<IUnitOfWork>();
// 綁定外部工作單元的事件
uow.Completed += (sender, args) =>
{
_currentUnitOfWorkProvider.Current = null;
};
uow.Failed += (sender, args) =>
{
_currentUnitOfWorkProvider.Current = null;
};
uow.Disposed += (sender, args) =>
{
_iocResolver.Release(uow);
};
// 設置過濾器
if (outerUow != null)
{
options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
}
uow.Begin(options);
// 綁定租戶 ID
if (outerUow != null)
{
uow.SetTenantId(outerUow.GetTenantId(), false);
}
// 設置當前的外部工作單元為剛剛初始化的工作單元
_currentUnitOfWorkProvider.Current = uow;
return uow;
}
可以看到 Begin()
方法返回的是一個類型為 IUnitOfWorkCompleteHandle
的東西,轉到其定義:
/// <summary>
/// Used to complete a unit of work.
/// This interface can not be injected or directly used.
/// Use <see cref="IUnitOfWorkManager"/> instead.
/// </summary>
public interface IUnitOfWorkCompleteHandle : IDisposable
{
/// <summary>
/// Completes this unit of work.
/// It saves all changes and commit transaction if exists.
/// </summary>
void Complete();
/// <summary>
/// Completes this unit of work.
/// It saves all changes and commit transaction if exists.
/// </summary>
Task CompleteAsync();
}
他只有兩個方法,都是標識 UOW 處於已經完成的狀態。
在方法上面右鍵查看其實現可以看到有這樣一種依賴關係:
可以看到 IUnitOfWorkCompleteHandle
有兩個實現,一個是 InnerUnitOfWorkCompleteHandle
還有一個則是 IUnitOfWork
介面。
首先看一下 InnerUnitOfWorkCompleteHandle
:
internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";
private volatile bool _isCompleteCalled;
private volatile bool _isDisposed;
public void Complete()
{
_isCompleteCalled = true;
}
public Task CompleteAsync()
{
_isCompleteCalled = true;
return Task.FromResult(0);
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!_isCompleteCalled)
{
if (HasException())
{
return;
}
throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
}
}
private static bool HasException()
{
try
{
return Marshal.GetExceptionCode() != 0;
}
catch (Exception)
{
return false;
}
}
}
代碼很簡單,調用 Complete()/CompleteAsync()
會將 _isCompleteCalled 置為 true,然後在 Dispose()
方法內會進行檢測,為 faslse 的話直接拋出異常。可以看到在 InnerUnitOfWorkCompleteHandle
內部並不會真正地調用 DbContext.SaveChanges()
進行數據保存。
那麼誰才是真正進行資料庫操作的工作單元呢?
答案就是之前在 IUnitOfWorkManager.Begin()
裡面,可以看到在創建 UOW 對象的時候,他在內部進行了一個判斷,如果不存在外部工作單元的情況下才會創建 InnerUnitOfWorkCompleteHandle
對象,否則是解析的一個 IUnitOfWork
對象。
也就是說你可以想象有以下代碼:
public void TestUowMethod()
{
using(var outerUOW = Manager.Begin()) // 這裡返回的是 IOC 解析出的 IUnitOfWork
{
OperationOuter();
using(var innerUOW1 = Manager.Begin()) // 內部 UOW
{
Operation1();
using(var innerUOW2 = Manager.Begin()) // 內部 UOW
{
Operation2();
Complete();
}
Complete();
}
Complete();
}
}
當代碼執行的時候,如同俄羅斯套娃,從內部依次到外部執行,內部工作單元僅會在調用 Complete 方法的時候將 completed 標記為 true,但一旦操作拋出異常,Complete()
無法得到執行,則會直接拋出異常,中斷外層代碼執行。
在 ABP 內部針對 EF Core 框架實現了一套 UOW,其繼承自 UnitOfWorkBase
,而在 UnitOfWorkBase
內部有部分針對介面 IActiveUnitOfWork
的實現,同時由於 IUnifOfWork
也實現了 IUnitOfWorkCompleteHandle
介面,所以在 Begin()
方法處能夠向上轉型。
2.3 抽象工作單元
根據上圖可以知道 Abp 預設實現了一個 UnitOfWorkBase
作為工作單元的抽象基類,他主要的屬性就是 Id 與 Outer 屬性。
public abstract class UnitOfWorkBase : IUnitOfWork
{
public string Id { get; }
[DoNotWire]
public IUnitOfWork Outer { get; set; }
}
這裡的 Id 是使用的 Guid 生成的,用於標識每個工作單元。
而 Outer 則是當前 UOW 對象的引用對象。
這裡重點說一下 Outer 是哪兒來的,Outer 他的值是在之前的 UnitOfWorkManager.Begin()
裡面的 _currentUnitOfWorkProvider.Current = uow;
進行設置的,_currentUnitOfWorkProvider
的實現在 AsyncLocalCurrentUnitOfWorkProvider
內部,其作用是維護一個 UOW 鏈,確保當前的工作單元始終是最新的,這裡的代碼原本是使用 CallContext
實現的,現在已經換為 AsyncLocal<T>
了。
public class AsyncLocalCurrentUnitOfWorkProvider : ICurrentUnitOfWorkProvider, ITransientDependency
{
/// <inheritdoc />
[DoNotWire]
public IUnitOfWork Current
{
get { return GetCurrentUow(); }
set { SetCurrentUow(value); }
}
public ILogger Logger { get; set; }
private static readonly AsyncLocal<LocalUowWrapper> AsyncLocalUow = new AsyncLocal<LocalUowWrapper>();
public AsyncLocalCurrentUnitOfWorkProvider()
{
Logger = NullLogger.Instance;
}
private static IUnitOfWork GetCurrentUow()
{
var uow = AsyncLocalUow.Value?.UnitOfWork;
if (uow == null)
{
return null;
}
if (uow.IsDisposed)
{
AsyncLocalUow.Value = null;
return null;
}
return uow;
}
private static void SetCurrentUow(IUnitOfWork value)
{
lock (AsyncLocalUow)
{
if (value == null)
{
if (AsyncLocalUow.Value == null)
{
return;
}
if (AsyncLocalUow.Value.UnitOfWork?.Outer == null)
{
AsyncLocalUow.Value.UnitOfWork = null;
AsyncLocalUow.Value = null;
return;
}
AsyncLocalUow.Value.UnitOfWork = AsyncLocalUow.Value.UnitOfWork.Outer;
}
else
{
if (AsyncLocalUow.Value?.UnitOfWork == null)
{
if (AsyncLocalUow.Value != null)
{
AsyncLocalUow.Value.UnitOfWork = value;
}
AsyncLocalUow.Value = new LocalUowWrapper(value);
return;
}
value.Outer = AsyncLocalUow.Value.UnitOfWork;
AsyncLocalUow.Value.UnitOfWork = value;
}
}
}
private class LocalUowWrapper
{
public IUnitOfWork UnitOfWork { get; set; }
public LocalUowWrapper(IUnitOfWork unitOfWork)
{
UnitOfWork = unitOfWork;
}
}
}
繼續往下看,在 UnitOfWorkBase
的裡面也是有個 Complete()
與 CompleteAsync()
方法的。
protected abstract void CompleteUow();
/// <inheritdoc/>
public void Complete()
{
// 判斷是否重覆完成
PreventMultipleComplete();
try
{
CompleteUow();
_succeed = true;
OnCompleted();
}
catch (Exception ex)
{
_exception = ex;
throw;
}
}
這裡的 CompleteUow()
仍然只是一個抽象方法,具體的實現在具體的訪問層裡面。
2.4 EF Core 實際處理
Abp 針對 EF Core 的 UOW 實現是 EfCoreUnitOfWork
,代碼如下:
protected override void BeginUow()
{
if (Options.IsTransactional == true)
{
_transactionStrategy.InitOptions(Options);
}
}
public override void SaveChanges()
{
foreach (var dbContext in GetAllActiveDbContexts())
{
SaveChangesInDbContext(dbContext);
}
}
public override async Task SaveChangesAsync()
{
// 遍歷所有激活的 DbContext
foreach (var dbContext in GetAllActiveDbContexts())
{
await SaveChangesInDbContextAsync(dbContext);
}
}
protected override void CompleteUow()
{
SaveChanges();
CommitTransaction();
}
protected override async Task CompleteUowAsync()
{
await SaveChangesAsync();
CommitTransaction();
}
private void CommitTransaction()
{
if (Options.IsTransactional == true)
{
_transactionStrategy.Commit();
}
}
public IReadOnlyList<DbContext> GetAllActiveDbContexts()
{
return ActiveDbContexts.Values.ToImmutableList();
}
根本就是遍歷 DbContext
調用其 SaveChanges()
來提交所有資料庫更改。
餘下更加詳細的東西會放在 《七、倉儲與 Entity Framework Core》 當中說明的。