天天看點

淺談工作單元 在整個 ABP 架構當中的應用

ABP在其内部實作了工作單元模式,統一地進行事務與連接配接管理。 其核心就是通過 Castle 的 Dynamic Proxy 進行動态代理,在元件注冊的時候進行攔截器注入,攔截到實作了 Unit Of Work 特性的方法進行操作,在執行完方法之後就會關閉掉工作單元。

其整體流程大概如下:

  • 首先 UOW 相關接口、攔截器等通過 IocManager 注入到 Ioc 容器當中。
  • 監聽 Ioc 注冊事件,并為其添加方法攔截器。
  • 在攔截器内部使用 using 包裹資料庫操作方法,使其成為一個工作單元。
  • 一旦在方法 procced 執行的時候,産生任何異常觸發任何異常都不會執行 Complete 方法,直接抛出終止執行。

UnitOfWorkInterceptors

這是一個 Castle Interceptors 的實作,在 AbpBootStrap 的 Initialze 方法當中被注入到 Ioc 容器。

private void AddInterceptorRegistrars()
{
    ValidationInterceptorRegistrar.Initialize(IocManager);
    AuditingInterceptorRegistrar.Initialize(IocManager);
    EntityHistoryInterceptorRegistrar.Initialize(IocManager);
    UnitOfWorkRegistrar.Initialize(IocManager);
    AuthorizationInterceptorRegistrar.Initialize(IocManager);
}
           

之後在 Registrar 内部針對元件注入事件進行綁定:

public static void Initialize(IIocManager iocManager)
{
    iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
    {
        var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
        HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
        HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
    };
}
           

在這裡的兩個方法分别是針對已經實作了 UnitOfWork 特性的類進行綁定,另外一個則是針對符合命名規則的類型進行攔截器綁定。

攔截器做的事情十分簡單,針對攔截到的方法進行 UOW 特性檢測,如果檢測通過之後則直接執行工作單元方法,并根據特性生成 Options。不過不是一個 UOW 的話,則直接繼續執行原先方法内代碼。

var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
{
    //No need to a uow
    invocation.Proceed();
    return;
}
PerformUow(invocation, unitOfWorkAttr.CreateOptions());
           

在建立 UOW 的時候,攔截器也會根據方法類型的不同來用不同的方式建構 UOW 。

如果是同步方法的話:

private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    // 直接從 Manager 生成一個新的工作單元
    using (var uow = _unitOfWorkManager.Begin(options))
    {
        // 繼續執行原方法
        // 産生異常直接進入 uow 的 Dispose 方法
        invocation.Proceed();
        // 如果一切正常,送出事務
        uow.Complete();
    }
}
           

但如果這個工作單元被标注在異步方法上面,則操作略微複雜:

private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    // 獲得一個工作單元
    var uow = _unitOfWorkManager.Begin(options);
    // 繼續執行攔截到的方法
    try
    {
        invocation.Proceed();
    }
    catch
    {
        uow.Dispose();
        throw;
    }
    // 如果是異步無傳回值的方法
    if (invocation.Method.ReturnType == typeof(Task))
    {
        invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
            (Task) invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
    else //Task<TResult>
    {
        invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
            invocation.Method.ReturnType.GenericTypeArguments[0],
            invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
}
           

以上代碼在進入的時候直接執行原方法,如果産生任何異常直接進入 Dispose 方法并且抛出異常。乍一看與同步方法處理沒什麼差別,但重點是這裡并沒有執行 Complete 方法,因為這裡需要對其異步傳回結果更改為 UOW 異步送出之後的值,先檢視第一種直接傳回 Task 的情況.

public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
    Exception exception = null;
    try
    {
        // 原有異步任務傳回值
        await actualReturnValue;
        // 新的異步傳回結果
        await postAction();
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        finalAction(exception);
    }
}
           

在内部首先等待原有任務執行完成之後再執行傳入的 UOW 的 CompleteAsync() 方法,并且在執行過程中有無異常都會直接調用 UOW 的 Disopse 釋放資源。

第二種則适用于有傳回值的情況:

