HangFire迴圈作業中作業因執行時間太長未完成新作業開啟導致重覆數據的問題

来源:https://www.cnblogs.com/ldybyz/archive/2018/08/17/9494255.html
-Advertisement-
Play Games

解決方法:在執行的任務方法前加上Mutex特性即可,如果作業未完成,新作業開啟的話,新作業會放入計劃中的作業隊列中,直到前面的作業完成。 必須使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作為資料庫。 參考:https://github.com/Hangfire ...


解決方法:在執行的任務方法前加上Mutex特性即可,如果作業未完成,新作業開啟的話,新作業會放入計劃中的作業隊列中,直到前面的作業完成。

必須使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作為資料庫。

參考:https://github.com/HangfireIO/Hangfire/issues/1053

  [Mutex("DownloadVideo")]
  public async Task DownloadVideo()
  {
}

 

Mutex特性代碼如下:

using System;
using System.Collections.Generic;
using System.Linq;
using Hangfire.Common;
using Hangfire.States;
using Hangfire.Storage;

namespace Hangfire.Pro
{
    /// <summary>
    /// Represents a background job filter that helps to disable concurrent execution
    /// without causing worker to wait as in <see cref="Hangfire.DisableConcurrentExecutionAttribute"/>.
    /// </summary>
    public class MutexAttribute : JobFilterAttribute, IElectStateFilter, IApplyStateFilter
    {
        private static readonly TimeSpan DistributedLockTimeout = TimeSpan.FromMinutes(1);

        private readonly string _resource;

        public MutexAttribute(string resource)
        {
            _resource = resource;
            RetryInSeconds = 15;
        }

        public int RetryInSeconds { get; set; }
        public int MaxAttempts { get; set; }

        public void OnStateElection(ElectStateContext context)
        {
            // We are intercepting transitions to the Processed state, that is performed by
            // a worker just before processing a job. During the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // backgorund job.
            if (context.CandidateState.Name != ProcessingState.StateName ||
                context.BackgroundJob.Job == null)
            {
                return;
            }

            // This filter requires an extended set of storage operations. It's supported
            // by all the official storages, and many of the community-based ones.
            var storageConnection = context.Connection as JobStorageConnection;
            if (storageConnection == null)
            {
                throw new NotSupportedException("This version of storage doesn't support extended methods. Please try to update to the latest version.");
            }

            string blockedBy;

            try
            {
                // Distributed lock is needed here only to prevent a race condition, when another 
                // worker picks up a background job with the same resource between GET and SET 
                // operations.
                // There will be no race condition, when two or more workers pick up background job
                // with the same id, because state transitions are protected with distributed lock
                // themselves.
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    // Resource set contains a background job id that acquired a mutex for the resource.
                    // We are getting only one element to see what background job blocked the invocation.
                    var range = storageConnection.GetRangeFromSet(
                        GetResourceKey(context.BackgroundJob.Job.Args),
                        0,
                        0);

                    blockedBy = range.Count > 0 ? range[0] : null;

                    // We should permit an invocation only when the set is empty, or if current background
                    // job is already owns a resource. This may happen, when the localTransaction succeeded,
                    // but outer transaction was failed.
                    if (blockedBy == null || blockedBy == context.BackgroundJob.Id)
                    {
                        // We need to commit the changes inside a distributed lock, otherwise it's 
                        // useless. So we create a local transaction instead of using the 
                        // context.Transaction property.
                        var localTransaction = context.Connection.CreateWriteTransaction();

                        // Add the current background job identifier to a resource set. This means
                        // that resource is owned by the current background job. Identifier will be
                        // removed only on failed state, or in one of final states (succeeded or
                        // deleted).
                        localTransaction.AddToSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);
                        localTransaction.Commit();

                        // Invocation is permitted, and we did all the required things.
                        return;
                    }
                }
            }
            catch (DistributedLockTimeoutException)
            {
                // We weren't able to acquire a distributed lock within a specified window. This may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // Since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.CandidateState = new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
                {
                    Reason = "Couldn't acquire a distributed lock for mutex: timeout exceeded"
                };

                return;
            }

            // Background job execution is blocked. We should change the target state either to 
            // the Scheduled or to the Deleted one, depending on current retry attempt number.
            var currentAttempt = context.GetJobParameter<int>("MutexAttempt") + 1;
            context.SetJobParameter("MutexAttempt", currentAttempt);

