解決方法:在執行的任務方法前加上Mutex特性即可,如果作業未完成,新作業開啟的話,新作業會放入計劃中的作業隊列中,直到前面的作業完成。 必須使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作為資料庫。 參考:https://github.com/HangfireIO/Hangfire/issues/1053
解決方法:在要執行的任務方法前加上 Mutex 特性即可。如果前一個作業尚未完成時又啟動了新作業,新作業不會同時執行,而是被放入「計劃中」(Scheduled)的作業隊列,每隔一段時間(預設 15 秒)重試,直到前面的作業完成。
所使用的儲存體必須支援擴充的儲存操作(JobStorageConnection),例如 Hangfire.Pro.Redis 或 Hangfire.SqlServer 作為資料庫。
參考:https://github.com/HangfireIO/Hangfire/issues/1053
// Usage example: decorate the background job method with the Mutex attribute and give it
// a resource name ("DownloadVideo" here). The method body is left empty in this sample;
// the actual job logic goes inside it.
[Mutex("DownloadVideo")] public async Task DownloadVideo() {
}
Mutex特性代碼如下:
using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

namespace Hangfire.Pro
{
    /// <summary>
    /// Represents a background job filter that helps to disable concurrent execution
    /// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
    /// </summary>
    /// <remarks>
    /// When a job is about to enter the Processing state while another job already owns the
    /// same resource, the job is moved to the Scheduled state (to be retried after
    /// <see cref="RetryInSeconds"/> seconds) or, once <see cref="MaxAttempts"/> is exceeded,
    /// to the Deleted state. Ownership is recorded as the job id stored in a storage set
    /// whose key is derived from the resource string.
    /// </remarks>
    public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
    {
        /// <summary>Maximum time to wait for the distributed lock that guards the resource set.</summary>
        private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

        /// <summary>Resource name; used as a composite format string applied to the job arguments.</summary>
        private readonly string _resource;

        /// <summary>
        /// Creates a mutex filter for the given resource.
        /// </summary>
        /// <param name="resource">
        /// Resource name. It is passed to <see cref="String.Format(string, object[])"/> together
        /// with the job arguments, so it may contain placeholders such as <c>{0}</c>.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="resource"/> is null.</exception>
        public MutexAttribute(string resource)
        {
            // Fail fast: a null resource would otherwise only surface later as an
            // ArgumentNullException from String.Format during state election.
            if (resource == null)
            {
                throw new ArgumentNullException(nameof(resource));
            }

            _resource = resource;
            RetryInSeconds = 15;
        }

        /// <summary>Delay, in seconds, before a blocked job is retried. Defaults to 15.</summary>
        public int RetryInSeconds { get; set; }

        /// <summary>
        /// Maximum number of blocked attempts before the job is deleted.
        /// Zero (the default) means the job is rescheduled without limit.
        /// </summary>
        public int MaxAttempts { get; set; }

        /// <summary>
        /// Intercepts the transition to the Processing state and either lets the job acquire the
        /// mutex or replaces the candidate state with a Scheduled/Deleted one.
        /// </summary>
        /// <param name="context">State election context supplied by Hangfire.</param>
        /// <exception cref="NotSupportedException">
        /// The storage connection is not a <see cref="JobStorageConnection"/>.
        /// </exception>
        public void OnStateElection(ElectStateContext context)
        {
            // We are intercepting transitions to the Processing state, that is performed by
            // a worker just before processing a job. During the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // background job.
            if (context.CandidateState.Name != ProcessingState.StateName ||
                context.BackgroundJob.Job == null)
            {
                return;
            }

            // This filter requires an extended set of storage operations. It's supported
            // by all the official storages, and many of the community-based ones.
            var storageConnection = context.Connection as JobStorageConnection;
            if (storageConnection == null)
            {
                throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
            }

            string blockedBy;

            try
            {
                // Distributed lock is needed here only to prevent a race condition, when another
                // worker picks up a background job with the same resource between GET and SET
                // operations.
                // There will be no race condition, when two or more workers pick up background job
                // with the same id, because state transitions are protected with distributed lock
                // themselves.
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    // Resource set contains a background job id that acquired a mutex for the resource.
                    // We are getting only one element to see what background job blocked the invocation.
                    var range = storageConnection.GetRangeFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        0,
                        0);

                    blockedBy = range.Count > 0 ? range[0] : null;

                    // We should permit an invocation only when the set is empty, or if current background
                    // job already owns the resource. This may happen, when the localTransaction succeeded,
                    // but outer transaction was failed.
                    if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
                    {
                        // We need to commit the changes inside a distributed lock, otherwise it's
                        // useless. So we create a local transaction instead of using the
                        // context.Transaction property. The transaction is disposable, so it is
                        // released deterministically even if Commit throws.
                        using (var localTransaction = context.Connection.CreateWriteTransaction())
                        {
                            // Add the current background job identifier to a resource set. This means
                            // that resource is owned by the current background job. Identifier will be
                            // removed only on failed state, or in one of final states (succeeded or
                            // deleted).
                            localTransaction.AddToSet(
                                GetResourceKey(context.BackgroundJob.Job.Args),
                                context.BackgroundJob.Id);
                            localTransaction.Commit();
                        }

                        // Invocation is permitted, and we did all the required things.
                        return;
                    }
                }
            }
            catch (DistributedLockTimeoutException)
            {
                // We weren't able to acquire a distributed lock within a specified window. This may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // Since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
                {
                    Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
                };

                return;
            }

