業務背景 在稍微複雜點業務系統中,不可避免會碰到做定時任務的需求,比如淘寶的交易超時自動關閉訂單、超時自動確認收貨等等。對於一些定時作業比較多的系統,通常都會搭建專門的調度平臺來管理,通過創建定時器來周期性執行任務。如剛纔所說的場景,我們可以給訂單創建一個專門的任務來處理交易狀態,每秒輪詢一次訂單表 ...
業務背景
在稍微複雜點業務系統中,不可避免會碰到做定時任務的需求,比如淘寶的交易超時自動關閉訂單、超時自動確認收貨等等。對於一些定時作業比較多的系統,通常都會搭建專門的調度平臺來管理,通過創建定時器來周期性執行任務。如剛纔所說的場景,我們可以給訂單創建一個專門的任務來處理交易狀態,每秒輪詢一次訂單表,找出那些符合超時條件的訂單然後標記狀態。這是最簡單粗暴的做法,但明顯也很low,自己都下不去手寫這樣的代碼,所有必須要找個更好的方案。
回到真實項目中的場景,系統中某個活動上線後要給目標用戶發送簡訊通知,這些通知需要按時間點批量發送。雖然已經基於quartz.net給系統搭建了任務調度平臺,但著實不想用上述方案來實現。在網上各種搜索和思考,找到一篇文章讓我眼前一亮,稍加分析發現裡面的思路完全符合現在的場景,於是決定在自己項目中實現出來。
原理分析
這種方案的核心就是構造一種數據結構,稱之為環形隊列,但實際上還是一個數組,加上對它的迴圈遍歷,達到一種環狀的假象。然後再配合定時器,就可以實現按需延時的效果。上面提到的文章中也介紹了實現思路,這裡我採用我的理解再更加詳細的解釋一下。
我們先為這個數組分配一個固定大小的空間,比如60,每個數組的元素用來存放任務的集合。然後開啟一個定時器每隔一秒來掃描這個數組,掃完一圈剛好是一分鐘。如果提前設置好任務被掃描的圈數(CycleNum)和在數組中的位置(Slot),在剛好掃到數組的Slot位置時,集合里那些CycleNum為0的任務就是達到觸發條件的任務,拉出來做業務操作然後移除掉,其他的把圈數減掉一次,然後留到下次繼續掃描,這樣就實現了延時的效果。原理如下圖所示:
可以看出中間的重點是計算出每個任務所在的位置以及需要迴圈的圈數。假設當前時間為15:20:08,當前掃描位置是2,我的任務要在15:22:35這個時刻觸發,也就是147秒後。那麼我需要迴圈的圈數就是147/60=2圈,需要被掃描的位置就是(147+2)%60=29的地方。計算好任務的坐標後塞到數組中屬於它的位置,然後靜靜等待被消費就好啦。
擼碼實現
光講原理不上代碼怎麼能行呢,根據上面的思路,下麵一步步在.net平臺下實現出來。
先做一些基礎封裝。
首先構造任務參數的基類,用來記錄任務的位置信息和定義業務回調方法:
public class DelayQueueParam { internal int Slot { get; set; } internal int CycleNum { get; set; } public Action<object> Callback { get; set; } }
接下來是核心地方。再構造隊列的泛型類,真實類型必須派生自上面的基類,用來擴展一些業務欄位方便消費時使用。隊列的主要屬性有當前位置指針以及數組容器,主要的操作有插入、移除和消費。插入任務時需要傳入執行時間,用來計算這個任務的坐標。
public class DelayQueue<T> where T : DelayQueueParam { private List<T>[] queue; private int currentIndex = 1; public DelayQueue(int length) { queue = new List<T>[length]; } public void Insert(T item, DateTime time) { //根據消費時間計算消息應該放入的位置 var second = (int)(time - DateTime.Now).TotalSeconds; item.CycleNum = second / queue.Length; item.Slot = (second + currentIndex) % queue.Length; //加入到延時隊列中 if (queue[item.Slot] == null) { queue[item.Slot] = new List<T>(); } queue[item.Slot].Add(item); } public void Remove(T item) { if (queue[item.Slot] != null) { queue[item.Slot].Remove(item); } } public void Read() { if (queue.Length >= currentIndex) { var list = queue[currentIndex - 1]; if (list != null) { List<T> target = new List<T>(); foreach (var item in list) { if (item.CycleNum == 0) { //在本輪命中,用單獨線程去執行業務操作 Task.Run(()=> { item.Callback(item); }); target.Add(item); } else { //等下一輪 item.CycleNum--; System.Diagnostics.Debug.WriteLine($"@@@@@索引:{item.Slot},剩餘:{item.CycleNum}"); } } //把已過期的移除掉 foreach (var item in target) { list.Remove(item); } } currentIndex++; //下一遍從頭開始 if (currentIndex > queue.Length) { currentIndex = 1; } } } }
接下來是使用方法。
創建一個管理隊列實例的靜態類,裡面封裝對隊列的操作:
public static class NotifyPlanManager { private static DelayQueue<NotifyPlan> _queue = new DelayQueue<NotifyPlan>(60); public static void Insert(NotifyPlan plan, DateTime time) { _queue.Insert(plan, time); } public static void Read() { _queue.Read(); } }
構建我們的實際業務參數類,派生自DelayQueueParam:
public class NotifyPlan : DelayQueueParam { public Guid CamId { get; set; } public int PreviousTotal { get; set; } public int Amount { get; set; } }
生產端往隊列中插入數據:
Action<object> callback = (result) => { var np = result as NotifyPlan; //這裡做自己的業務操作 //舉個例子: Debug.WriteLine($"活動ID:{np.CamId},已發送數量:{np.PreviousTotal},本次發送數量:{np.Amount}"); }; NotifyPlanManager.Insert(new NotifyPlan { Amount = set.MainAmount, CamId = camId, PreviousTotal = 0, Callback = callback }, smsTemplate.SendDate);
再創建一個每秒執行一次的定時器用做消費端,我這裡使用的是FluentScheduler,核心代碼:
internal class NotifyPlanJob : IJob { /// <summary> /// 執行計劃 /// </summary> public void Execute() { NotifyPlanManager.Read(); } } internal class JobFactory : Registry { public JobFactory() { //每秒運行一次 Schedule<NotifyPlanJob >().ToRunEvery(1).Seconds(); } } JobManager.Initialize(new JobFactory());
然後開啟調試運行,打開本機的系統時間面板,對著時間看輸出結果。親測有效。
總結
這種方案的好處是避免了頻繁地掃描資料庫和不必要的業務操作,另外也很方便控制時間精度。帶來的問題是如果web服務異常或重啟可能會發生任務丟失的情況,我目前的處理方法是在資料庫中標記任務狀態,服務啟動時把狀態為“排隊中”的任務重新載入到隊列中等待消費。
以上方案在單機環境測試沒問題,多節點情況下暫時沒有深究。若有設計實現上的缺陷,歡迎討論與指正,要是有更好的方案,那就當拋磚引玉,再好不過了~