            context.CandidateState = MaxAttempts == 0 || currentAttempt <= MaxAttempts
                ? CreateScheduledState(blockedBy, currentAttempt)
                : CreateDeletedState(blockedBy);
        }

        public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
            if (context.BackgroundJob.Job == null) return;

            if (context.OldStateName == ProcessingState.StateName)
            {
                using (AcquireDistributedSetLock(context.Connection, context.BackgroundJob.Job.Args))
                {
                    var localTransaction = context.Connection.CreateWriteTransaction();
                    localTransaction.RemoveFromSet(GetResourceKey(context.BackgroundJob.Job.Args), context.BackgroundJob.Id);

                    localTransaction.Commit();
                }
            }
        }

        public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
        {
        }

        private static DeletedState CreateDeletedState(string blockedBy)
        {
            return new DeletedState
            {
                Reason = $"Execution was blocked by background job {blockedBy}, all attempts exhausted"
            };
        }

        private IState CreateScheduledState(string blockedBy, int currentAttempt)
        {
            var reason = $"Execution is blocked by background job {blockedBy}, retry attempt: {currentAttempt}";

            if (MaxAttempts > 0)
            {
                reason += $"/{MaxAttempts}";
            }

            return new ScheduledState(TimeSpan.FromSeconds(RetryInSeconds))
            {
                Reason = reason
            };
        }

        private IDisposable AcquireDistributedSetLock(IStorageConnection connection, IEnumerable<object> args)
        {
            return connection.AcquireDistributedLock(GetDistributedLockKey(args), DistributedLockTimeout);
        }

        private string GetDistributedLockKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:lock:{GetKeyFormat(args, _resource)}";
        }

        private string GetResourceKey(IEnumerable<object> args)
        {
            return $"extension:job-mutex:set:{GetKeyFormat(args, _resource)}";
        }

        private static string GetKeyFormat(IEnumerable<object> args, string keyFormat)
        {
            return String.Format(keyFormat, args.ToArray());
        }
    }
}

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • JAVA反射機制是在運行狀態中,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任意一個對象,都能夠調用它的任意方法和屬性;這種動態獲取信息以及動態調用對象方法的功能稱為java語言的反射機制。 一:Class類 在面向對象的世界里,萬物皆對象。類也是對象,類是java.lang.Class類 ...
  • 簡介 Python 是一種高層次的結合瞭解釋性、編譯性、互動性和麵向對象的腳本語言。Python 由 Guido van Rossum 於 1989 年底在荷蘭國家數學和電腦科學研究所發明,第一個公開發行版發行於 1991 年。 特點 易於學習:Python 有相對較少的關鍵字,結構簡單,和一個明 ...
  • 為何要用中間件來實現音頻處理的監聽服務 當然也可以使用Startup來進行服務的自啟動,或者也可以使用quartz定時調度任務來啟動音頻服務,大家隨意。 筆者認為使用中間件的目的,是為了分離應用和服務,也是一種解耦手段。 我們知道,在NETCORE中的中間件,有點類似像AOP的一種實現形式,他的調用 ...
  • http://cron.qqe2.com/ 如果不會 或者想檢驗自己是否寫的對就 通過這個網站 檢測 或自動生成 * * * * * * * [秒] [分] [小時] [日] [月] [周] [年] 共7個*號 序號 說明 是否必填 允許填寫的值 允許的通配符 1 秒 是 0-59 , - * /  ...
  • 昨天更新了VS到最新版本v15.8.0,但是編譯UWP出現了操蛋的bug。 谷歌一下,vs社區已經有答案了。 打開.csproj文件,在節點 <PropertyGroup> 裡面,加上一行 保存,重新載入項目,編譯即可。 原文參考:https://developercommunity.visuals ...
  • 消息隊列 神馬是消息隊列,看看某度的原話“在項目中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量”。 其實消息隊列還可以用於解耦,在多層項目模型或中型項目以上,都會用到消息隊列,減少層與層之間的耦合;還可以做跨進 ...
  • 本文使用的DEV版本為10.1版本 首先需要添加Dll引用 DevExpress.Data.v10.1 DevExpress.XtraPrinting.v10.1 DevExpress.XtraReports.v10.1 在winfrom的環境下 XtraReport 是可以直接調用 Print() ...
  • default 關鍵字有兩類用法 1. switch語句中指定預設標簽 2. 預設值表達式 switch 語句 預設值表達式 default對應各種類型生成預設值列表如下: |類型|預設值| | | | |任何引用類型| | |數值類型|零| |bool| | |enum|表達式 生成的值,其中 是 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...