天天看點

[Abp 源碼分析]六、工作單元的實作

0.簡介

在 Abp 架構内部實作了工作單元,在這裡講解一下,什麼是工作單元?

Unit Of Work(工作單元)模式用來維護一個由已經被業務事物修改(增加、删除或更新)的業務對象組成的清單。Unit  Of Work模式負責協調這些修改的持久化工作以及所有标記的并發問題。在資料通路層中采用Unit Of Work模式帶來的好處是能夠確定資料完整性。如果在持久化一系列業務對象(他們屬于同一個事物)的過程中出現問題,那麼應該将所有的修改復原,以確定資料始終處于有效狀态。

而在 Abp 的内部則是結合 Castle 的 Dynamic Proxy 攔截 UnitOfwork Attribute 來進行動态代理注入,實作了當執行标注了

[UnitOfwork]

方法時能夠通過

UnitOfworkManager

來進行事務控制。

其大概流程如下:

[Abp 源碼分析]六、工作單元的實作

1 啟動流程

首先我們來看一下 Abp 内部是什麼時候注入 UOW 相關的代碼的,翻閱源碼,在

AbpBootstrapper

内部我們就可以看到 Abp 作者為 UOW 寫了一個攔截器,并且在 Abp 架構初始化的時候就通過

AddInterceptorRegistrars()

方法來監聽

IocManager

的元件注冊事件,當觸發事件的時候就來判斷是否滿足條件,如果滿足則将攔截器與該類型進行一個綁定。

public class AbpBootstrapper : IDisposable
{
    private AbpBootstrapper([NotNull] Type startupModule, [CanBeNull] Action<AbpBootstrapperOptions> optionsAction = null)
    {
        // 其他代碼
        if (!options.DisableAllInterceptors)
        {
        	// 添加攔截器
            AddInterceptorRegistrars();
        }
    }

    private void AddInterceptorRegistrars()
    {
        ValidationInterceptorRegistrar.Initialize(IocManager);
        AuditingInterceptorRegistrar.Initialize(IocManager);
        EntityHistoryInterceptorRegistrar.Initialize(IocManager);
        UnitOfWorkRegistrar.Initialize(IocManager);
        AuthorizationInterceptorRegistrar.Initialize(IocManager);
    }
}
           
internal static class UnitOfWorkRegistrar
{
    public static void Initialize(IIocManager iocManager)
    {
        // 監聽元件注冊事件
        iocManager.IocContainer.Kernel.ComponentRegistered += (key, handler) =>
        {
            var implementationType = handler.ComponentModel.Implementation.GetTypeInfo();
            
            // 按 UOW 特性注冊
            HandleTypesWithUnitOfWorkAttribute(implementationType, handler);
            // 按規約注冊
            HandleConventionalUnitOfWorkTypes(iocManager, implementationType, handler);
        };
    }

    private static void HandleTypesWithUnitOfWorkAttribute(TypeInfo implementationType, IHandler handler)
    {
        if (IsUnitOfWorkType(implementationType) || AnyMethodHasUnitOfWork(implementationType))
        {
            handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
        }
    }

    private static void HandleConventionalUnitOfWorkTypes(IIocManager iocManager, TypeInfo implementationType, IHandler handler)
    {
        if (!iocManager.IsRegistered<IUnitOfWorkDefaultOptions>())
        {
            return;
        }

        var uowOptions = iocManager.Resolve<IUnitOfWorkDefaultOptions>();

        if (uowOptions.IsConventionalUowClass(implementationType.AsType()))
        {
            handler.ComponentModel.Interceptors.Add(new InterceptorReference(typeof(UnitOfWorkInterceptor)));
        }
    }

    private static bool IsUnitOfWorkType(TypeInfo implementationType)
    {
        return UnitOfWorkHelper.HasUnitOfWorkAttribute(implementationType);
    }

    private static bool AnyMethodHasUnitOfWork(TypeInfo implementationType)
    {
        return implementationType
            .GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic)
            .Any(UnitOfWorkHelper.HasUnitOfWorkAttribute);
    }
}
           