public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction)
{
    // 獲得 AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult 方法,并且通過反射構造一個泛型方法,并且将自身參數傳入調用。
    return typeof (InternalAsyncHelper)
        .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
        .MakeGenericMethod(taskReturnType)
        .Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
public static async Task<T> AwaitTaskWithPreActionAndPostActionAndFinallyAndGetResult<T>(Func<Task<T>> actualReturnValue, Func<Task> preAction = null, Func<Task> postAction = null, Action<Exception> finalAction = null)
{
    Exception exception = null;
    try
    {
        if (preAction != null)
        {
            await preAction();
        }
        var result = await actualReturnValue();
        if (postAction != null)
        {
            await postAction();                    
        }
        return result;
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        if (finalAction != null)
        {
            finalAction(exception);
        }
    }
}
           

這兩個方法的作用都是確定 CompleteAsync 和 Dispose 能夠在放在異步任務當中執行。

IUnitOfWorkManager

顧名思義,這是一個 UOW 的管理器,在 ABP 内部有其一個預設實作

UnitOfWorkManager

,在 AbpKernel 子產品初始化的時候就已經被注入。

核心方法是 Begin 方法,在 Begin 方法當中通過

FillDefaultsForNonProvidedOptions

方法判斷是否傳入了配置參數,如果沒有傳入的話則建構一個預設參數對象。

public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
{
    // 如果沒有 UOW 參數,構造預設參數
    options.FillDefaultsForNonProvidedOptions(_defaultOptions);
    var outerUow = _currentUnitOfWorkProvider.Current;
    // 目前是否已經存在工作單元,存在工作單元的話,建構一個内部工作單元
    if (options.Scope == TransactionScopeOption.Required && outerUow != null)
    {
        return new InnerUnitOfWorkCompleteHandle();
    }
    // 不存在的話建構一個新的工作單元
    var uow = _iocResolver.Resolve<IUnitOfWork>();
    // 綁定各種事件
    uow.Completed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };
    uow.Failed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };
    uow.Disposed += (sender, args) =>
    {
        _iocResolver.Release(uow);
    };
    //Inherit filters from outer UOW
    if (outerUow != null)
    {
        options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
    }
    uow.Begin(options);
    //Inherit tenant from outer UOW
    if (outerUow != null)
    {
        uow.SetTenantId(outerUow.GetTenantId(), false);
    }
    // 設定目前工作單元為新建立的 UOW 
    _currentUnitOfWorkProvider.Current = uow;
    return uow;
}
           

這裡 Begin 方法所傳回的是一個

IUnitOfWorkCompleteHandle

對象,跳轉到

IUnitOfWorkCompleteHandle

,可以看到它有兩個方法:

public interface IUnitOfWorkCompleteHandle : IDisposable
{
    void Complete();
    Task CompleteAsync();
}
           

都是完成工作單元的方法,一個同步、一個異步,同時這個接口也實作了 IDispose 接口,從開始使用 using 就可以看出其含義。

InnerUnitOfWorkCompleteHandle

參考其引用,可以發現有一個

IUnitOfWork

接口繼承自它,并且還有一個

InnerUnitOfWorkCompleteHandle

的内部實作。這裡先檢視

InnerUnitOfWorkCompleteHandle

内部實作:

internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
    public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";
    private volatile bool _isCompleteCalled;
    private volatile bool _isDisposed;
    public void Complete()
    {
        _isCompleteCalled = true;
    }
    public Task CompleteAsync()
    {
        _isCompleteCalled = true;
        return Task.FromResult(0);
    }
    public void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }
        _isDisposed = true;
        if (!_isCompleteCalled)
        {
            if (HasException())
            {
                return;
            }
            throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
        }
    }
    private static bool HasException()
    {
        try
        {
            return Marshal.GetExceptionCode() != 0;
        }
        catch (Exception)
        {
            return false;
        }
    }
}
           

其内部實作十分簡單,其中 Complete 的同步和異步方法都隻是對完成辨別進行一個标記。并未真正的進行任何資料庫事務操作。同時在他的内部也實作了

IDispose

接口,如果 complete 未被标記為已完成,那麼直接抛出異常,後續操作不會執行。

現在再轉到 Begin 方法内部就可以發現,在建立的時候會首先判斷了目前是否已經存在了工作單元,如果存在了才會建立這樣一個内部工作單元。也就是說真正那個的事務操作是在傳回的 IUnitOfWork 當中實作的。

這樣的話就會建構出一個嵌套的工作單元:

OuterUOW->InnerUow1->InnerUOW2->.....->InnerUowN
           

你可以想象有以下代碼:

public void TestUowMethod()
{
    using(var outerUOW = Manager.Begin())  // 這裡傳回的是 IOC 解析出的 IUnitOfWork
    {
        OperationOuter();
        using(var innerUOW1 = Manager.Begin())  // 内部 UOW
        {
            Operation1();
            using(var innerUOW2 = Manager.Begin())  // 内部 UOW
            {
                Operation2();
                Complete();
            }
            Complete();
        }
        Complete();
    }
}
           

