[Abp vNext 源碼分析] - 12. 後臺作業與後臺工作者

来源:https://www.cnblogs.com/myzony/archive/2019/10/25/11738721.html
-Advertisement-
Play Games

一、簡要說明 文章信息: 基於的 ABP vNext 版本:1.0.0 創作日期:2019 年 10 月 24 日晚 更新日期:暫無 ABP vNext 提供了後臺工作者和後臺作業的支持,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ ...


一、簡要說明

文章信息:

基於的 ABP vNext 版本:1.0.0

創作日期:2019 年 10 月 24 日晚

更新日期:暫無

ABP vNext 提供了後臺工作者和後臺作業的支持,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後臺作業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他複雜的配置。

後臺作業在系統開發的過程當中,是比較常用的功能。因為總是有一些長耗時的任務,而這些任務我們不是立即響應的,例如 Excel 文檔導入、批量發送簡訊通知等。

後臺工作者 的話,ABP vNext 的實現就是在 CLR 的 Timer 之上封裝了一層,周期性地執行用戶邏輯。ABP vNext 預設提供的 後臺任務管理器,就是在後臺工作者基礎之上進行的封裝。

涉及到後臺任務、後臺工作者的模塊一共有 6 個,它們分別是:

  • Volo.Abp.Threading :提供了一些常用的線程組件,其中 AbpTimer 就是在裡面實現的。
  • Volo.Abp.BackgroundWorkers :後臺工作者的定義和實現。

  • Volo.Abp.BackgroundJobs.Abstractions :後臺任務的一些共有定義。
  • Volo.Abp.BackgroundJobs :預設的後臺任務管理器實現。
  • Volo.Abp.BackgroundJobs.HangFire :基於 Hangfire 庫實現的後臺任務管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後臺任務管理器。

二、源碼分析

線程組件

健壯的計時器

CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer ,它是基於 CLR 線程池的一個周期計時器,會根據我們配置的 Period (周期) 定時執行。在 CLR 線程池中,所有的 Timer 只有 1 個線程為其服務。這個線程直到下一個計時器的觸發時間,當下一個 Timer 對象到期時,這個線程就會將 Timer 的回調方法通過 ThreadPool.QueueUserWorkItem() 扔到線程池去執行。

不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的周期,那麼就會造成上一個任務還沒執行完成又開始執行新的任務。

解決這個方法其實很簡單,即啟動之後,將周期設置為 Timeout.Infinite ,這樣只會執行一次。當回調方法執行完成之後,就設置 dueTime 參數說明下次執行要等待多久,並且周期還是 Timeout.Infinite

ABP vNext 已經為我們提供了健壯的計時器,該類型的定義是 AbpTimer ,在內部用到了 volatile 關鍵字和 Monitor 實現 條件變數模式 解決多線程環境下的問題。

public class AbpTimer : ITransientDependency
{
    // 回調事件。
    public event EventHandler Elapsed;

    // 執行周期。
    public int Period { get; set; }

    // 定時器啟動之後就開始運行,預設為 Fasle。
    public bool RunOnStart { get; set; }

    // 日誌記錄器。
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定時器是否在執行任務,預設為 false。
    private volatile bool _performingTasks;
    // 定時器的運行狀態,預設為 false。
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回調函數是 TimerCallBack,執行周期為永不執行。
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 如果傳遞的周期小於等於 0 ,則拋出異常。
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥鎖,保證線程安全。
        lock (_taskTimer)
        {
            // 如果啟動之後就需要馬上執行,則設置為 0,馬上執行任務,否則會等待 Period 毫秒之後再執行(1 個周期)。
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定時器成功運行了。
            _isRunning = true;
        }
        // 釋放 _taskTimer 的互斥鎖。
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥鎖。
        lock (_taskTimer)
        {
            // 將內部定時器設置為永不執行的狀態。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成。
            while (_performingTasks)
            {
                // 臨時釋放鎖,阻塞當前線程。但是其他線程可以獲取 _timer 的互斥鎖。
                Monitor.Wait(_taskTimer);
            }

            // 需要表示停止狀態,所以標記狀態為 false。
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情。
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 臨時停止內部定時器。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 表明馬上需要執行任務了。
            _performingTasks = true;
        }