可以看到在這個 Registrar 裡面他擁有兩種注冊方式,第一種很簡單,就是判斷注冊的元件類型是否擁有 UOW 标簽,第二種則是通過規約來注入攔截器。

Abp 預設針對倉儲與應用服務會自動将攔截器挂載到這兩個類型以及他的所有子類的。這裡的

UnitOfWorkDefaultOptionsExtensions.IsConventionalUowClass()

方法就是用來判斷傳入的 Type 是否屬于規約的 Type。

public static bool IsConventionalUowClass(this IUnitOfWorkDefaultOptions unitOfWorkDefaultOptions, Type type)
{
    return unitOfWorkDefaultOptions.ConventionalUowSelectors.Any(selector => selector(type));
}
           

又牽扯到了一個

IUnitOfWorkDefaultOptions

,看一下他的預設實作

UnitOfWorkDefaultOptions

就會發現這樣的代碼:

public UnitOfWorkDefaultOptions()
{
    _filters = new List<DataFilterConfiguration>();
    IsTransactional = true;
    Scope = TransactionScopeOption.Required;

    IsTransactionScopeAvailable = true;

    // 預設類型
    ConventionalUowSelectors = new List<Func<Type, bool>>
    {
        type => typeof(IRepository).IsAssignableFrom(type) ||
            typeof(IApplicationService).IsAssignableFrom(type)
    };
}
           

2. 實作原理

2.1 工作單元攔截器

在上一步我們通過兩種注入方式将攔截器注入到需要應用工作單元特性的類型裡面,那麼我們程式在執行的時候就會使用 Dyncmic Proxy 來攔截包裹這些方法。

下面我們就來看一下剛剛注入的攔截器:

internal class UnitOfWorkInterceptor : IInterceptor
{
	// ...
	// 其他代碼
	
    public void Intercept(IInvocation invocation)
    {
        MethodInfo method;
        try
        {
            method = invocation.MethodInvocationTarget;
        }
        catch
        {
            method = invocation.GetConcreteMethod();
        }

		// 判斷目前進入的方法是否帶有 UnitOfWork 特性
        var unitOfWorkAttr = _unitOfWorkOptions.GetUnitOfWorkAttributeOrNull(method);
        if (unitOfWorkAttr == null || unitOfWorkAttr.IsDisabled)
        {
        	// 沒有則直接執行該方法
            invocation.Proceed();
            return;
        }

        PerformUow(invocation, unitOfWorkAttr.CreateOptions());
    }
    
    // ...
    // 其他代碼
}
           

攔截器内部方法很簡單,如果是 UOW 方法則執行

PerformUow()

即可,在該方法内部則對方法類型進行了不同的判斷,同步與異步的處理方法是不一樣的。

// ...
// 其他代碼

private void PerformUow(IInvocation invocation, UnitOfWorkOptions options)
{
    // 判斷方法是同步還是異步方法,不同則執行不同的處理操作
    if (invocation.Method.IsAsync())
    {
        PerformAsyncUow(invocation, options);
    }
    else
    {
        PerformSyncUow(invocation, options);
    }
}

// ...
// 其他代碼
           

那麼我們就先來看一下同步方法:

// ...
// 其他代碼

private void PerformSyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    using (var uow = _unitOfWorkManager.Begin(options))
    {
        // 繼續執行
        invocation.Proceed();
        uow.Complete();
    }
}

// ...
// 其他代碼
           

同步方法針對 UOW 的操作十分簡單,直接使用

UnitOfWorkManager.Begin()

方法開啟一個事務,然後在内部執行原有方法的代碼,執行完成之後調用

Complete()

完成此次調用。

假如我擁有兩個應用服務類,他們都擁有

UnitOfWork

特性,然後我再一個 A 方法調用他們兩個 B 類的

Run()

方法,而 B類的内部也調用了C 的

Run()

方法,大體如下:

public class A
{
    private readonly B B;
    
