HangFire的拓展和使用 看了很多博客,小白第一次寫博客。 最近由於之前的任務調度框架總出現問題,因此想尋找一個替代品,之前使用的是Quartz.Net,這個框架方便之處就是支持cron表達式適合複雜日期場景使用,以及秒級任務。但是配置比較複雜,而且管理不方便,自己開發了個web管理頁面,不過這 ...
HangFire的拓展和使用
看了很多博客,小白第一次寫博客。
最近由於之前的任務調度框架總出現問題,因此想尋找一個替代品,之前使用的是Quartz.Net,這個框架方便之處就是支持cron表達式適合複雜日期場景使用,以及秒級任務。但是配置比較複雜,而且管理不方便,自己開發了個web管理頁面,不過這個需要額外的單獨線程去統一管理工作狀態,很容易出現問題。
有考慮過 “FluentScheduler” ,使用簡單,但是管理配置也很麻煩,我希望能做到配置簡單,管理方便,高性能。最後想到了以前聽過的hangfire,它的好處就是自帶控制面板,在園子里看了很多相關資料,偶然發現了有人拓展過hangfire通過調用api介面來執行任務,這種方式可以避免依賴本地代碼,方便部署,在此基礎上,我用空閑時間拓展了一下現在已經基本可以滿足需求。
所拓展的功能全部屬於外部拓展,因此hangfire版本可以一直更新,現在已經更新最新版,支持秒級任務
由於更新到最新版hangfire 1.7支持秒級任務,使用的線上表達式生成部分表達式有問題,註掉了秒級任務表達式生成,有時間需要詳細測試更改,可以參考(hangfire官方提供的表達式)
現在已經實現的功能有:
1,部署及調試:只需要配置資料庫連接,然後編譯即可運行,無需建表,支持(redis,mysql, sqlserver)其他資料庫暫時用不到沒測試。推薦使用redis集群。項目中直接添加了redis的存儲包,已經更新StackExchange.Redis到最新版本方便拓展,調試時可以直接調試。部署,只需要發佈項目,運行創建windows服務的bat命令,命令已經包含在項目中,或者發佈至Linux。
2,周期任務:支持在控制面板頁面上添加周期任務,編輯周期任務,刪除周期任務,手動觸發周期任務,暫停和繼續周期任務(暫停實現的原理是通過set中添加屬性,在job執行前,過濾掉,直接跳過執行,因為hangfire中job一旦創建就失去了控制權,只能通過過濾器去攔截),任務暫停後會查詢狀態並渲染面板列表為紅色字體方便查找哪個任務被暫停。
3,計劃任務:在作業選項卡中,計劃作業中可以實現添加計劃任務,計劃任務可以使任務在指定的分鐘後執行,只執行一次。
4,只讀面板:通過配置的用戶名密碼,使用戶只具有讀取面板的許可權,這樣可以防止誤操作
1 //只讀面板,只能讀取不能操作 2 app.UseHangfireDashboard("/job-read", new DashboardOptions 3 { 4 AppPath = "#",//返回時跳轉的地址 5 DisplayStorageConnectionString = false,//是否顯示資料庫連接信息 6 IsReadOnlyFunc = Context => 7 { 8 return true; 9 }, 10 Authorization = new[] { new BasicAuthAuthorizationFilter(new BasicAuthAuthorizationFilterOptions 11 { 12 RequireSsl = false,//是否啟用ssl驗證,即https 13 SslRedirect = false, 14 LoginCaseSensitive = true, 15 Users = new [] 16 { 17 new BasicAuthAuthorizationUser 18 { 19 Login = "read", 20 PasswordClear = "only" 21 }, 22 new BasicAuthAuthorizationUser 23 { 24 Login = "test", 25 PasswordClear = "123456" 26 }, 27 new BasicAuthAuthorizationUser 28 { 29 Login = "guest", 30 PasswordClear = "123@123" 31 } 32 } 33 }) 34 } 35 });View Code
5,郵件推送:目前使用的方式是,任務錯誤重試達到指定次數後,發送郵件通知,使用的MailKit
1 catch (Exception ex) 2 { 3 //獲取重試次數 4 var count = context.GetJobParameter<string>("RetryCount"); 5 context.SetTextColor(ConsoleTextColor.Red); 6 //signalR推送 7 //SendRequest(ConfigSettings.Instance.URL+"/api/Publish/EveryOne", "測試"); 8 if (count == "3")//重試達到三次的時候發郵件通知 9 { 10 SendEmail(item.JobName, item.Url, ex.ToString()); 11 } 12 logger.Error(ex, "HttpJob.Excute"); 13 context.WriteLine($"執行出錯:{ex.Message}"); 14 throw;//不拋異常不會執行重試操作 15 }View Code
1 /// <summary> 2 /// 郵件模板 3 /// </summary> 4 /// <param name="jobname"></param> 5 /// <param name="url"></param> 6 /// <param name="exception"></param> 7 /// <returns></returns> 8 private static string SethtmlBody(string jobname, string url, string exception) 9 { 10 var htmlbody = $@"<h3 align='center'>{HangfireHttpJobOptions.SMTPSubject}</h3> 11 <h3>執行時間:</h3> 12 <p> 13 {DateTime.Now} 14 </p> 15 <h3> 16 任務名稱:<span> {jobname} </span><br/> 17 </h3> 18 <h3> 19 請求路徑:{url} 20 </h3> 21 <h3><span></span> 22 執行結果:<br/> 23 </h3> 24 <p> 25 {exception} 26 </p> "; 27 return htmlbody; 28 }郵件模板
1 //使用redis 2 config.UseRedisStorage(Redis, new Hangfire.Redis.RedisStorageOptions() 3 { 4 FetchTimeout=TimeSpan.FromMinutes(5), 5 Prefix = "{hangfire}:", 6 //活動伺服器超時時間 7 InvisibilityTimeout = TimeSpan.FromHours(1), 8 //任務過期檢查頻率 9 ExpiryCheckInterval = TimeSpan.FromHours(1), 10 DeletedListSize = 10000, 11 SucceededListSize = 10000 12 }) 13 .UseHangfireHttpJob(new HangfireHttpJobOptions() 14 { 15 SendToMailList = HangfireSettings.Instance.SendMailList, 16 SendMailAddress = HangfireSettings.Instance.SendMailAddress, 17 SMTPServerAddress = HangfireSettings.Instance.SMTPServerAddress, 18 SMTPPort = HangfireSettings.Instance.SMTPPort, 19 SMTPPwd = HangfireSettings.Instance.SMTPPwd, 20 SMTPSubject = HangfireSettings.Instance.SMTPSubject 21 })配置郵件參數
6,signalR 推送:宿主程式使用的weapi,因此可以通過webapi推送,這樣做的好處是可以將服務當作推送服務使用,第三方介面也可以利用此來推送,
1 /// <summary> 2 ///用戶加入組處理 3 /// </summary> 4 /// <param name="userid">用戶唯一標識</param> 5 /// <param name="GroupName">組名稱</param> 6 /// <returns></returns> 7 public Task InitUsers(string userid,string GroupName) 8 { 9 Console.WriteLine($"{userid}加入用戶組"); 10 Groups.AddToGroupAsync(Context.ConnectionId, GroupName); 11 SignalrGroups.UserGroups.Add(new SignalrGroups() 12 { 13 ConnectionId = Context.ConnectionId, 14 GroupName = GroupName, 15 UserId = userid 16 }); 17 return Clients.All.SendAsync("UserJoin", "用戶組數據更新,新增id為:" + Context.ConnectionId + " pid:" + userid); 18 } 19 /// <summary> 20 /// 斷線的時候處理 21 /// </summary> 22 /// <param name="exception"></param> 23 /// <returns></returns> 24 public override Task OnDisconnectedAsync(Exception exception) 25 { 26 //掉線移除用戶,不給其推送 27 var user = SignalrGroups.UserGroups.FirstOrDefault(c => c.ConnectionId == Context.ConnectionId); 28 29 if (user != null) 30 { 31 Console.WriteLine($"用戶:{user.UserId}已離線"); 32 SignalrGroups.UserGroups.Remove(user); 33 Groups.RemoveFromGroupAsync(Context.ConnectionId, user.GroupName); 34 } 35 return base.OnDisconnectedAsync(exception); 36 }Hub定義
1 /// <summary> 2 /// 單個connectionid推送 3 /// </summary> 4 /// <param name="groups"></param> 5 /// <returns></returns> 6 [HttpPost, Route("AnyOne")] 7 public IActionResult AnyOne([FromBody]IEnumerable<SignalrGroups> groups) 8 { 9 if (groups != null && groups.Any()) 10 { 11 var ids = groups.Select(c => c.UserId); 12 var list = SignalrGroups.UserGroups.Where(c => ids.Contains(c.UserId)); 13 foreach (var item in list) 14 hubContext.Clients.Client(item.ConnectionId).SendAsync("AnyOne", $"{item.ConnectionId}: {item.Content}"); 15 } 16 return Ok(); 17 } 18 19 /// <summary> 20 /// 全部推送 21 /// </summary> 22 /// <param name="message"></param> 23 /// <returns></returns> 24 [HttpPost, Route("EveryOne")] 25 public IActionResult EveryOne([FromBody] MSG body) 26 { 27 var data = HttpContext.Response.Body; 28 hubContext.Clients.All.SendAsync("EveryOne", $"{body.message}"); 29 return Ok(); 30 } 31 32 /// <summary> 33 /// 單個組推送 34 /// </summary> 35 /// <param name="group"></param> 36 /// <returns></returns> 37 [HttpPost, Route("AnyGroups")] 38 public IActionResult AnyGroups([FromBody]SignalrGroups group) 39 { 40 if (group != null) 41 { 42 hubContext.Clients.Group(group.GroupName).SendAsync("AnyGroups", $"{group.Content}"); 43 } 44 return Ok(); 45 }推送介面定義
7,介面健康檢查:因為主要用來調用api介面,因此集成介面健康檢查還是很有必要的,目前使用的方式是配置文件中添加需要檢查的地址
1 /*健康檢查配置項*/ 2 "HealthChecks-UI": { 3 /*檢查地址,可以配置當前程式和外部程式*/ 4 "HealthChecks": [ 5 { 6 "Name": "Hangfire Api 健康檢查", 7 "Uri": "http://localhost:9006/healthz" 8 } 9 ], 10 /*需要檢查的Api地址*/ 11 "CheckUrls": [ 12 { 13 "Uri": "http://localhost:17600/CityService.svc/HealthyCheck", 14 "httpMethod": "Get" 15 }, 16 { 17 "Uri": "http://localhost:9098/CheckHelath", 18 "httpMethod": "Post" 19 }, 20 { 21 "Uri": "http://localhost:9067/GrtHelathCheck", 22 "httpMethod": "Get" 23 }, 24 { 25 "Uri": "http://localhost:9043/GrtHelathCheck", 26 "httpMethod": "Get" 27 } 28 ], 29 "Webhooks": [], //鉤子配置 30 "EvaluationTimeOnSeconds": 10, //檢測頻率 31 "MinimumSecondsBetweenFailureNotifications": 60, //推送間隔時間 32 "HealthCheckDatabaseConnectionString": "Data Source=\\healthchecksdb" //-> sqlite庫存儲檢查配置及日誌信息 33 }健康檢查相關配置
後臺會根據配置的指定間隔去檢查服務介面是否可以正常訪問,(這個中間件可以實現很多檢查功能,包括網路,資料庫,mq等,支持webhook推送等豐富功能,系統用不到因此沒有添加)
健康檢查的配置
1 //添加健康檢查地址 2 HangfireSettings.Instance.HostServers.ForEach(s => 3 { 4 services.AddHealthChecks().AddUrlGroup(new Uri(s.Uri), s.httpMethod.ToLower() == "post" ? HttpMethod.Post : HttpMethod.Get, $"{s.Uri}"); 5 });健康檢查地址添加
1 app.UseHealthChecks("/healthz", new HealthCheckOptions() 2 { 3 Predicate = _ => true, 4 ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse 5 }); 6 app.UseHealthChecks("/health", options);//獲取自定義格式的json數據 7 app.UseHealthChecksUI(setup => 8 { 9 setup.UIPath = "/hc"; // 健康檢查的UI面板地址 10 setup.ApiPath = "/hc-api"; // 用於api獲取json的檢查數據 11 });健康檢查中間件配置
其中,ui配置路徑是在面板中展示檢查結果需要使用的地址
api地址,可以通過介面的方式來調用檢查結果,方便在第三方系統中使用,其數據格式可以自定義
通過介面調用
1 [{ 2 "id": 1, 3 "status": "Unhealthy", 4 "onStateFrom": "2019-04-07T18:00:09.6996751+08:00", 5 "lastExecuted": "2019-04-07T18:05:03.4761739+08:00", 6 "uri": "http://localhost:53583/healthz", 7 "name": "Hangfire Api 健康檢查", 8 "discoveryService": null, 9 "entries": [{ 10 "id": 1, 11 "name": "http://localhost:17600/CityService.svc/HealthyCheck", 12 "status": "Unhealthy", 13 "description": "An error occurred while sending the request.", 14 "duration": "00:00:04.3907375" 15 }, { 16 "id": 2, 17 "name": "http://localhost:9098/CheckHelath", 18 "status": "Unhealthy", 19 "description": "An error occurred while sending the request.", 20 "duration": "00:00:04.4140310" 21 }, { 22 "id": 3, 23 "name": "http://localhost:9067/GrtHelathCheck", 24 "status": "Unhealthy", 25 "description": "An error occurred while sending the request.", 26 "duration": "00:00:04.3847367" 27 }, { 28 "id": 4, 29 "name": "http://localhost:9043/GrtHelathCheck", 30 "status": "Unhealthy", 31 "description": "An error occurred while sending the request.", 32 "duration": "00:00:04.4499007" 33 }], 34 "history": [] 35 }]介面返回數據原始格式
1 { 2 "status": "Unhealthy", 3 "errors": [{ 4 "key": "http://localhost:17600/CityService.svc/HealthyCheck", 5 "value": "Unhealthy" 6 }, { 7 "key": "http://localhost:9098/CheckHelath", 8 "value": "Unhealthy" 9 }, { 10 "key": "http://localhost:9067/GrtHelathCheck", 11 "value": "Unhealthy" 12 }, { 13 "key": "http://localhost:9043/GrtHelathCheck", 14 "value": "Unhealthy" 15 }] 16 }介面返回數據處理後格式
1 //重寫json報告數據,可用於遠程調用獲取健康檢查結果 2 var options = new HealthCheckOptions 3 { 4 ResponseWriter = async (c, r) => 5 { 6 c.Response.ContentType = "application/json"; 7 8 var result = JsonConvert.SerializeObject(new 9 { 10 status = r.Status.ToString(), 11 errors = r.Entries.Select(e => new { key = e.Key, value = e.Value.Status.ToString() }) 12 }); 13 await c.Response.WriteAsync(result); 14 } 15 };處理方式
8,通過介面添加任務:添加編輯周期任務,添加計劃任務,觸發周期任務,刪除周期任務,多個任務連續一次執行的任務