        try
        {
            // 調用綁定的事件。
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 註意,這裡將會吞噬異常。
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任務執行完成,更改狀態。
                _performingTasks = false;

                // 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期。
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除因為釋放鎖而阻塞的線程。
                // 如果已經調用了 Stop,則會喚醒那個因為 Wait 阻塞的線程,就會使 _isRunning 置為 false。
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}

這裡對 _performingTasks_isRunning 欄位設置為 volatile 防止指令重排和寄存器緩存。這是因為在 Stop 方法內部使用到的 _performingTasks 可能會被優化,所以將該欄位設置為了易失的。

IRunnable 介面

ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable 介面。雖然描述說的是對線程的行為進行抽象,但千萬千萬不要手動調用 Thread.Abort() 。關於 Thread.Abort() 的壞處,這裡不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章。

public interface IRunnable
{
    // 啟動這個服務。
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 停止這個服務。
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}

後臺工作者

模塊的構造

後臺工作者的模塊行為比較簡單,它定義了在應用程式初始化和銷毀時的行為。在初始化時,後臺工作者管理器 獲得所有 後臺工作者,並開始啟動它們。在銷毀時,後臺工作者管理器獲得所有後臺工作者,並開始停止他們,這樣才能夠做到優雅退出。

[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了後臺工作者,那麼獲得後臺工作者管理器的實例,並調用 StartAsync 啟動所有後臺工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 如果啟用了後臺工作者,那麼獲得後臺工作者管理器的實例,並調用 StopAsync 停止所有後臺工作者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}

後臺工作者的定義

首先看看 IBackgroundWorker 介面的定義,是空的。不過繼承了 ISingletonDependency 介面,說明我們的每個後臺工作者都是 單例 的。

/// <summary>
/// 在後臺運行,執行某些任務的工作程式(線程)的介面定義。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}

ABP vNext 為我們定義了一個抽象的後臺工作者類型 BackgroundWorkerBase,這個基類的設計目的是提供一些常用組件(和 ApplicationService 一樣)。

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否應該提供工作單元、本地化以及其他常用的屬性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}

ABP vNext 內部只有一個預設的後臺工作者實現 PeriodicBackgroundWorkerBase。從名字上來看,意思是就是周期執行的後臺工作者,內部就是用的 AbpTimer 來實現,ABP vNext 將其包裝起來是為了實現統一的模式(後臺工作者)。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味著子類必須在其構造函數,指定 timer 的執行周期。
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 啟動後臺工作者。
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 停止後臺工作者。
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲異常。
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要周期執行的任務。
    protected abstract void DoWork();
}

我們如果要實現自己的後臺工作者,只需要繼承該類,實現 DoWork() 方法即可。

public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分鐘執行一次。
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("後臺工作者被執行了。");
    }
}

然後在我們自己模塊的 OnPreApplicationInitialization() 方法內解析出後臺作業管理器(IBackgroundWorkerManager),調用它的 Add() 方法,將我們定義的 TestWorker 添加到管理器當中即可。

後臺工作者管理器

所有的後臺工作者都是通過 IBackgroundWorkerManager 進行管理的,它提供了 StartAsync()StopAsync()Add() 方法。前面兩個方法就是 IRunnable 介面定義的,後臺工作者管理器直接集成了該介面,後面的 Add() 方法就是用來動態添加我們的後臺工作者。

後臺工作者管理器的預設實現是 BackgroundWorkerManager 類型,它內部做的事情很簡單,就是維護一個後臺工作者集合。每當調用 StartAsync()StopAsync() 方法的時候,都從這個集合遍歷後臺工作者,執行他們的啟動和停止方法。