    public A(B b)
    {
        B = b;
    }
    
    public void TestMethod()
    {
    	B.Run();
    }
}

internal class B
{
    private readonly C C;
    
    public B(C c)
    {
        C = c;
    }
    
    [UnitOfWork]
    public void Run()
    {
        // 資料庫操作
        C.Run();
        Console.WriteLine("B 的 Run 方法被調用.");
    }
}

internal class C
{
    [UnitOfWork]
    public void Run()
    {
        Console.WriteLine("C 的 Run 方法被調用.");
    }
}
           

然後在攔截器内部的執行過程就類似于下面這種:

internal class UnitOfWorkInterceptor
{
    public void TestMethod()
    {
        using (var uow = _unitOfWorkManager.Begin(options))
        {
            using(var uow2 = _unitOfWorkManager.Begin(options))
            {
                // C 方法的代碼
                Console.WriteLine("C 的 Run 方法被調用.");
                uow2.Complete();
            }
            
            // B 方法的代碼
            Console.WriteLine("B 的 Run 方法被調用.");
            uow.Complete();
        }
    }
}
           

兩個工作單元之間的調用會被嵌套在一個 using 語句塊之中,一旦任何代碼抛出了異常,都會導緻最外層的

uow.Complete()

不會被執行,而

Complete()

方法沒有執行,則會導緻 uow 對象被釋放的時候,

uow.Dispose()

内部檢測到

Complete()

沒有被調用,Abp 架構也會自己抛出異常。

是以 Abp 巧妙結合 Castle Dynamic 實作了 UOW 模式。

下面我們繼續看一下他是如何處理異步 UOW 方法的。