當代碼執行的時候,如同俄羅斯套娃,從内部依次到外部執行,内部工作單元僅會在調用 Complete 方法的時候将 completed 标記為 true,但一旦操作抛出異常,Complete無法得到執行,則會直接抛出異常,中斷外層代碼執行。

那麼 IUnitOfWork 的實作又是怎樣的呢?

在 ABP 内部針對 EF Core 架構實作了一套 UOW,其繼承自 UnitOfWorkBase,而在 UnitOfWorkBase 内部有部分針對接口 IActiveUnitOfWork 的實作,同時由于 IUnifOfWork 也實作了 IUnitOfWorkCompleteHandle 接口,是以在 Begin 方法處能夠向上轉型。

IActiveUnitOfWork

在 IActiveUnitOfWork 接口當中定義了工作單元的三個事件,使用者在使用的時候可以針對這三個事件進行綁定。

事件名稱 觸發條件
Completed 工作單元調用 Complete 方法之後觸發
Failed 工作單元在調用 Complete 方法如果産生異常,則會在 Dispose 釋放資源時觸發。
Disposed 釋放工作單元的時候觸發
除了三個事件之外,在 ABP 當中的資料過濾器也是在工作單元調用的時候工作的,後面講解 EfCoreUnitOfWork 的時候會研究,資料過濾僅包括軟删除等審計字段,同時也包括多租戶的租戶 ID 也在工作單元當中進行設定的。
在這裡也定義了兩個 SaveChanges 方法,一個同步、一個異步,作用跟實體上下文的同名方法作用差不多。

IUnitOfWork

IUnitOfWork 同時繼承了 IUnitOfWorkCompleteHandle 與 IActiveUnitOfWork 接口,并且增加了兩個屬性與一個 Begin 方法。

UnitOfWorkBase

UnitOfWorkBase 是所有工作單元的一個抽象基類,在其内部實作了 IUnitOfWork 的所有方法,其中也包括兩個父級接口。

下面就一個一個屬性講解。

Id

這個 Id 是在構造函數當中初始化的,全局唯一的 Id,使用 Guid 初始化其值。

Outer

用于标明目前工作單元的外部工作單元,其值是在 UnitOfWorkManager 建立工作單元的時候賦予的。在 Manager 的 Begin 方法當中每次針對 Current Uow 指派的時候都會将已經存在的 UOW 關聯最新的 Current Uow 的 Outer 屬性上面。形成如下結構:

Current Uow = Uow3.Outer = Uow2.Outer = Uow1.Outer = null
           

具體代碼參考

ICurrentUnitOfWorkProvider

的實作。

Begin()

在 Manager 建立好一個 Uow 之後,就會調用它的 Begin 方法,在内部首先做了一個判斷,判斷是否多次調用了 Begin 方法,這裡可以看到并未做加鎖處理。這是因為在 ABP 當中,一個線程當中共用一個工作單元。其實在 CurrentProvider 的代碼當中就可以看出來如果在一個線程裡面建立了多個工作單元,他會将其串聯起來。

之後設定過濾器,并且開始調用 BeginUow 方法,這裡并未實作,具體實作我們轉到 EfUnitOfWork 可以看到。

BeginUow()-EfCoreUnitOfWork

protected override void BeginUow()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.InitOptions(Options);
    }
}
           

覆寫了父類方法,僅對設定進行初始化。

其實到這裡我們就大概清楚了 ABP 整個 UOW 的工作流程,如果是兩個打上 UnitOfWork 特性的方法在 A 調用 B 的時候其實就會封裝成兩個嵌套的 using 塊。

using(var a = Manager.Begin())
{
    // 操作
    using(var b = Manager.Begin())
    {
        // 操作
        b.Complete();
    }
    a.Complete();
}
           

而最外層的 Complete 就是真正執行了資料庫事務操作的。

可以看下 EfUnitOfWork 的實作:

// 這裡是 UnitOfWorkBase 的 Complete 方法
public void Complete()
{
    PreventMultipleComplete();
    try
    {
        CompleteUow();
        _succeed = true;
        OnCompleted();
    }
    catch (Exception ex)
    {
        _exception = ex;
        throw;
    }
}
           
// 這裡是 EfUnitOfWork 的 CompleteUow 方法
protected override void CompleteUow()
{
    SaveChanges();
    CommitTransaction();
}
// 周遊所有激活的 DbContext 并儲存更改到資料庫
public override void SaveChanges()
{
    foreach (var dbContext in GetAllActiveDbContexts())
    {
        SaveChangesInDbContext(dbContext);
    }
}
// 送出事務
private void CommitTransaction()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.Commit();
    }
}