這裡值得註意的一點是,當我們調用 Add() 方法添加了一個後臺工作者之後,後臺工作者管理器就會啟動這個後臺工作者。

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 如果當前後臺工作者管理器還處於運行狀態,則調用工作者的 StartAsync() 方法啟動。
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 啟動,則遍歷集合啟動。
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 停止, 則遍歷集合停止。
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

上述代碼其實存在一個問題,即後臺工作者被釋放以後,是否還能執行 Add() 操作。參考我 之前的文章 ,其實當對象被釋放之後,就應該拋出 ObjectDisposedException 異常。

後臺作業

比起後臺工作者,我們執行一次性任務的時候,一般會使用後臺作業進行處理。比起只能設置固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的後臺作業管理器,能夠讓我們使用 Cron 表達式,更加靈活地設置任務的執行周期。

模塊的構造

關於後臺作業的模塊,我們需要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule ,第二出則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行為是將符合條件的後臺作業,添加到 AbpBackgroundJobOptions 配置當中,以便後續進行使用。

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 如果註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 將數據賦值給配置類。
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}

Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 預設後臺作業管理器,這個後臺作業管理器 本質上是一個後臺工作者

這個後臺工作者會周期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,預設為 5 秒種)地從 IBackgroundJobStore 撈出一堆後臺任務,並且在後臺執行。至於每次執行多少個後臺任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,預設值是 1000 個。

註意:

這裡的 Options 類是 AbpBackgroundJobWorkerOptions,別和 AbpBackgroundWorkerOptions 混淆了。

所以在 AbpBackgroundJobsModule 模塊裡面,只做了一件事情,就是將負責後臺作業的後臺工作者,添加到後臺工作者管理器種,並開始周期性地執行。

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 獲得後臺工作者管理器,並將負責後臺作業的工作者添加進去。
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}

後臺作業的定義

在上一節裡面看到,只要是實現 IBackgroundJob<TArgs> 類型的都視為一個後臺作業。這個後臺作業介面,只定義了一個行為,那就是執行(Execute(TArgs))。這裡的 TArgs 泛型作為執行後臺作業時,需要傳遞的參數類型。

// 因為是傳入的參數,所以泛型參數是逆變的。
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}

檢查源碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob,它的實現大概如下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}

後臺作業管理器

後臺作業都是通過一個後臺作業管理器(IBackgroundJobManager)進行管理的,這個介面定義了一個入隊方法(EnqueueAsync()),註意,我們的後臺作業在入隊後,不是馬上執行的。

說一下這個入隊處理邏輯:

  1. 首先我們會通過參數的類型,獲取到任務的名稱。(假設任務上面沒有標註 BackgroundJobNameAttribute 特性,那麼任務的名稱就是參數類型的 FullName 。)
  2. 構造一個 BackgroundJobInfo 對象。
  3. 通過 IBackgroundJobStore 持久化任務信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 獲取任務名稱。
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 通過序列化器,序列化參數值,方便存儲。這裡內部其實使用的是 JSON.NET 進行序列化。
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲。
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任務信息,方便後面執行後臺作業的工作者能夠取到。
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}

BackgroundJobNameAttribute 相關的方法:

public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判斷參數類型上面是否標註了特性,並且特性實現了 IBackgroundJobNameProvider 介面。
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,則使用類型的 FullName。
            ?? jobArgsType.FullName;
}

後臺作業的存儲

後臺作業的存儲預設是放在記憶體的,這點可以從 InMemoryBackgroundJobStore 類型實現看出來。在它的內部使用了一個並行字典,通過作業的 Guid 與作業進行關聯綁定。

除了記憶體實現,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore 實現,基本套路與 SettingStore 一樣,都是存儲到資料庫裡面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}

後臺作業的執行

預設的後臺作業管理器是通過一個後臺工作者來執行後臺任務的,這個實現叫做 BackgroundJobWorker,這個後臺工作者的生命周期也是單例的。後臺作業的具體執行邏輯裡面,涉及到了以下幾個類型的交互。