private void PerformAsyncUow(IInvocation invocation, UnitOfWorkOptions options)
{
    var uow = _unitOfWorkManager.Begin(options);

    try
    {
        invocation.Proceed();
    }
    catch
    {
        uow.Dispose();
        throw;
    }

    // 如果是無傳回值的異步方法
    if (invocation.Method.ReturnType == typeof(Task))
    {
        invocation.ReturnValue = InternalAsyncHelper.AwaitTaskWithPostActionAndFinally(
            (Task) invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
    // 有傳回值的異步方法處理
    else
    {
        invocation.ReturnValue = InternalAsyncHelper.CallAwaitTaskWithPostActionAndFinallyAndGetResult(
            invocation.Method.ReturnType.GenericTypeArguments[0],
            invocation.ReturnValue,
            async () => await uow.CompleteAsync(),
            exception => uow.Dispose()
        );
    }
}
           

相比而言,針對攔截到的異步方法處理起來更加複雜一點,但是總體思路仍然是一樣的,将這些工作單元的方法一層層地嵌套起來,依次執行就是核心。而在上面代碼裡面,一樣的首先使用

UnitOfManager.Begin()

獲得了一個新的工作單元之後,繼續執行原有的操作,下面則主要是通過内部的

InternalAsyncHelper

封裝的兩個輔助方法來確定等待原有任務執行完成之後,再執行

CompleteAsync()

方法。

我們可以來看一下這個内部類的實作:

// 異步無傳回值處理
public static async Task AwaitTaskWithPostActionAndFinally(Task actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
    Exception exception = null;

    try
    {
        // 等待原有任務執行完成
        await actualReturnValue;
        // 執行 CompleteAsync() 表示本工作單元已經順利執行
        await postAction();
    }
    // 捕獲異常
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        // 不論是否抛出異常,都調用之前傳入的 uow.Dispose() 方法
        finalAction(exception);
    }
}

// 原理基本同上,隻是多了一個傳回值
public static async Task<T> AwaitTaskWithPostActionAndFinallyAndGetResult<T>(Task<T> actualReturnValue, Func<Task> postAction, Action<Exception> finalAction)
{
    Exception exception = null;

    try
    {
        var result = await actualReturnValue;
        await postAction();
        return result;
    }
    catch (Exception ex)
    {
        exception = ex;
        throw;
    }
    finally
    {
        finalAction(exception);
    }
}

// 異步有傳回值處理
public static object CallAwaitTaskWithPostActionAndFinallyAndGetResult(Type taskReturnType, object actualReturnValue, Func<Task> action, Action<Exception> finalAction)
{
    // 這裡通過反射擷取到 AwaitTaskWithPostActionAndFinallyAndGetResult 方法,并調用。
    return typeof (InternalAsyncHelper)
        .GetMethod("AwaitTaskWithPostActionAndFinallyAndGetResult", BindingFlags.Public | BindingFlags.Static)
        .MakeGenericMethod(taskReturnType)
        .Invoke(null, new object[] { actualReturnValue, action, finalAction });
}
           

并不複雜,以上即是攔截器所做的操作。

2.2 工作單元管理器

通過上文我們可以看到一個工作單元是通過

IUnitOfWorkManager.Begin()

拿到的,那

IUnitOfWorkManager

又是個什麼東西呢?

根據字面意思我們大概知道應該類似于管理 UOW 的東西,它其實隻有兩個作用。第一,擷取目前處于激活狀态的工作單元,什麼叫激活狀态我們後面再講。第二個作用就是我們之前看到的,可以通過

Begin()

方法來建立一個新的工作單元。

IUnitOfWorkManager

在 Abp 架構初始化的時候就被注入了,其預設實作為

UnitOfWorkManager

,其核心方法就是

Begin()

public IUnitOfWorkCompleteHandle Begin(UnitOfWorkOptions options)
{
    // 如果沒有傳入 UOW 參數,則填充一個預設的參數
    options.FillDefaultsForNonProvidedOptions(_defaultOptions);
	
    // 擷取目前的外部工作單元
    var outerUow = _currentUnitOfWorkProvider.Current;

    // 如果已經存在有外部工作單元,則直接建構一個内部工作單元
    if (options.Scope == TransactionScopeOption.Required && outerUow != null)
    {
        return new InnerUnitOfWorkCompleteHandle();
    }

    // 不存在外部工作單元,則從 IOC 容器當中擷取一個新的出來
    var uow = _iocResolver.Resolve<IUnitOfWork>();

    // 綁定外部工作單元的事件
    uow.Completed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };

    uow.Failed += (sender, args) =>
    {
        _currentUnitOfWorkProvider.Current = null;
    };

    uow.Disposed += (sender, args) =>
    {
        _iocResolver.Release(uow);
    };

    // 設定過濾器
    if (outerUow != null)
    {
        options.FillOuterUowFiltersForNonProvidedOptions(outerUow.Filters.ToList());
    }

    uow.Begin(options);

    // 綁定租戶 ID
    if (outerUow != null)
    {
        uow.SetTenantId(outerUow.GetTenantId(), false);
    }

    // 設定目前的外部工作單元為剛剛初始化的工作單元
    _currentUnitOfWorkProvider.Current = uow;

    return uow;
}
           

可以看到

Begin()

方法傳回的是一個類型為

IUnitOfWorkCompleteHandle

的東西,轉到其定義:

/// <summary>
/// Used to complete a unit of work.
/// This interface can not be injected or directly used.
/// Use <see cref="IUnitOfWorkManager"/> instead.
/// </summary>
public interface IUnitOfWorkCompleteHandle : IDisposable
{
    /// <summary>
    /// Completes this unit of work.
    /// It saves all changes and commit transaction if exists.
    /// </summary>
    void Complete();

    /// <summary>
    /// Completes this unit of work.
    /// It saves all changes and commit transaction if exists.
    /// </summary>
    Task CompleteAsync();
}
           

他隻有兩個方法,都是辨別 UOW 處于已經完成的狀态。

在方法上面右鍵檢視其實作可以看到有這樣一種依賴關系:

[Abp 源碼分析]六、工作單元的實作

IUnitOfWorkCompleteHandle

有兩個實作,一個是

InnerUnitOfWorkCompleteHandle

還有一個則是

IUnitOfWork

接口。

首先看一下

