天天看點

[Abp vNext 源碼分析] - 12. 背景作業與背景工作者

一、簡要說明

文章資訊:

基于的 ABP vNext 版本:1.0.0

創作日期:2019 年 10 月 24 日晚

更新日期:暫無

ABP vNext 提供了背景工作者和背景作業的支援,基本實作與原來的 ABP 架構類似,并且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的背景作業內建。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他複雜的配置。

背景作業在系統開發的過程當中,是比較常用的功能。因為總是有一些長耗時的任務,而這些任務我們不是立即響應的,例如 Excel 文檔導入、批量發送短信通知等。

背景工作者 的話,ABP vNext 的實作就是在 CLR 的

Timer

之上封裝了一層,周期性地執行使用者邏輯。ABP vNext 預設提供的 背景任務管理器,就是在背景工作者基礎之上進行的封裝。

涉及到背景任務、背景工作者的子產品一共有 6 個,它們分别是:

  • Volo.Abp.Threading :提供了一些常用的線程元件,其中

    AbpTimer

    就是在裡面實作的。
  • Volo.Abp.BackgroundWorkers :背景工作者的定義和實作。
  • Volo.Abp.BackgroundJobs.Abstractions :背景任務的一些共有定義。
  • Volo.Abp.BackgroundJobs :預設的背景任務管理器實作。
  • Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 庫實作的背景任務管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 實作的背景任務管理器。

二、源碼分析

線程元件

健壯的計時器

CLR 為我們提供了多種計時器,我們一般使用的是

System.Threading.Timer

,它是基于 CLR 線程池的一個周期計時器,會根據我們配置的

Period

(周期) 定時執行。在 CLR 線程池中,所有的

Timer

隻有 1 個線程為其服務。這個線程直到下一個計時器的觸發時間,當下一個

Timer

對象到期時,這個線程就會将

Timer

的回調方法通過

ThreadPool.QueueUserWorkItem()

扔到線程池去執行。

不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的周期,那麼就會造成上一個任務還沒執行完成又開始執行新的任務。

解決這個方法其實很簡單,即啟動之後,将周期設定為

Timeout.Infinite

,這樣隻會執行一次。當回調方法執行完成之後,就設定

dueTime

參數說明下次執行要等待多久,并且周期還是

Timeout.Infinite

ABP vNext 已經為我們提供了健壯的計時器,該類型的定義是

AbpTimer

,在内部用到了

volatile

關鍵字和

Monitor

實作 條件變量模式 解決多線程環境下的問題。

public class AbpTimer : ITransientDependency
{
    // 回調事件。
    public event EventHandler Elapsed;

    // 執行周期。
    public int Period { get; set; }

    // 定時器啟動之後就開始運作,預設為 Fasle。
    public bool RunOnStart { get; set; }

    // 日志記錄器。
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定時器是否在執行任務,預設為 false。
    private volatile bool _performingTasks;
    // 定時器的運作狀态,預設為 false。
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回調函數是 TimerCallBack,執行周期為永不執行。
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 如果傳遞的周期小于等于 0 ,則抛出異常。
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥鎖,保證線程安全。
        lock (_taskTimer)
        {
            // 如果啟動之後就需要馬上執行,則設定為 0,馬上執行任務,否則會等待 Period 毫秒之後再執行(1 個周期)。
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定時器成功運作了。
            _isRunning = true;
        }
        // 釋放 _taskTimer 的互斥鎖。
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥鎖。
        lock (_taskTimer)
        {
            // 将内部定時器設定為永不執行的狀态。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 檢測目前是否還有正在執行的任務,如果有則等待任務執行完成。
            while (_performingTasks)
            {
                // 臨時釋放鎖,阻塞目前線程。但是其他線程可以擷取 _timer 的互斥鎖。
                Monitor.Wait(_taskTimer);
            }

            // 需要表示停止狀态,是以标記狀态為 false。
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 如果有任務正在運作,或者内部定時器已經停止了,則不做任何事情。
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 臨時停止内部定時器。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 表明馬上需要執行任務了。
            _performingTasks = true;
        }