類型 作用
AbpBackgroundJobOptions 提供每個後臺任務的配置信息,包括任務的類型、參數類型、任務名稱數據。
AbpBackgroundJobWorkerOptions 提供後臺作業工作者的配置信息,例如每個周期 最大執行的作業數量、後臺
工作者的 執行周期、作業執行 超時時間 等。
BackgroundJobConfiguration 後臺任務的配置信息,作用是將持久化存儲的作業信息與運行時類型進行綁定
和實例化,以便 ABP vNext 來執行具體的任務。
IBackgroundJobExecuter 後臺作業的執行器,當我們從持久化存儲獲取到後臺作業信息時,將會通過
這個執行器來執行具體的後臺作業。
IBackgroundJobSerializer 後臺作業序列化器,用於後臺作業持久化時進行序列化的工具,預設採用的
是 JSON.NET 進行實現。
JobExecutionContext 執行器在執行後臺作業時,是通過這個上下文參數進行執行的,在這個上下
文內部,包含了後臺作業的具體類型、後臺作業的參數值。
IBackgroundJobStore 前面已經講過了,這個是用於後臺作業的持久化存儲,預設實現是存儲在記憶體。
BackgroundJobPriority 後臺作業的執行優先順序定義,ABP vNext 在執行後臺任務時,會根據任務的優
先級進行排序,以便在後面執行的時候優先順序高的任務先執行。

我們來按照邏輯順序走一遍它的實現,首先後臺作業的執行工作者會從持久化存儲內,獲取 MaxJobFetchCount 個任務用於執行。從持久化存儲獲取後臺作業信息(BackgroundJobInfo),是由 IBackgroundJobStore 提供的。

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何後臺作業,則直接結束本次調用。
if (!waitingJobs.Any())
{
    return;
}

InMemoryBackgroundJobStore 的相關實現:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然後根據任務的優先順序從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount 結果並返回。

說到這裡,我們來看一下這個 NextTryTime 是如何被計算出來的?回想起最開始的後臺作業管理器,我們在添加一個後臺任務的時候,就會設置這個後臺任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值一般是 Clock.Now ,也就是它被添加到隊列的時間。

不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先順序再執行,就在每次任務執行失敗之後,會將 NextTryTime 的值指數級進行增加。這塊代碼可以在 CalculateNextTryTime 裡面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 一般來說,這個 DefaultWaitFactor 因數的值是 2.0 。
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤。
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

當預期的執行時間都超過 DefaultTimeout 的超時時間時(預設為 2 天),說明這個任務確實沒救了,就不要再執行了。

我們之前說到,從 IBackgroundJobStore 拿到了需要執行的後臺任務信息集合,接下來我們就要開始執行後臺任務了。

foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根據任務名稱獲取任務的配置參數。
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根據配置裡面存儲的任務類型,將參數值進行反序列化。
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 構造一個新的執行上下文,讓執行器執行任務。
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 如果任務執行成功則刪除該任務。            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超過超時時間的時候,公式計算函數返回 null,該任務置為廢棄任務。
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 執行過程中,產生了未知異常,設置為廢棄任務,並列印日誌。
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

執行後臺任務的時候基本分為 5 步,它們分別是:

  1. 獲得任務關聯的配置參數,預設不用提供,因為在之前模塊初始化的時候就已經配置了(你也可以顯式指定)。
  2. 通過之前存儲的配置參數,將參數值反序列化出來,構造具體實例。
  3. 構造一個執行上下文。
  4. 後臺任務執行器執行具體的後臺任務。
  5. 成功則刪除任務,失敗則更新任務下次的執行狀態。

至於執行器裡面的真正執行操作,你都拿到了參數值和任務類型了。就可以通過類型用 IoC 獲取後臺任務對象的實例,然後通過反射匹配方法簽名,在實例上調用這個方法傳入參數即可。