InnerUnitOfWorkCompleteHandle

internal class InnerUnitOfWorkCompleteHandle : IUnitOfWorkCompleteHandle
{
    public const string DidNotCallCompleteMethodExceptionMessage = "Did not call Complete method of a unit of work.";

    private volatile bool _isCompleteCalled;
    private volatile bool _isDisposed;

    public void Complete()
    {
        _isCompleteCalled = true;
    }

    public Task CompleteAsync()
    {
        _isCompleteCalled = true;
        return Task.FromResult(0);
    }

    public void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        if (!_isCompleteCalled)
        {
            if (HasException())
            {
                return;
            }

            throw new AbpException(DidNotCallCompleteMethodExceptionMessage);
        }
    }
    
    private static bool HasException()
    {
        try
        {
            return Marshal.GetExceptionCode() != 0;
        }
        catch (Exception)
        {
            return false;
        }
    }
}
           

代碼很簡單,調用

Complete()/CompleteAsync()

會将 _isCompleteCalled 置為 true,然後在

Dispose()

方法内會進行檢測,為 faslse 的話直接抛出異常。可以看到在

InnerUnitOfWorkCompleteHandle

内部并不會真正地調用

DbContext.SaveChanges()

進行資料儲存。

那麼誰才是真正進行資料庫操作的工作單元呢?

答案就是之前在

IUnitOfWorkManager.Begin()

裡面,可以看到在建立 UOW 對象的時候,他在内部進行了一個判斷,如果不存在外部工作單元的情況下才會建立

InnerUnitOfWorkCompleteHandle

對象,否則是解析的一個

IUnitOfWork

對象。

也就是說你可以想象有以下代碼:

public void TestUowMethod()
{
    using(var outerUOW = Manager.Begin())  // 這裡傳回的是 IOC 解析出的 IUnitOfWork
    {
        OperationOuter();
        using(var innerUOW1 = Manager.Begin())  // 内部 UOW
        {
            Operation1();
            using(var innerUOW2 = Manager.Begin())  // 内部 UOW
            {
                Operation2();
                Complete();
            }
            Complete();
        }
        Complete();
    }
}
           

當代碼執行的時候,如同俄羅斯套娃,從内部依次到外部執行,内部工作單元僅會在調用 Complete 方法的時候将 completed 标記為 true,但一旦操作抛出異常,

Complete()

無法得到執行,則會直接抛出異常,中斷外層代碼執行。

在 ABP 内部針對 EF Core 架構實作了一套 UOW,其繼承自

UnitOfWorkBase

,而在

UnitOfWorkBase

内部有部分針對接口

IActiveUnitOfWork

的實作,同時由于

IUnifOfWork

也實作了

IUnitOfWorkCompleteHandle

接口,是以在

Begin()

方法處能夠向上轉型。

2.3 抽象工作單元

根據上圖可以知道 Abp 預設實作了一個

UnitOfWorkBase

作為工作單元的抽象基類,他主要的屬性就是 Id 與 Outer 屬性。

public abstract class UnitOfWorkBase : IUnitOfWork
{
    public string Id { get; }

    [DoNotWire]
    public IUnitOfWork Outer { get; set; }
}
           

這裡的 Id 是使用的 Guid 生成的,用于辨別每個工作單元。

而 Outer 則是目前 UOW 對象的引用對象。

這裡重點說一下 Outer 是哪兒來的,Outer 他的值是在之前的

UnitOfWorkManager.Begin()

裡面的

_currentUnitOfWorkProvider.Current = uow;

進行設定的,

_currentUnitOfWorkProvider

的實作在

AsyncLocalCurrentUnitOfWorkProvider

内部,其作用是維護一個 UOW 鍊,確定目前的工作單元始終是最新的,這裡的代碼原本是使用

CallContext

實作的,現在已經換為

AsyncLocal<T>

了。