        try
        {
            // 調用綁定的事件。
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 注意,這裡将會吞噬異常。
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任務執行完成,更改狀态。
                _performingTasks = false;

                // 如果定時器還在運作,沒有被停止,則啟動下一個 Period 周期。
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除因為釋放鎖而阻塞的線程。
                // 如果已經調用了 Stop,則會喚醒那個因為 Wait 阻塞的線程,就會使 _isRunning 置為 false。
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}
           

這裡對

_performingTasks

_isRunning

字段設定為

volatile

防止指令重排和寄存器緩存。這是因為在

Stop

方法内部使用到的

_performingTasks

可能會被優化,是以将該字段設定為了易失的。

IRunnable

接口

ABP vNext 為任務的啟動和停止,抽象了一個

IRunnable

接口。雖然描述說的是對線程的行為進行抽象,但千萬千萬不要手動調用

Thread.Abort()

。關于

Thread.Abort()

的壞處,這裡不再多加贅述,可以參考 這篇文章 的描述,或者搜尋其他的相關文章。

public interface IRunnable
{
    // 啟動這個服務。
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 停止這個服務。
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}
           

背景工作者

子產品的構造

背景工作者的子產品行為比較簡單,它定義了在應用程式初始化和銷毀時的行為。在初始化時,背景工作者管理器 獲得所有 背景工作者,并開始啟動它們。在銷毀時,背景工作者管理器獲得所有背景工作者,并開始停止他們,這樣才能夠做到優雅退出。

[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了背景工作者,那麼獲得背景工作者管理器的執行個體,并調用 StartAsync 啟動所有背景工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了背景工作者,那麼獲得背景工作者管理器的執行個體,并調用 StopAsync 停止所有背景工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}
           

背景工作者的定義

首先看看

IBackgroundWorker

接口的定義,是空的。不過繼承了

ISingletonDependency

接口,說明我們的每個背景工作者都是 單例 的。

/// <summary>
/// 在背景運作,執行某些任務的工作程式(線程)的接口定義。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}
           

ABP vNext 為我們定義了一個抽象的背景工作者類型

BackgroundWorkerBase

,這個基類的設計目的是提供一些常用元件(和

ApplicationService

一樣)。

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否應該提供工作單元、本地化以及其他常用的屬性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}
           

ABP vNext 内部隻有一個預設的背景工作者實作

PeriodicBackgroundWorkerBase

。從名字上來看,意思是就是周期執行的背景工作者,内部就是用的

AbpTimer

來實作,ABP vNext 将其包裝起來是為了實作統一的模式(背景工作者)。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味着子類必須在其構造函數,指定 timer 的執行周期。
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 啟動背景工作者。
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 停止背景工作者。
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 關聯的周期事件,之是以不直接挂載 DoWork,是為了捕獲異常。
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要周期執行的任務。
    protected abstract void DoWork();
}
           

我們如果要實作自己的背景工作者,隻需要繼承該類,實作

DoWork()

方法即可。

public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分鐘執行一次。
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("背景工作者被執行了。");
    }
}
           

然後在我們自己子產品的

OnPreApplicationInitialization()

方法内解析出背景作業管理器(

IBackgroundWorkerManager

),調用它的

Add()

方法,将我們定義的

TestWorker

添加到管理器當中即可。

背景工作者管理器

所有的背景工作者都是通過

IBackgroundWorkerManager

進行管理的,它提供了

StartAsync()

StopAsync()

Add()

方法。前面兩個方法就是

IRunnable

接口定義的,背景工作者管理器直接內建了該接口,後面的

Add()

方法就是用來動态添加我們的背景工作者。

背景工作者管理器的預設實作是

BackgroundWorkerManager

類型,它内部做的事情很簡單,就是維護一個背景工作者集合。每當調用

StartAsync()

StopAsync()

方法的時候,都從這個集合周遊背景工作者,執行他們的啟動和停止方法。

這裡值得注意的一點是,當我們調用

Add()

方法添加了一個背景工作者之後,背景工作者管理器就會啟動這個背景工作者。

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 如果目前背景工作者管理器還處于運作狀态,則調用工作者的 StartAsync() 方法啟動。
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 啟動,則周遊集合啟動。
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 停止, 則周遊集合停止。
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}
           

上述代碼其實存在一個問題,即背景工作者被釋放以後,是否還能執行

Add()

操作。參考我 之前的文章 ,其實當對象被釋放之後,就應該抛出

ObjectDisposedException

異常。

背景作業

比起背景工作者,我們執行一次性任務的時候,一般會使用背景作業進行處理。比起隻能設定固定周期的

PeriodicBackgroundWorkerBase

,內建了 Hangfire 的背景作業管理器,能夠讓我們使用 Cron 表達式,更加靈活地設定任務的執行周期。

關于背景作業的子產品,我們需要說道兩處。第一處是位于 Volo.Abp.BackgroundJobs.Abstractions 項目的

AbpBackgroundJobsAbstractionsModule

,第二出則是位于 Volo.Abp.BackgroundJobs 項目的

AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule

的主要行為是将符合條件的背景作業,添加到

AbpBackgroundJobOptions

