ABP在其内部實作了工作單元模式,統一地進行事務與連接配接管理。 其核心就是通過 Castle 的 Dynamic Proxy 進行動态代理,在元件注冊的時候進行攔截器注入,攔截到實作了 Unit Of Work 特性的方法進行操作,在執行完方法之後就會關閉掉工作單元。
其整體流程大概如下:
- 首先 UOW 相關接口、攔截器等通過 IocManager 注入到 Ioc 容器當中。
- 監聽 Ioc 注冊事件,并為其添加方法攔截器。
- 在攔截器内部使用 using 包裹資料庫操作方法,使其成為一個工作單元。
- 一旦在方法 procced 執行的時候,産生任何異常觸發任何異常都不會執行 Complete 方法,直接抛出終止執行。
UnitOfWorkInterceptors
這是一個 Castle Interceptors 的實作,在 AbpBootStrap 的 Initialze 方法當中被注入到 Ioc 容器。
private void AddInterceptorRegistrars()
{
ValidationInterceptorRegistrar.Initialize(IocManager);
AuditingInterceptorRegistrar.Initialize(IocManager);
EntityHistoryInterceptorRegistrar.Initialize(IocManager);
UnitOfWorkRegistrar.Initialize(IocManager);
AuthorizationInterceptorRegistrar.Initialize(IocManager);
}
之後在 Registrar 内部針對元件注入事件進行綁定:
public static void Initialize(IIocManager iocManager)
{
iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
{
var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
};
}
在這裡的兩個方法分别是針對已經實作了 UnitOfWork 特性的類進行綁定,另外一個則是針對符合命名規則的類型進行攔截器綁定。
攔截器做的事情十分簡單,針對攔截到的方法進行 UOW 特性檢測,如果檢測通過之後則直接執行工作單元方法,并根據特性生成 Options。不過不是一個 UOW 的話,則直接繼續執行原先方法内代碼。
var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
{
//No need to a uow
invocation.Proceed();
return;
}
PerformUow(invocation, unitOfWorkAttr.CreateOptions());
在建立 UOW 的時候,攔截器也會根據方法類型的不同來用不同的方式建構 UOW 。
如果是同步方法的話:
private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
// 直接從 Manager 生成一個新的工作單元
using (var uow = _unitOfWorkManager.Begin(options))
{
// 繼續執行原方法
// 産生異常直接進入 uow 的 Dispose 方法
invocation.Proceed();
// 如果一切正常,送出事務
uow.Complete();
}
}
但如果這個工作單元被标注在異步方法上面,則操作略微複雜:
private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
// 獲得一個工作單元
var uow = _unitOfWorkManager.Begin(options);
// 繼續執行攔截到的方法
try
{
invocation.Proceed();
}
catch
{
uow.Dispose();
throw;
}
// 如果是異步無傳回值的方法
if (invocation.Method.ReturnType == typeof(Task))
{
invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
(Task) invocation.ReturnValue,
async () => await uow.CompleteAsync(),
exception => uow.Dispose()
);
}
else //Task<TResult>
{
invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
invocation.Method.ReturnType.GenericTypeArguments[0],
invocation.ReturnValue,
async () => await uow.CompleteAsync(),
exception => uow.Dispose()
);
}
}
以上代碼在進入的時候直接執行原方法,如果産生任何異常直接進入 Dispose 方法并且抛出異常。乍一看與同步方法處理沒什麼差別,但重點是這裡并沒有執行 Complete 方法,因為這裡需要對其異步傳回結果更改為 UOW 異步送出之後的值,先檢視第一種直接傳回 Task 的情況.
public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
Exception exception = null;
try
{
// 原有異步任務傳回值
await actualReturnValue;
// 新的異步傳回結果
await postAction();
}
catch (Exception ex)
{
exception = ex;
throw;
}
finally
{
finalAction(exception);
}
}
在内部首先等待原有任務執行完成之後再執行傳入的 UOW 的 CompleteAsync() 方法,并且在執行過程中有無異常都會直接調用 UOW 的 Disopse 釋放資源。
第二種則适用于有傳回值的情況:
public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction)
{
// 獲得 AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult 方法,并且通過反射構造一個泛型方法,并且将自身參數傳入調用。
return typeof (InternalAsyncHelper)
.GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
.MakeGenericMethod(taskReturnType)
.Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
public static async Task<T> AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult<T>(Func<Task<T>> actualReturnValue, Func<Task> preAction = null, Func<Task> postAction = null, Action<Exception> finalAction = null)
{
Exception exception = null;
try
{
if (preAction != null)
{
await preAction();
}
var result = await actualReturnValue();
if (postAction != null)
{
await postAction();
}
return result;
}
catch (Exception ex)
{
exception = ex;
throw;
}
finally
{
if (finalAction != null)
{
finalAction(exception);
}
}
}
這兩個方法的作用都是確定 CompleteAsync 和 Dispose 能夠在放在異步任務當中執行。
IUnitOfWorkManager
顧名思義,這是一個 UOW 的管理器,在 ABP 内部有其一個預設實作
UnitOfWorkManager
,在 AbpKernel 子產品初始化的時候就已經被注入。
核心方法是 Begin 方法,在 Begin 方法當中通過
FillDefaultsForNonProvidedOptions
方法判斷是否傳入了配置參數,如果沒有傳入的話則建構一個預設參數對象。
public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
{
// 如果沒有 UOW 參數,構造預設參數
options.FillDefaultsForNonProvidedOptions(_defaultOptions);
var outerUow = _currentUnitOfWorkProvider.Current;
// 目前是否已經存在工作單元,存在工作單元的話,建構一個内部工作單元
if (options.Scope == TransactionScopeOption.Required && outerUow != null)
{
return new InnerUnitOfWorkCompleteHandle();
}
// 不存在的話建構一個新的工作單元
var uow = _iocResolver.Resolve<IUnitOfWork>();
// 綁定各種事件
uow.Completed += (sender, args) =>
{
_currentUnitOfWorkProvider.Current = null;
};
uow.Failed += (sender, args) =>
{
_currentUnitOfWorkProvider.Current = null;
};
uow.Disposed += (sender, args) =>
{
_iocResolver.Release(uow);
};
//Inherit filters from outer UOW
if (outerUow != null)
{
options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
}
uow.Begin(options);
//Inherit tenant from outer UOW
if (outerUow != null)
{
uow.SetTenantId(outerUow.GetTenantId(), false);
}
// 設定目前工作單元為新建立的 UOW
_currentUnitOfWorkProvider.Current = uow;
return uow;
}
這裡 Begin 方法所傳回的是一個
IUnitOfWorkCompleteHandle
對象,跳轉到
IUnitOfWorkCompleteHandle
,可以看到它有兩個方法:
public interface IUnitOfWorkCompleteHandle : IDisposable
{
void Complete();
Task CompleteAsync();
}
都是完成工作單元的方法,一個同步、一個異步,同時這個接口也實作了 IDispose 接口,從開始使用 using 就可以看出其含義。
InnerUnitOfWorkCompleteHandle
參考其引用,可以發現有一個
IUnitOfWork
接口繼承自它,并且還有一個
InnerUnitOfWorkCompleteHandle
的内部實作。這裡先檢視
InnerUnitOfWorkCompleteHandle
内部實作:
internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";
private volatile bool _isCompleteCalled;
private volatile bool _isDisposed;
public void Complete()
{
_isCompleteCalled = true;
}
public Task CompleteAsync()
{
_isCompleteCalled = true;
return Task.FromResult(0);
}
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
if (!_isCompleteCalled)
{
if (HasException())
{
return;
}
throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
}
}
private static bool HasException()
{
try
{
return Marshal.GetExceptionCode() != 0;
}
catch (Exception)
{
return false;
}
}
}
其内部實作十分簡單,其中 Complete 的同步和異步方法都隻是對完成辨別進行一個标記。并未真正的進行任何資料庫事務操作。同時在他的内部也實作了
IDispose
接口,如果 complete 未被标記為已完成,那麼直接抛出異常,後續操作不會執行。
現在再轉到 Begin 方法内部就可以發現,在建立的時候會首先判斷了目前是否已經存在了工作單元,如果存在了才會建立這樣一個内部工作單元。也就是說真正那個的事務操作是在傳回的 IUnitOfWork 當中實作的。
這樣的話就會建構出一個嵌套的工作單元:
OuterUOW->InnerUow1->InnerUOW2->.....->InnerUowN
你可以想象有以下代碼:
public void TestUowMethod()
{
using(var outerUOW = Manager.Begin()) // 這裡傳回的是 IOC 解析出的 IUnitOfWork
{
OperationOuter();
using(var innerUOW1 = Manager.Begin()) // 内部 UOW
{
Operation1();
using(var innerUOW2 = Manager.Begin()) // 内部 UOW
{
Operation2();
Complete();
}
Complete();
}
Complete();
}
}
當代碼執行的時候,如同俄羅斯套娃,從内部依次到外部執行,内部工作單元僅會在調用 Complete 方法的時候将 completed 标記為 true,但一旦操作抛出異常,Complete無法得到執行,則會直接抛出異常,中斷外層代碼執行。
那麼 IUnitOfWork 的實作又是怎樣的呢?
在 ABP 内部針對 EF Core 架構實作了一套 UOW,其繼承自 UnitOfWorkBase,而在 UnitOfWorkBase 内部有部分針對接口 IActiveUnitOfWork 的實作,同時由于 IUnifOfWork 也實作了 IUnitOfWorkCompleteHandle 接口,是以在 Begin 方法處能夠向上轉型。
IActiveUnitOfWork
在 IActiveUnitOfWork 接口當中定義了工作單元的三個事件,使用者在使用的時候可以針對這三個事件進行綁定。
事件名稱 | 觸發條件 |
---|---|
Completed | 工作單元調用 Complete 方法之後觸發 |
Failed | 工作單元在調用 Complete 方法如果産生異常,則會在 Dispose 釋放資源時觸發。 |
Disposed | 釋放工作單元的時候觸發 |
除了三個事件之外,在 ABP 當中的資料過濾器也是在工作單元調用的時候工作的,後面講解 EfCoreUnitOfWork 的時候會研究,資料過濾僅包括軟删除等審計字段,同時也包括多租戶的租戶 ID 也在工作單元當中進行設定的。 | |
在這裡也定義了兩個 SaveChanges 方法,一個同步、一個異步,作用跟實體上下文的同名方法作用差不多。 |
IUnitOfWork
IUnitOfWork 同時繼承了 IUnitOfWorkCompleteHandle 與 IActiveUnitOfWork 接口,并且增加了兩個屬性與一個 Begin 方法。
UnitOfWorkBase
UnitOfWorkBase 是所有工作單元的一個抽象基類,在其内部實作了 IUnitOfWork 的所有方法,其中也包括兩個父級接口。
下面就一個一個屬性講解。
Id
這個 Id 是在構造函數當中初始化的,全局唯一的 Id,使用 Guid 初始化其值。
Outer
用于标明目前工作單元的外部工作單元,其值是在 UnitOfWorkManager 建立工作單元的時候賦予的。在 Manager 的 Begin 方法當中每次針對 Current Uow 指派的時候都會将已經存在的 UOW 關聯最新的 Current Uow 的 Outer 屬性上面。形成如下結構:
Current Uow = Uow3.Outer = Uow2.Outer = Uow1.Outer = null
具體代碼參考
ICurrentUnitOfWorkProvider
的實作。
Begin()
在 Manager 建立好一個 Uow 之後,就會調用它的 Begin 方法,在内部首先做了一個判斷,判斷是否多次調用了 Begin 方法,這裡可以看到并未做加鎖處理。這是因為在 ABP 當中,一個線程當中共用一個工作單元。其實在 CurrentProvider 的代碼當中就可以看出來如果在一個線程裡面建立了多個工作單元,他會将其串聯起來。
之後設定過濾器,并且開始調用 BeginUow 方法,這裡并未實作,具體實作我們轉到 EfUnitOfWork 可以看到。
BeginUow()-EfCoreUnitOfWork
protected override void BeginUow()
{
if (Options.IsTransactional == true)
{
_transactionStrategy.InitOptions(Options);
}
}
覆寫了父類方法,僅對設定進行初始化。
其實到這裡我們就大概清楚了 ABP 整個 UOW 的工作流程,如果是兩個打上 UnitOfWork 特性的方法在 A 調用 B 的時候其實就會封裝成兩個嵌套的 using 塊。
using(var a = Manager.Begin())
{
// 操作
using(var b = Manager.Begin())
{
// 操作
b.Complete();
}
a.Complete();
}
而最外層的 Complete 就是真正執行了資料庫事務操作的。
可以看下 EfUnitOfWork 的實作:
// 這裡是 UnitOfWorkBase 的 Complete 方法
public void Complete()
{
PreventMultipleComplete();
try
{
CompleteUow();
_succeed = true;
OnCompleted();
}
catch (Exception ex)
{
_exception = ex;
throw;
}
}
// 這裡是 EfUnitOfWork 的 CompleteUow 方法
protected override void CompleteUow()
{
SaveChanges();
CommitTransaction();
}
// 周遊所有激活的 DbContext 并儲存更改到資料庫
public override void SaveChanges()
{
foreach (var dbContext in GetAllActiveDbContexts())
{
SaveChangesInDbContext(dbContext);
}
}
// 送出事務
private void CommitTransaction()
{
if (Options.IsTransactional == true)
{
_transactionStrategy.Commit();
}
}