HangFire迴圈作業中作業因執行時間太長未完成新作業開啟導致重覆數據的問題

来源:https://www.cnblogs.com/ldybyz/archive/2018/08/17/9494255.html
-Advertisement-
Play Games

解決方法:在執行的任務方法前加上Mutex特性即可,如果作業未完成,新作業開啟的話,新作業會放入計劃中的作業隊列中,直到前面的作業完成。 必須使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作為資料庫。 參考:https://github.com/Hangfire ...


解決方法:在執行的任務方法前加上Mutex特性即可,如果作業未完成,新作業開啟的話,新作業會放入計劃中的作業隊列中,直到前面的作業完成。

必須使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作為資料庫。

參考:https://github.com/HangfireIO/Hangfire/issues/1053

  [Mutex("DownloadVideo")]
  public async Task DownloadVideo()
  {
}

 

Mutex特性代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

namespace Hangfire.Pro
{
    /// <summary>
    /// Represents a background job filter that helps to disable concurrent execution
    /// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
    /// </summary>
    public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
    {
        private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

        private readonly string _resource;

        public MutexAttribute(string resource)
        {
            _resource = resource;
            RetryInSeconds = 15;
        }

        public int RetryInSeconds { get; set; }
        public int MaxAttempts { get; set; }

        public void OnStateElection(ElectStateContext context)
        {
            // We are intercepting transitions to the Processed state, that is performed by
            // a worker just before processing a job. During the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // backgorund job.
            if (context.CandidateState.Name != ProcessingState.StateName ||
                context.BackgroundJob.Job == null)
            {
                return;
            }

            // This filter requires an extended set of storage operations. It's supported
            // by all the official storages, and many of the community-based ones.
            var storageConnection = context.Connection as JobStorageConnection;
            if (storageConnection == null)
            {
                throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
            }

            string blockedBy;

            try
            {
                // Distributed lock is needed here only to prevent a race condition, when another 
                // worker picks up a background job with the same resource between GET and SET 
                // operations.
                // There will be no race condition, when two or more workers pick up background job
                // with the same id, because state transitions are protected with distributed lock
                // themselves.
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    // Resource set contains a background job id that acquired a mutex for the resource.
                    // We are getting only one element to see what background job blocked the invocation.
                    var range = storageConnection.GetRangeFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        0,
                        0);

                    blockedBy = range.Count > 0 ? range[0] : null;

                    // We should permit an invocation only when the set is empty, or if current background
                    // job is already owns a resource. This may happen, when the localTransaction succeeded,
                    // but outer transaction was failed.
                    if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
                    {
                        // We need to commit the changes inside a distributed lock, otherwise it's 
                        // useless. So we create a local transaction instead of using the 
                        // context.Transaction property.
                        var localTransaction = context.Connection.CreateWriteTransaction();

                        // Add the current background job identifier to a resource set. This means
                        // that resource is owned by the current background job. Identifier will be
                        // removed only on failed state, or in one of final states (succeeded or
                        // deleted).
                        localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                        localTransaction.Commit();

                        // Invocation is permitted, and we did all the required things.
                        return;
                    }
                }
            }
            catch (DistributedLockTimeoutException)
            {
                // We weren't able to acquire a distributed lock within a specified window. This may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // Since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
                {
                    Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
                };

                return;
            }

            // Background job execution is blocked. We should change the target state either to 
            // the Scheduled or to the Deleted one, depending on current retry attempt number.
            var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1;
            context.SetJobParameter("MutexAttempt", currentAttempt);

            context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
                ? CreateScheduledState(blockedBy, currentAttempt)
                : CreateDeletedState(blockedBy);
        }

        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.BackgroundJob.Job == null) return;

            if (context.OldStateName == ProcessingState.StateName)
            {
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    var localTransaction = context.Connection.CreateWriteTransaction();
                    localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);

                    localTransaction.Commit();
                }
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        private static DeletedState CreateDeletedState(string blockedBy)
        {
            return new DeletedState
            {
                Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
            };
        }

        private IState CreateScheduledState(string blockedBy, int currentAttempt)
        {
            var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

            if (MaxAttempts > 0)
            {
                reason += $"/{MaxAttempts}";
            }

            return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
            {
                Reason = reason
            };
        }

        private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
        {
            return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
        }

        private string GetDistributedLockKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
        }

        private string GetResourceKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
        }

        private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
        {
            return String.Format(keyFormat, args.ToArray());
        }
    }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • JAVA反射機制是在運行狀態中,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任意一個對象,都能夠調用它的任意方法和屬性;這種動態獲取信息以及動態調用對象方法的功能稱為java語言的反射機制。 一:Class類 在面向對象的世界里,萬物皆對象。類也是對象,類是java.lang.Class類 ...
  • 簡介 Python 是一種高層次的結合瞭解釋性、編譯性、互動性和麵向對象的腳本語言。Python 由 Guido van Rossum 於 1989 年底在荷蘭國家數學和電腦科學研究所發明,第一個公開發行版發行於 1991 年。 特點 易於學習:Python 有相對較少的關鍵字,結構簡單,和一個明 ...
  • 為何要用中間件來實現音頻處理的監聽服務 當然也可以使用Startup來進行服務的自啟動,或者也可以使用quartz定時調度任務來啟動音頻服務,大家隨意。 筆者認為使用中間件的目的,是為了分離應用和服務,也是一種解耦手段。 我們知道,在NETCORE中的中間件,有點類似像AOP的一種實現形式,他的調用 ...
  • http://cron.qqe2.com/ 如果不會 或者想檢驗自己是否寫的對就 通過這個網站 檢測 或自動生成 * * * * * * * [秒] [分] [小時] [日] [月] [周] [年] 共7個*號 序號 說明 是否必填 允許填寫的值 允許的通配符 1 秒 是 0-59 , - * /  ...
  • 昨天更新了VS到最新版本v15.8.0,但是編譯UWP出現了操蛋的bug。 谷歌一下,vs社區已經有答案了。 打開.csproj文件,在節點 <PropertyGroup> 裡面,加上一行 保存,重新載入項目,編譯即可。 原文參考:https://developercommunity.visuals ...
  • 消息隊列 神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。 其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進 ...
  • 本文使用的DEV版本為10.1版本 首先需要添加Dll引用 DevExpress.Data.v10.1 DevExpress.XtraPrinting.v10.1 DevExpress.XtraReports.v10.1 在winfrom的環境下 XtraReport 是可以直接調用 Print() ...
  • default 關鍵字有兩類用法 1. switch語句中指定預設標簽 2. 預設值表達式 switch 語句 預設值表達式 default對應各種類型生成預設值列表如下: |類型|預設值| | | | |任何引用類型| | |數值類型|零| |bool| | |enum|表達式 生成的值,其中 是 ...