配置當中,以便後續進行使用。

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 如果注冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 将資料指派給配置類。
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}
           

Volo.Abp.BackgroundJobs 内部是 ABP vNext 為我們提供的 預設背景作業管理器,這個背景作業管理器 本質上是一個背景工作者。

這個背景工作者會周期性(取決于

AbpBackgroundJobWorkerOptions.JobPollPeriod

值,預設為 5 秒種)地從

IBackgroundJobStore

撈出一堆背景任務,并且在背景執行。至于每次執行多少個背景任務,這也取決于

AbpBackgroundJobWorkerOptions.MaxJobFetchCount

的值,預設值是 1000 個。

注意:

這裡的 Options 類是

AbpBackgroundJobWorkerOptions

,别和

AbpBackgroundWorkerOptions

混淆了。

是以在

AbpBackgroundJobsModule

子產品裡面,隻做了一件事情,就是将負責背景作業的背景工作者,添加到背景工作者管理器種,并開始周期性地執行。

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 獲得背景工作者管理器,并将負責背景作業的工作者添加進去。
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}
           

背景作業的定義

在上一節裡面看到,隻要是實作

IBackgroundJob<TArgs>

類型的都視為一個背景作業。這個背景作業接口,隻定義了一個行為,那就是執行(

Execute(TArgs)

)。這裡的

TArgs

泛型作為執行背景作業時,需要傳遞的參數類型。

// 因為是傳入的參數,是以泛型參數是逆變的。
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}
           

檢查源碼,發現 ABP vNext 的郵箱子產品定義了一個郵件發送任務

BackgroundEmailSendingJob

,它的實作大概如下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}
           

背景作業管理器

背景作業都是通過一個背景作業管理器(

IBackgroundJobManager

)進行管理的,這個接口定義了一個入隊方法(

EnqueueAsync()

),注意,我們的背景作業在入隊後,不是馬上執行的。

說一下這個入隊處理邏輯:

  1. 首先我們會通過參數的類型,擷取到任務的名稱。(假設任務上面沒有标注

    BackgroundJobNameAttribute

    特性,那麼任務的名稱就是參數類型的

    FullName

    。)
  2. 構造一個

    BackgroundJobInfo

    對象。
  3. 通過

    IBackgroundJobStore

    持久化任務資訊。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 擷取任務名稱。
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 通過序列化器,序列化參數值,友善存儲。這裡内部其實使用的是 JSON.NET 進行序列化。
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲。
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任務資訊,友善後面執行背景作業的工作者能夠取到。
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}
           

BackgroundJobNameAttribute

相關的方法:

public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判斷參數類型上面是否标注了特性,并且特性實作了 IBackgroundJobNameProvider 接口。
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,則使用類型的 FullName。
            ?? jobArgsType.FullName;
}
           

背景作業的存儲

背景作業的存儲預設是放在記憶體的,這點可以從

InMemoryBackgroundJobStore

類型實作看出來。在它的内部使用了一個并行字典,通過作業的 Guid 與作業進行關聯綁定。

除了記憶體實作,在 Volo.Abp.BackgroundJobs.Domain 子產品還有一個

BackgroundJobStore

實作,基本套路與

SettingStore

一樣,都是存儲到資料庫裡面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}
           

背景作業的執行

預設的背景作業管理器是通過一個背景工作者來執行背景任務的,這個實作叫做

BackgroundJobWorker

,這個背景工作者的生命周期也是單例的。背景作業的具體執行邏輯裡面,涉及到了以下幾個類型的互動。

類型 作用

AbpBackgroundJobOptions

提供每個背景任務的配置資訊,包括任務的類型、參數類型、任務名稱資料。

AbpBackgroundJobWorkerOptions

提供背景作業工作者的配置資訊,例如每個周期 最大執行的作業數量、背景

工作者的 執行周期、作業執行 逾時時間 等。

BackgroundJobConfiguration

背景任務的配置資訊,作用是将持久化存儲的作業資訊與運作時類型進行綁定

和執行個體化,以便 ABP vNext 來執行具體的任務。

IBackgroundJobExecuter

背景作業的執行器,當我們從持久化存儲擷取到背景作業資訊時,将會通過

這個執行器來執行具體的背景作業。

IBackgroundJobSerializer

背景作業序列化器,用于背景作業持久化時進行序列化的工具,預設采用的

是 JSON.NET 進行實作。

JobExecutionContext

執行器在執行背景作業時,是通過這個上下文參數進行執行的,在這個上下

文内部,包含了背景作業的具體類型、背景作業的參數值。

IBackgroundJobStore

前面已經講過了,這個是用于背景作業的持久化存儲,預設實作是存儲在記憶體。