public class AsyncLocalCurrentUnitOfWorkProvider : ICurrentUnitOfWorkProvider, ITransientDependency
{
    /// <inheritdoc />
    [DoNotWire]
    public IUnitOfWork Current
    {
        get { return GetCurrentUow(); }
        set { SetCurrentUow(value); }
    }

    public ILogger Logger { get; set; }

    private static readonly AsyncLocal<LocalUowWrapper> AsyncLocalUow = new AsyncLocal<LocalUowWrapper>();

    public AsyncLocalCurrentUnitOfWorkProvider()
    {
        Logger = NullLogger.Instance;
    }

    private static IUnitOfWork GetCurrentUow()
    {
        var uow = AsyncLocalUow.Value?.UnitOfWork;
        if (uow == null)
        {
            return null;
        }

        if (uow.IsDisposed)
        {
            AsyncLocalUow.Value = null;
            return null;
        }

        return uow;
    }

    private static void SetCurrentUow(IUnitOfWork value)
    {
        lock (AsyncLocalUow)
        {
            if (value == null)
            {
                if (AsyncLocalUow.Value == null)
                {
                    return;
                }

                if (AsyncLocalUow.Value.UnitOfWork?.Outer == null)
                {
                    AsyncLocalUow.Value.UnitOfWork = null;
                    AsyncLocalUow.Value = null;
                    return;
                }

                AsyncLocalUow.Value.UnitOfWork = AsyncLocalUow.Value.UnitOfWork.Outer;
            }
            else
            {
                if (AsyncLocalUow.Value?.UnitOfWork == null)
                {
                    if (AsyncLocalUow.Value != null)
                    {
                        AsyncLocalUow.Value.UnitOfWork = value;
                    }

                    AsyncLocalUow.Value = new LocalUowWrapper(value);
                    return;
                }

                value.Outer = AsyncLocalUow.Value.UnitOfWork;
                AsyncLocalUow.Value.UnitOfWork = value;
            }
        }
    }

    private class LocalUowWrapper
    {
        public IUnitOfWork UnitOfWork { get; set; }

        public LocalUowWrapper(IUnitOfWork unitOfWork)
        {
            UnitOfWork = unitOfWork;
        }
    }
}
           

繼續往下看,在

UnitOfWorkBase

的裡面也是有個

Complete()

CompleteAsync()

方法的。

protected abstract void CompleteUow();

/// <inheritdoc/>
public void Complete()
{
    // 判斷是否重複完成
    PreventMultipleComplete();
    try
    {
        CompleteUow();
        _succeed = true;
        OnCompleted();
    }
    catch (Exception ex)
    {
        _exception = ex;
        throw;
    }
}
           

這裡的

CompleteUow()

仍然隻是一個抽象方法,具體的實作在具體的通路層裡面。

2.4 EF Core 實際處理

Abp 針對 EF Core 的 UOW 實作是

EfCoreUnitOfWork

,代碼如下:

protected override void BeginUow()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.InitOptions(Options);
    }
}

public override void SaveChanges()
{
    foreach (var dbContext in GetAllActiveDbContexts())
    {
        SaveChangesInDbContext(dbContext);
    }
}

public override async Task SaveChangesAsync()
{
    // 周遊所有激活的 DbContext
    foreach (var dbContext in GetAllActiveDbContexts())
    {
        await SaveChangesInDbContextAsync(dbContext);
    }
}

protected override void CompleteUow()
{
    SaveChanges();
    CommitTransaction();
}

protected override async Task CompleteUowAsync()
{
    await SaveChangesAsync();
    CommitTransaction();
}

private void CommitTransaction()
{
    if (Options.IsTransactional == true)
    {
        _transactionStrategy.Commit();
    }
}

public IReadOnlyList<DbContext> GetAllActiveDbContexts()
{
    return ActiveDbContexts.Values.ToImmutableList();
}
           

根本就是周遊

DbContext

調用其

SaveChanges()

來送出所有資料庫更改。

餘下更加詳細的東西會放在 《七、倉儲與 Entity Framework Core》 當中說明的。

3.常見問題

3.1 待寫

4.點此跳轉到總目錄