            // Background job execution is blocked. We should change the target state either to
            // the Scheduled or to the Deleted one, depending on current retry attempt number.
            var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1;
            context.SetJobParameter("MutexAttempt", currentAttempt);

            context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
                ? CreateScheduledState(blockedBy, currentAttempt)
                : CreateDeletedState(blockedBy);
        }

        /// <summary>
        /// Releases the mutex when a job leaves the Processing state by removing its id from
        /// the resource set.
        /// </summary>
        /// <param name="context">Apply-state context supplied by Hangfire.</param>
        /// <param name="transaction">Outer transaction; not used, a local one is committed under the lock.</param>
        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.BackgroundJob.Job == null)
            {
                return;
            }

            if (context.OldStateName == ProcessingState.StateName)
            {
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                using (var localTransaction = context.Connection.CreateWriteTransaction())
                {
                    // Committed inside the distributed lock so the removal cannot interleave
                    // with the GET/SET sequence performed during state election.
                    localTransaction.RemoveFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        context.BackgroundJob.Id);
                    localTransaction.Commit();
                }
            }
        }

        /// <summary>No action is required when a state is unapplied.</summary>
        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        /// <summary>Builds the Deleted state used once all attempts are exhausted.</summary>
        private static DeletedState CreateDeletedState(string blockedBy)
        {
            return new DeletedState
            {
                Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
            };
        }

        /// <summary>Builds the Scheduled state used to retry a blocked job after <see cref="RetryInSeconds"/>.</summary>
        private IState CreateScheduledState(string blockedBy, int currentAttempt)
        {
            var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

            if (MaxAttempts > 0)
            {
                reason += $"/{MaxAttempts}";
            }

            return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
            {
                Reason = reason
            };
        }

        /// <summary>Acquires the distributed lock that guards reads and writes of the resource set.</summary>
        private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
        {
            return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
        }

        /// <summary>Returns the storage key of the distributed lock for this resource.</summary>
        private string GetDistributedLockKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
        }

        /// <summary>Returns the storage key of the set that holds the owning job id for this resource.</summary>
        private string GetResourceKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
        }

        /// <summary>Expands the resource format string with the job arguments.</summary>
        private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
        {
            // NOTE(review): String.Format without an IFormatProvider uses the current culture;
            // confirm whether culture-sensitive arguments (dates, decimals) can reach this and
            // whether all workers share the same culture.
            return String.Format(keyFormat, args.ToArray());
        }
    }
}