BackgroundJobPriority

背景作業的執行優先級定義,ABP vNext 在執行背景任務時,會根據任務的優

先級進行排序,以便在後面執行的時候優先級高的任務先執行。

我們來按照邏輯順序走一遍它的實作,首先背景作業的執行工作者會從持久化存儲内,擷取

MaxJobFetchCount

個任務用于執行。從持久化存儲擷取背景作業資訊(

BackgroundJobInfo

),是由

IBackgroundJobStore

提供的。

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何背景作業,則直接結束本次調用。
if (!waitingJobs.Any())
{
    return;
}
           

InMemoryBackgroundJobStore

的相關實作:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

           

上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然後根據任務的優先級從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些資料中,篩選出

maxResultCount

結果并傳回。

說到這裡,我們來看一下這個

NextTryTime

是如何被計算出來的?回想起最開始的背景作業管理器,我們在添加一個背景任務的時候,就會設定這個背景任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值一般是

Clock.Now

,也就是它被添加到隊列的時間。

不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先級再執行,就在每次任務執行失敗之後,會将

NextTryTime

的值指數級進行增加。這塊代碼可以在

CalculateNextTryTime

裡面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 一般來說,這個 DefaultWaitFactor 因子的值是 2.0 。
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行挂鈎。
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

           

當預期的執行時間都超過

DefaultTimeout

的逾時時間時(預設為 2 天),說明這個任務确實沒救了,就不要再執行了。

我們之前說到,從

IBackgroundJobStore

拿到了需要執行的背景任務資訊集合,接下來我們就要開始執行背景任務了。

foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根據任務名稱擷取任務的配置參數。
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根據配置裡面存儲的任務類型,将參數值進行反序列化。
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 構造一個新的執行上下文,讓執行器執行任務。
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 如果任務執行成功則删除該任務。            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超過逾時時間的時候,公式計算函數傳回 null,該任務置為廢棄任務。
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 執行過程中,産生了未知異常,設定為廢棄任務,并列印日志。
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

           

執行背景任務的時候基本分為 5 步,它們分别是:

  1. 獲得任務關聯的配置參數,預設不用提供,因為在之前子產品初始化的時候就已經配置了(你也可以顯式指定)。
  2. 通過之前存儲的配置參數,将參數值反序列化出來,構造具體執行個體。
  3. 構造一個執行上下文。
  4. 背景任務執行器執行具體的背景任務。
  5. 成功則删除任務,失敗則更新任務下次的執行狀态。

至于執行器裡面的真正執行操作,你都拿到了參數值和任務類型了。就可以通過類型用 IoC 擷取背景任務對象的執行個體,然後通過反射比對方法簽名,在執行個體上調用這個方法傳入參數即可。

public virtual void Execute(JobExecutionContext context)
{
    // 構造具體的背景作業執行個體對象。
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 獲得需要執行的方法簽名。
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接通過 MethodInfo 的 Invoke 方法調用,傳入具體的執行個體對象和參數值即可。
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 如果是執行方法内的異常,則包裝進行處理,然後抛出。
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

           

內建 Hangfire

ABP vNext 對于 Hangfire 的內建代碼分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 子產品内部,前者是在子產品配置裡面,調用 Hangfire 庫的相關方法,注入元件到 IoC 容器當中。後者則是對背景作業進行了适配處理,替換了預設的

IBackgroundJobManager

實作。

AbpHangfireModule

子產品内部,通過工廠建立出來一個

BackgroudJobServer

執行個體,并将它的生命周期與應用程式的生命周期進行綁定,以便進行銷毀處理。

public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

           

我們直奔主題,看一下基于 Hangfire 的背景作業管理器是怎麼實作的。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 如果沒有延遲參數,則直接通過 Enqueue() 方法扔進執行對了。
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

           

上述代碼中使用

HangfireJobExecutionAdapter

進行了一個适配操作,因為 Hangfire 要将一個背景任務扔進隊列執行,不是用

TArgs

就能解決的。

轉到這個擴充卡定義,提供了一個

Execute(TArgs)

方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用擴充卡的

Excetue(TArgs)

方法,然後内部還是使用的

IBackgroundJobExecuter

來執行具體定義的任務。

public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

           

內建 RabbitMQ

基于 RabbitMQ 的背景作業實作,我想放在分布式事件總線裡面,對其一起進行講解。

三、總結

ABP vNext 為我們提供了多種背景作業管理器的實作,你可以根據自己的需求選用不同的背景作業管理器,又或者是自己動手造輪子。

需要看其他的 ABP vNext 相關文章?點選我 即可跳轉到總目錄。