public virtual void Execute(JobExecutionContext context)
{
    // 構造具體的後臺作業實例對象。
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 獲得需要執行的方法簽名。
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接通過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值即可。
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 如果是執行方法內的異常,則包裝進行處理,然後拋出。
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

集成 Hangfire

ABP vNext 對於 Hangfire 的集成代碼分佈在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置裡面,調用 Hangfire 庫的相關方法,註入組件到 IoC 容器當中。後者則是對後臺作業進行了適配處理,替換了預設的 IBackgroundJobManager 實現。

AbpHangfireModule 模塊內部,通過工廠創建出來一個 BackgroudJobServer 實例,並將它的生命周期與應用程式的生命周期進行綁定,以便進行銷毀處理。

public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

我們直奔主題,看一下基於 Hangfire 的後臺作業管理器是怎麼實現的。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 如果沒有延遲參數,則直接通過 Enqueue() 方法扔進執行對了。
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

上述代碼中使用 HangfireJobExecutionAdapter 進行了一個適配操作,因為 Hangfire 要將一個後臺任務扔進隊列執行,不是用 TArgs 就能解決的。

轉到這個適配器定義,提供了一個 Execute(TArgs) 方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs) 方法,然後內部還是使用的 IBackgroundJobExecuter 來執行具體定義的任務。

public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

集成 RabbitMQ

基於 RabbitMQ 的後臺作業實現,我想放在分散式事件匯流排裡面,對其一起進行講解。

三、總結

ABP vNext 為我們提供了多種後臺作業管理器的實現,你可以根據自己的需求選用不同的後臺作業管理器,又或者是自己動手造輪子。

需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • CPF(暫時命名)(Cross platform framework),模仿WPF的框架,支持NETCore的跨平臺UI框架,暫時不夠完善,只用於測試,暫時只支持Windows和Mac。支持數據綁定,CSS,動畫。。。 可能有人會說,不是有個開源的Avalonia ,我試過,不過他的性能不行,啟動速 ...
  • 使用LinqDB查詢Sqlite資料庫數據,不管是大數據還是少量的數據,感覺特別耗時,尤其是首次查詢 一個含有2.7萬條數據的數據表 首次查詢: 查詢2.7萬條數據,耗時1s 查詢8條數據,也要耗時750ms 二次查詢: 查詢2.7萬條數據,耗時475ms 查詢指定的1條數據,耗時73ms 我們來嘗 ...
  • 當使用Sql語句查詢資料庫,返回DataSet數據集。 DataSet轉化為數據列表,可以通過映射方式直接返回Entity數據列表 新建一個特性類,用於資料庫列表列名稱映射 LinqToDB提供了一個ColumnAttribute,但是通過反射不方便獲取ColumnAttribute獲取Custom ...
  • 一、簡介 1.break語句:迴圈-迴圈中斷並停止,退出當前迴圈; 流程圖: 2.continue:迴圈-迴圈下一次迭代繼續執行。 流程圖: 執行過程:立即結果本次迴圈,判斷迴圈條件,如果成立,則進入下一次迴圈,否則退出本次迴圈。 舉例:我編寫的代碼時候,上個廁所,回來繼續編寫代碼。 二、實例 練習 ...
  • 一、簡介 嵌套迴圈:while、for和do...while迴圈使用一個或者多個嵌套。 二、實例 輸出九九乘法表(迴圈的嵌套) 輸出結果 ...
  • 場景 DevExpress的TreeList怎樣給樹節點設置圖標: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/102745542 在上面這篇博客中有具體的應用。 註: 博客主頁:https://blog.csdn.net/ba ...
  • 一、簡介 在for和while的迴圈是在頭部寫測試迴圈條件,而do....while的迴圈是在迴圈的尾部寫測試條件 do...while的迴圈和while的類似,但是do...while的最少執行一次迴圈體。 二、語法 do{ 迴圈體; } while{ 條件; } 三、執行過程 程式先執行do{} ...
  • 場景 DevExpress的TreeList怎樣設置數據源使其顯示成單列樹形結構: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/102742426 在上面設置TreeList的數據源並設置其為樹形結構後,如何給樹形結構設置圖標 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...