一、簡要說明 文章信息: 基於的 ABP vNext 版本:1.0.0 創作日期:2019 年 10 月 24 日晚 更新日期:暫無 ABP vNext 提供了後臺工作者和後臺作業的支持,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ ...
一、簡要說明
文章信息:
基於的 ABP vNext 版本:1.0.0
創作日期:2019 年 10 月 24 日晚
更新日期:暫無
ABP vNext 提供了後臺工作者和後臺作業的支持,基本實現與原來的 ABP 框架類似,並且 ABP vNext 還提供了對 HangFire 和 RabbitMQ 的後臺作業集成。開發人員在使用這些第三方庫的時候,基本就是開箱即用,不需要做其他複雜的配置。
後臺作業在系統開發的過程當中,是比較常用的功能。因為總是有一些長耗時的任務,而這些任務我們不是立即響應的,例如 Excel 文檔導入、批量發送簡訊通知等。
後臺工作者 的話,ABP vNext 的實現就是在 CLR 的 Timer
之上封裝了一層,周期性地執行用戶邏輯。ABP vNext 預設提供的 後臺任務管理器,就是在後臺工作者基礎之上進行的封裝。
涉及到後臺任務、後臺工作者的模塊一共有 6 個,它們分別是:
- Volo.Abp.Threading :提供了一些常用的線程組件,其中
AbpTimer
就是在裡面實現的。 Volo.Abp.BackgroundWorkers :後臺工作者的定義和實現。
- Volo.Abp.BackgroundJobs.Abstractions :後臺任務的一些共有定義。
- Volo.Abp.BackgroundJobs :預設的後臺任務管理器實現。
- Volo.Abp.BackgroundJobs.HangFire :基於 Hangfire 庫實現的後臺任務管理器。
Volo.Abp.BackgroundJobs.RabbitMQ : 基於 RabbitMQ 實現的後臺任務管理器。
二、源碼分析
線程組件
健壯的計時器
CLR 為我們提供了多種計時器,我們一般使用的是 System.Threading.Timer
,它是基於 CLR 線程池的一個周期計時器,會根據我們配置的 Period
(周期) 定時執行。在 CLR 線程池中,所有的 Timer
只有 1 個線程為其服務。這個線程直到下一個計時器的觸發時間,當下一個 Timer
對象到期時,這個線程就會將 Timer
的回調方法通過 ThreadPool.QueueUserWorkItem()
扔到線程池去執行。
不過這帶來了一個問題,即你的回調方法執行時間超過了計時器的周期,那麼就會造成上一個任務還沒執行完成又開始執行新的任務。
解決這個方法其實很簡單,即啟動之後,將周期設置為 Timeout.Infinite
,這樣只會執行一次。當回調方法執行完成之後,就設置 dueTime
參數說明下次執行要等待多久,並且周期還是 Timeout.Infinite
。
ABP vNext 已經為我們提供了健壯的計時器,該類型的定義是 AbpTimer
,在內部用到了 volatile
關鍵字和 Monitor
實現 條件變數模式 解決多線程環境下的問題。
public class AbpTimer : ITransientDependency
{
// 回調事件。
public event EventHandler Elapsed;
// 執行周期。
public int Period { get; set; }
// 定時器啟動之後就開始運行,預設為 Fasle。
public bool RunOnStart { get; set; }
// 日誌記錄器。
public ILogger<AbpTimer> Logger { get; set; }
private readonly Timer _taskTimer;
// 定時器是否在執行任務,預設為 false。
private volatile bool _performingTasks;
// 定時器的運行狀態,預設為 false。
private volatile bool _isRunning;
public AbpTimer()
{
Logger = NullLogger<AbpTimer>.Instance;
// 回調函數是 TimerCallBack,執行周期為永不執行。
_taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
}
public void Start(CancellationToken cancellationToken = default)
{
// 如果傳遞的周期小於等於 0 ,則拋出異常。
if (Period <= 0)
{
throw new AbpException("Period should be set before starting the timer!");
}
// 使用互斥鎖,保證線程安全。
lock (_taskTimer)
{
// 如果啟動之後就需要馬上執行,則設置為 0,馬上執行任務,否則會等待 Period 毫秒之後再執行(1 個周期)。
_taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
// 定時器成功運行了。
_isRunning = true;
}
// 釋放 _taskTimer 的互斥鎖。
}
public void Stop(CancellationToken cancellationToken = default)
{
// 使用互斥鎖。
lock (_taskTimer)
{
// 將內部定時器設置為永不執行的狀態。
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
// 檢測當前是否還有正在執行的任務,如果有則等待任務執行完成。
while (_performingTasks)
{
// 臨時釋放鎖,阻塞當前線程。但是其他線程可以獲取 _timer 的互斥鎖。
Monitor.Wait(_taskTimer);
}
// 需要表示停止狀態,所以標記狀態為 false。
_isRunning = false;
}
}
private void TimerCallBack(object state)
{
lock (_taskTimer)
{
// 如果有任務正在運行,或者內部定時器已經停止了,則不做任何事情。
if (!_isRunning || _performingTasks)
{
return;
}
// 臨時停止內部定時器。
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
// 表明馬上需要執行任務了。
_performingTasks = true;
}
try
{
// 調用綁定的事件。
Elapsed.InvokeSafely(this, new EventArgs());
}
catch
{
// 註意,這裡將會吞噬異常。
}
finally
{
lock (_taskTimer)
{
// 任務執行完成,更改狀態。
_performingTasks = false;
// 如果定時器還在運行,沒有被停止,則啟動下一個 Period 周期。
if (_isRunning)
{
_taskTimer.Change(Period, Timeout.Infinite);
}
// 解除因為釋放鎖而阻塞的線程。
// 如果已經調用了 Stop,則會喚醒那個因為 Wait 阻塞的線程,就會使 _isRunning 置為 false。
Monitor.Pulse(_taskTimer);
}
}
}
}
這裡對 _performingTasks
和 _isRunning
欄位設置為 volatile
防止指令重排和寄存器緩存。這是因為在 Stop
方法內部使用到的 _performingTasks
可能會被優化,所以將該欄位設置為了易失的。
IRunnable
介面
ABP vNext 為任務的啟動和停止,抽象了一個 IRunnable
介面。雖然描述說的是對線程的行為進行抽象,但千萬千萬不要手動調用 Thread.Abort()
。關於 Thread.Abort()
的壞處,這裡不再多加贅述,可以參考 這篇文章 的描述,或者搜索其他的相關文章。
public interface IRunnable
{
// 啟動這個服務。
Task StartAsync(CancellationToken cancellationToken = default);
/// <summary>
/// 停止這個服務。
/// </summary>
Task StopAsync(CancellationToken cancellationToken = default);
}
後臺工作者
模塊的構造
後臺工作者的模塊行為比較簡單,它定義了在應用程式初始化和銷毀時的行為。在初始化時,後臺工作者管理器 獲得所有 後臺工作者,並開始啟動它們。在銷毀時,後臺工作者管理器獲得所有後臺工作者,並開始停止他們,這樣才能夠做到優雅退出。
[DependsOn(
typeof(AbpThreadingModule)
)]
public class AbpBackgroundWorkersModule : AbpModule
{
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
// 如果啟用了後臺工作者,那麼獲得後臺工作者管理器的實例,並調用 StartAsync 啟動所有後臺工作者。
if (options.IsEnabled)
{
AsyncHelper.RunSync(
() => context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StartAsync()
);
}
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
// 如果啟用了後臺工作者,那麼獲得後臺工作者管理器的實例,並調用 StopAsync 停止所有後臺工作者。
if (options.IsEnabled)
{
AsyncHelper.RunSync(
() => context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.StopAsync()
);
}
}
}
後臺工作者的定義
首先看看 IBackgroundWorker
介面的定義,是空的。不過繼承了 ISingletonDependency
介面,說明我們的每個後臺工作者都是 單例 的。
/// <summary>
/// 在後臺運行,執行某些任務的工作程式(線程)的介面定義。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{
}
ABP vNext 為我們定義了一個抽象的後臺工作者類型 BackgroundWorkerBase
,這個基類的設計目的是提供一些常用組件(和 ApplicationService
一樣)。
public abstract class BackgroundWorkerBase : IBackgroundWorker
{
//TODO: Add UOW, Localization and other useful properties..?
//TODO: 是否應該提供工作單元、本地化以及其他常用的屬性?
public ILogger<BackgroundWorkerBase> Logger { protected get; set; }
protected BackgroundWorkerBase()
{
Logger = NullLogger<BackgroundWorkerBase>.Instance;
}
public virtual Task StartAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Started background worker: " + ToString());
return Task.CompletedTask;
}
public virtual Task StopAsync(CancellationToken cancellationToken = default)
{
Logger.LogDebug("Stopped background worker: " + ToString());
return Task.CompletedTask;
}
public override string ToString()
{
return GetType().FullName;
}
}
ABP vNext 內部只有一個預設的後臺工作者實現 PeriodicBackgroundWorkerBase
。從名字上來看,意思是就是周期執行的後臺工作者,內部就是用的 AbpTimer
來實現,ABP vNext 將其包裝起來是為了實現統一的模式(後臺工作者)。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected readonly AbpTimer Timer;
// 也就意味著子類必須在其構造函數,指定 timer 的執行周期。
protected PeriodicBackgroundWorkerBase(AbpTimer timer)
{
Timer = timer;
Timer.Elapsed += Timer_Elapsed;
}
// 啟動後臺工作者。
public override async Task StartAsync(CancellationToken cancellationToken = default)
{
await base.StartAsync(cancellationToken);
Timer.Start(cancellationToken);
}
// 停止後臺工作者。
public override async Task StopAsync(CancellationToken cancellationToken = default)
{
Timer.Stop(cancellationToken);
await base.StopAsync(cancellationToken);
}
// Timer 關聯的周期事件,之所以不直接掛載 DoWork,是為了捕獲異常。
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
DoWork();
}
catch (Exception ex)
{
Logger.LogException(ex);
}
}
// 你要周期執行的任務。
protected abstract void DoWork();
}
我們如果要實現自己的後臺工作者,只需要繼承該類,實現 DoWork()
方法即可。
public class TestWorker : PeriodicBackgroundWorkerBase
{
public TestWorker(AbpTimer timer) : base(timer)
{
// 每五分鐘執行一次。
timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
}
protected override void DoWork()
{
Console.WriteLine("後臺工作者被執行了。");
}
}
然後在我們自己模塊的 OnPreApplicationInitialization()
方法內解析出後臺作業管理器(IBackgroundWorkerManager
),調用它的 Add()
方法,將我們定義的 TestWorker
添加到管理器當中即可。
後臺工作者管理器
所有的後臺工作者都是通過 IBackgroundWorkerManager
進行管理的,它提供了 StartAsync()
、StopAsync()
、Add()
方法。前面兩個方法就是 IRunnable
介面定義的,後臺工作者管理器直接集成了該介面,後面的 Add()
方法就是用來動態添加我們的後臺工作者。
後臺工作者管理器的預設實現是 BackgroundWorkerManager
類型,它內部做的事情很簡單,就是維護一個後臺工作者集合。每當調用 StartAsync()
或 StopAsync()
方法的時候,都從這個集合遍歷後臺工作者,執行他們的啟動和停止方法。
這裡值得註意的一點是,當我們調用 Add()
方法添加了一個後臺工作者之後,後臺工作者管理器就會啟動這個後臺工作者。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
protected bool IsRunning { get; private set; }
private bool _isDisposed;
private readonly List<IBackgroundWorker> _backgroundWorkers;
public BackgroundWorkerManager()
{
_backgroundWorkers = new List<IBackgroundWorker>();
}
public virtual void Add(IBackgroundWorker worker)
{
_backgroundWorkers.Add(worker);
// 如果當前後臺工作者管理器還處於運行狀態,則調用工作者的 StartAsync() 方法啟動。
if (IsRunning)
{
AsyncHelper.RunSync(
() => worker.StartAsync()
);
}
}
public virtual void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
//TODO: ???
}
// 啟動,則遍歷集合啟動。
public virtual async Task StartAsync(CancellationToken cancellationToken = default)
{
IsRunning = true;
foreach (var worker in _backgroundWorkers)
{
await worker.StartAsync(cancellationToken);
}
}
// 停止, 則遍歷集合停止。
public virtual async Task StopAsync(CancellationToken cancellationToken = default)
{
IsRunning = false;
foreach (var worker in _backgroundWorkers)
{
await worker.StopAsync(cancellationToken);
}
}
}
上述代碼其實存在一個問題,即後臺工作者被釋放以後,是否還能執行 Add()
操作。參考我 之前的文章 ,其實當對象被釋放之後,就應該拋出 ObjectDisposedException
異常。
後臺作業
比起後臺工作者,我們執行一次性任務的時候,一般會使用後臺作業進行處理。比起只能設置固定周期的 PeriodicBackgroundWorkerBase
,集成了 Hangfire 的後臺作業管理器,能夠讓我們使用 Cron 表達式,更加靈活地設置任務的執行周期。
模塊的構造
關於後臺作業的模塊,我們需要說道兩處。第一處是位於 Volo.Abp.BackgroundJobs.Abstractions 項目的 AbpBackgroundJobsAbstractionsModule
,第二出則是位於 Volo.Abp.BackgroundJobs 項目的 AbpBackgroundJobsModule
。
AbpBackgroundJobsAbstractionsModule
的主要行為是將符合條件的後臺作業,添加到 AbpBackgroundJobOptions
配置當中,以便後續進行使用。
public override void PreConfigureServices(ServiceConfigurationContext context)
{
RegisterJobs(context.Services);
}
private static void RegisterJobs(IServiceCollection services)
{
var jobTypes = new List<Type>();
// 如果註冊的類型符合 IBackgroundJob<> 泛型,則添加到集合當中。
services.OnRegistred(context =>
{
if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
{
jobTypes.Add(context.ImplementationType);
}
});
services.Configure<AbpBackgroundJobOptions>(options =>
{
// 將數據賦值給配置類。
foreach (var jobType in jobTypes)
{
options.AddJob(jobType);
}
});
}
Volo.Abp.BackgroundJobs 內部是 ABP vNext 為我們提供的 預設後臺作業管理器,這個後臺作業管理器 本質上是一個後臺工作者。
這個後臺工作者會周期性(取決於 AbpBackgroundJobWorkerOptions.JobPollPeriod
值,預設為 5 秒種)地從 IBackgroundJobStore
撈出一堆後臺任務,並且在後臺執行。至於每次執行多少個後臺任務,這也取決於 AbpBackgroundJobWorkerOptions.MaxJobFetchCount
的值,預設值是 1000 個。
註意:
這裡的 Options 類是
AbpBackgroundJobWorkerOptions
,別和AbpBackgroundWorkerOptions
混淆了。
所以在 AbpBackgroundJobsModule
模塊裡面,只做了一件事情,就是將負責後臺作業的後臺工作者,添加到後臺工作者管理器種,並開始周期性地執行。
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
if (options.IsJobExecutionEnabled)
{
// 獲得後臺工作者管理器,並將負責後臺作業的工作者添加進去。
context.ServiceProvider
.GetRequiredService<IBackgroundWorkerManager>()
.Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
);
}
}
後臺作業的定義
在上一節裡面看到,只要是實現 IBackgroundJob<TArgs>
類型的都視為一個後臺作業。這個後臺作業介面,只定義了一個行為,那就是執行(Execute(TArgs)
)。這裡的 TArgs
泛型作為執行後臺作業時,需要傳遞的參數類型。
// 因為是傳入的參數,所以泛型參數是逆變的。
public interface IBackgroundJob<in TArgs>
{
void Execute(TArgs args);
}
檢查源碼,發現 ABP vNext 的郵箱模塊定義了一個郵件發送任務 BackgroundEmailSendingJob
,它的實現大概如下。
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
// ...
public override void Execute(BackgroundEmailSendingJobArgs args)
{
AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
}
}
後臺作業管理器
後臺作業都是通過一個後臺作業管理器(IBackgroundJobManager
)進行管理的,這個介面定義了一個入隊方法(EnqueueAsync()
),註意,我們的後臺作業在入隊後,不是馬上執行的。
說一下這個入隊處理邏輯:
- 首先我們會通過參數的類型,獲取到任務的名稱。(假設任務上面沒有標註
BackgroundJobNameAttribute
特性,那麼任務的名稱就是參數類型的FullName
。) - 構造一個
BackgroundJobInfo
對象。 - 通過
IBackgroundJobStore
持久化任務信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
// 獲取任務名稱。
var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
var jobId = await EnqueueAsync(jobName, args, priority, delay);
return jobId.ToString();
}
protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
var jobInfo = new BackgroundJobInfo
{
Id = GuidGenerator.Create(),
JobName = jobName,
// 通過序列化器,序列化參數值,方便存儲。這裡內部其實使用的是 JSON.NET 進行序列化。
JobArgs = Serializer.Serialize(args),
Priority = priority,
CreationTime = Clock.Now,
NextTryTime = Clock.Now
};
// 如果任務有執行延遲,則任務的初始執行時間要加上這個延遲。
if (delay.HasValue)
{
jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
}
// 持久化任務信息,方便後面執行後臺作業的工作者能夠取到。
await Store.InsertAsync(jobInfo);
return jobInfo.Id;
}
BackgroundJobNameAttribute
相關的方法:
public static string GetName<TJobArgs>()
{
return GetName(typeof(TJobArgs));
}
public static string GetName([NotNull] Type jobArgsType)
{
Check.NotNull(jobArgsType, nameof(jobArgsType));
// 判斷參數類型上面是否標註了特性,並且特性實現了 IBackgroundJobNameProvider 介面。
return jobArgsType
.GetCustomAttributes(true)
.OfType<IBackgroundJobNameProvider>()
.FirstOrDefault()
?.Name
// 拿不到名字,則使用類型的 FullName。
?? jobArgsType.FullName;
}
後臺作業的存儲
後臺作業的存儲預設是放在記憶體的,這點可以從 InMemoryBackgroundJobStore
類型實現看出來。在它的內部使用了一個並行字典,通過作業的 Guid 與作業進行關聯綁定。
除了記憶體實現,在 Volo.Abp.BackgroundJobs.Domain 模塊還有一個 BackgroundJobStore
實現,基本套路與 SettingStore
一樣,都是存儲到資料庫裡面。
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
protected IBackgroundJobRepository BackgroundJobRepository { get; }
// ...
public BackgroundJobInfo Find(Guid jobId)
{
return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
BackgroundJobRepository.Find(jobId)
);
}
// ...
public void Insert(BackgroundJobInfo jobInfo)
{
BackgroundJobRepository.Insert(
ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
);
}
// ...
}
後臺作業的執行
預設的後臺作業管理器是通過一個後臺工作者來執行後臺任務的,這個實現叫做 BackgroundJobWorker
,這個後臺工作者的生命周期也是單例的。後臺作業的具體執行邏輯裡面,涉及到了以下幾個類型的交互。
類型 | 作用 |
---|---|
AbpBackgroundJobOptions |
提供每個後臺任務的配置信息,包括任務的類型、參數類型、任務名稱數據。 |
AbpBackgroundJobWorkerOptions |
提供後臺作業工作者的配置信息,例如每個周期 最大執行的作業數量、後臺 工作者的 執行周期、作業執行 超時時間 等。 |
BackgroundJobConfiguration |
後臺任務的配置信息,作用是將持久化存儲的作業信息與運行時類型進行綁定 和實例化,以便 ABP vNext 來執行具體的任務。 |
IBackgroundJobExecuter |
後臺作業的執行器,當我們從持久化存儲獲取到後臺作業信息時,將會通過 這個執行器來執行具體的後臺作業。 |
IBackgroundJobSerializer |
後臺作業序列化器,用於後臺作業持久化時進行序列化的工具,預設採用的 是 JSON.NET 進行實現。 |
JobExecutionContext |
執行器在執行後臺作業時,是通過這個上下文參數進行執行的,在這個上下 文內部,包含了後臺作業的具體類型、後臺作業的參數值。 |
IBackgroundJobStore |
前面已經講過了,這個是用於後臺作業的持久化存儲,預設實現是存儲在記憶體。 |
BackgroundJobPriority |
後臺作業的執行優先順序定義,ABP vNext 在執行後臺任務時,會根據任務的優 先級進行排序,以便在後面執行的時候優先順序高的任務先執行。 |
我們來按照邏輯順序走一遍它的實現,首先後臺作業的執行工作者會從持久化存儲內,獲取 MaxJobFetchCount
個任務用於執行。從持久化存儲獲取後臺作業信息(BackgroundJobInfo
),是由 IBackgroundJobStore
提供的。
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();
var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);
// 不存在任何後臺作業,則直接結束本次調用。
if (!waitingJobs.Any())
{
return;
}
InMemoryBackgroundJobStore
的相關實現:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
return _jobs.Values
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
.OrderByDescending(t => t.Priority)
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToList();
}
上面的代碼可以看出來,首先排除 被放棄的任務 ,包含達到執行時間的任務,然後根據任務的優先順序從高到低進行排序。重試次數少的優先執行,預計執行時間越早的越先執行。最後從這些數據中,篩選出 maxResultCount
結果並返回。
說到這裡,我們來看一下這個 NextTryTime
是如何被計算出來的?回想起最開始的後臺作業管理器,我們在添加一個後臺任務的時候,就會設置這個後臺任務的 預計執行時間。第一個任務被添加到執行隊列中時,它的值一般是 Clock.Now
,也就是它被添加到隊列的時間。
不過 ABP vNext 為了讓那些經常執行失敗的任務,有比較低的優先順序再執行,就在每次任務執行失敗之後,會將 NextTryTime
的值指數級進行增加。這塊代碼可以在 CalculateNextTryTime
裡面看到,也就是說某個任務的執行 失敗次數越高,那麼它下一次的預期執行時間就會越遠。
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
// 一般來說,這個 DefaultWaitFactor 因數的值是 2.0 。
var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同執行失敗的次數進行掛鉤。
var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
clock.Now.AddSeconds(nextWaitDuration);
if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
{
return null;
}
return nextTryDate;
}
當預期的執行時間都超過 DefaultTimeout
的超時時間時(預設為 2 天),說明這個任務確實沒救了,就不要再執行了。
我們之前說到,從 IBackgroundJobStore
拿到了需要執行的後臺任務信息集合,接下來我們就要開始執行後臺任務了。
foreach (var jobInfo in waitingJobs)
{
jobInfo.TryCount++;
jobInfo.LastTryTime = clock.Now;
try
{
// 根據任務名稱獲取任務的配置參數。
var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
// 根據配置裡面存儲的任務類型,將參數值進行反序列化。
var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
// 構造一個新的執行上下文,讓執行器執行任務。
var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);
try
{
jobExecuter.Execute(context);
// 如果任務執行成功則刪除該任務。
store.Delete(jobInfo.Id);
}
catch (BackgroundJobExecutionException)
{
// 發生任務執行失敗異常時,根據指定的公式計算下一次的執行時間。
var nextTryTime = CalculateNextTryTime(jobInfo, clock);
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
// 超過超時時間的時候,公式計算函數返回 null,該任務置為廢棄任務。
jobInfo.IsAbandoned = true;
}
TryUpdate(store, jobInfo);
}
}
catch (Exception ex)
{
// 執行過程中,產生了未知異常,設置為廢棄任務,並列印日誌。
Logger.LogException(ex);
jobInfo.IsAbandoned = true;
TryUpdate(store, jobInfo);
}
}
執行後臺任務的時候基本分為 5 步,它們分別是:
- 獲得任務關聯的配置參數,預設不用提供,因為在之前模塊初始化的時候就已經配置了(你也可以顯式指定)。
- 通過之前存儲的配置參數,將參數值反序列化出來,構造具體實例。
- 構造一個執行上下文。
- 後臺任務執行器執行具體的後臺任務。
- 成功則刪除任務,失敗則更新任務下次的執行狀態。
至於執行器裡面的真正執行操作,你都拿到了參數值和任務類型了。就可以通過類型用 IoC 獲取後臺任務對象的實例,然後通過反射匹配方法簽名,在實例上調用這個方法傳入參數即可。
public virtual void Execute(JobExecutionContext context)
{
// 構造具體的後臺作業實例對象。
var job = context.ServiceProvider.GetService(context.JobType);
if (job == null)
{
throw new AbpException("The job type is not registered to DI: " + context.JobType);
}
// 獲得需要執行的方法簽名。
var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
if (jobExecuteMethod == null)
{
throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
}
try
{
// 直接通過 MethodInfo 的 Invoke 方法調用,傳入具體的實例對象和參數值即可。
jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
}
catch (Exception ex)
{
Logger.LogException(ex);
// 如果是執行方法內的異常,則包裝進行處理,然後拋出。
throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
{
JobType = context.JobType.AssemblyQualifiedName,
JobArgs = context.JobArgs
};
}
}
集成 Hangfire
ABP vNext 對於 Hangfire 的集成代碼分佈在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模塊內部,前者是在模塊配置裡面,調用 Hangfire 庫的相關方法,註入組件到 IoC 容器當中。後者則是對後臺作業進行了適配處理,替換了預設的 IBackgroundJobManager
實現。
在 AbpHangfireModule
模塊內部,通過工廠創建出來一個 BackgroudJobServer
實例,並將它的生命周期與應用程式的生命周期進行綁定,以便進行銷毀處理。
public class AbpHangfireModule : AbpModule
{
private BackgroundJobServer _backgroundJobServer;
public override void ConfigureServices(ServiceConfigurationContext context)
{
context.Services.AddHangfire(configuration =>
{
context.Services.ExecutePreConfiguredActions(configuration);
});
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
_backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
}
public override void OnApplicationShutdown(ApplicationShutdownContext context)
{
//TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
_backgroundJobServer.SendStop();
_backgroundJobServer.Dispose();
}
}
我們直奔主題,看一下基於 Hangfire 的後臺作業管理器是怎麼實現的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
TimeSpan? delay = null)
{
// 如果沒有延遲參數,則直接通過 Enqueue() 方法扔進執行對了。
if (!delay.HasValue)
{
return Task.FromResult(
BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args)
)
);
}
else
{
return Task.FromResult(
BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
adapter => adapter.Execute(args),
delay.Value
)
);
}
}
上述代碼中使用 HangfireJobExecutionAdapter
進行了一個適配操作,因為 Hangfire 要將一個後臺任務扔進隊列執行,不是用 TArgs
就能解決的。
轉到這個適配器定義,提供了一個 Execute(TArgs)
方法,當被添加到 Hangfire 隊列執行的時候。實際 Hangfire 會調用適配器的 Excetue(TArgs)
方法,然後內部還是使用的 IBackgroundJobExecuter
來執行具體定義的任務。
public class HangfireJobExecutionAdapter<TArgs>
{
protected AbpBackgroundJobOptions Options { get; }
protected IServiceScopeFactory ServiceScopeFactory { get; }
protected IBackgroundJobExecuter JobExecuter { get; }
public HangfireJobExecutionAdapter(
IOptions<AbpBackgroundJobOptions> options,
IBackgroundJobExecuter jobExecuter,
IServiceScopeFactory serviceScopeFactory)
{
JobExecuter = jobExecuter;
ServiceScopeFactory = serviceScopeFactory;
Options = options.Value;
}
public void Execute(TArgs args)
{
using (var scope = ServiceScopeFactory.CreateScope())
{
var jobType = Options.GetJob(typeof(TArgs)).JobType;
var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
JobExecuter.Execute(context);
}
}
}
集成 RabbitMQ
基於 RabbitMQ 的後臺作業實現,我想放在分散式事件匯流排裡面,對其一起進行講解。
三、總結
ABP vNext 為我們提供了多種後臺作業管理器的實現,你可以根據自己的需求選用不同的後臺作業管理器,又或者是自己動手造輪子。
需要看其他的 ABP vNext 相關文章?點擊我 即可跳轉到總目錄。