一周排行
    -Advertisement-
    Play Games
  • 前言 本文介紹一款使用 C# 與 WPF 開發的音頻播放器,其界面簡潔大方,操作體驗流暢。該播放器支持多種音頻格式(如 MP4、WMA、OGG、FLAC 等),並具備標記、實時歌詞顯示等功能。 另外,還支持換膚及多語言(中英文)切換。核心音頻處理採用 FFmpeg 組件,獲得了廣泛認可,目前 Git ...
  • OAuth2.0授權驗證-gitee授權碼模式 本文主要介紹如何筆者自己是如何使用gitee提供的OAuth2.0協議完成授權驗證並登錄到自己的系統,完整模式如圖 1、創建應用 打開gitee個人中心->第三方應用->創建應用 創建應用後在我的應用界面,查看已創建應用的Client ID和Clien ...
  • 解決了這個問題:《winForm下,fastReport.net 從.net framework 升級到.net5遇到的錯誤“Operation is not supported on this platform.”》 本文內容轉載自:https://www.fcnsoft.com/Home/Sho ...
  • 國內文章 WPF 從裸 Win 32 的 WM_Pointer 消息獲取觸摸點繪製筆跡 https://www.cnblogs.com/lindexi/p/18390983 本文將告訴大家如何在 WPF 裡面,接收裸 Win 32 的 WM_Pointer 消息,從消息裡面獲取觸摸點信息,使用觸摸點 ...
  • 前言 給大家推薦一個專為新零售快消行業打造了一套高效的進銷存管理系統。 系統不僅具備強大的庫存管理功能,還集成了高性能的輕量級 POS 解決方案,確保頁面載入速度極快,提供良好的用戶體驗。 項目介紹 Dorisoy.POS 是一款基於 .NET 7 和 Angular 4 開發的新零售快消進銷存管理 ...
  • ABP CLI常用的代碼分享 一、確保環境配置正確 安裝.NET CLI: ABP CLI是基於.NET Core或.NET 5/6/7等更高版本構建的,因此首先需要在你的開發環境中安裝.NET CLI。這可以通過訪問Microsoft官網下載並安裝相應版本的.NET SDK來實現。 安裝ABP ...
  • 問題 問題是這樣的:第三方的webapi,需要先調用登陸介面獲取Cookie,訪問其它介面時攜帶Cookie信息。 但使用HttpClient類調用登陸介面,返回的Headers中沒有找到Cookie信息。 分析 首先,使用Postman測試該登陸介面,正常返回Cookie信息,說明是HttpCli ...
  • 國內文章 關於.NET在中國為什麼工資低的分析 https://www.cnblogs.com/thinkingmore/p/18406244 .NET在中國開發者的薪資偏低,主要因市場需求、技術棧選擇和企業文化等因素所致。歷史上,.NET曾因微軟的閉源策略發展受限,儘管後來推出了跨平臺的.NET ...
  • 在WPF開發應用中,動畫不僅可以引起用戶的註意與興趣,而且還使軟體更加便於使用。前面幾篇文章講解了畫筆(Brush),形狀(Shape),幾何圖形(Geometry),變換(Transform)等相關內容,今天繼續講解動畫相關內容和知識點,僅供學習分享使用,如有不足之處,還請指正。 ...
  • 什麼是委托? 委托可以說是把一個方法代入另一個方法執行,相當於指向函數的指針;事件就相當於保存委托的數組; 1.實例化委托的方式: 方式1:通過new創建實例: public delegate void ShowDelegate(); 或者 public delegate string ShowDe ...