前言: 自從接觸非同步(async await Task)操作後,始終都不明白,這個Task調度的問題。 接觸Quartz.net已經很久了,只知道它實現了一套Task調度的方法,自己跟著Quartz.net源代碼寫了遍,調試後我算是明白了Task調度的一部分事( )。 春風來不遠,只在屋東頭。 理解 ...
前言:
自從接觸非同步(async await Task)操作後,始終都不明白,這個Task調度的問題。
接觸Quartz.net已經很久了,只知道它實現了一套Task調度的方法,自己跟著Quartz.net源代碼寫了遍,調試後我算是明白了Task調度的一部分事( )。
春風來不遠,只在屋東頭。
理解Task運行,請參考大佬文章 https://www.cnblogs.com/artech/p/task_scheduling.html ,推薦大佬的書。
直到我看Quartz.net源代碼中的任務調度 “QueuedTaskScheduler”,我才搞明白了,如何寫一個簡單的任務調度器,或者說線程如何執行代碼,才不會造成死迴圈,CPU吃滿等問題,下麵代碼有的直接從quartz.net copy過來的。
BlockingCollection類
微軟文檔 https://learn.microsoft.com/zh-cn/dotnet/standard/collections/thread-safe/blockingcollection-overview
個人博客,中文解釋通俗易懂 https://www.cnblogs.com/gl1573/p/14595985.html
BlockingCollection 提供一個很重要的“阻塞”功能。
TaskScheduler類
TaskScheduler 直譯過來:表示一個對象,該對象處理將任務排隊到線程上的低級工作。
該類為抽象類,其真正意義在於“對Task任務的編排”
基於TaskScheduler類實現自定義的“Task隊列調度器”
源代碼,我的倉庫 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler
1.定義一個存儲Task的隊列容器,使用BlockingCollection容器來添加Task,為什麼使用BlockingCollection,後面會解釋
/// <summary>The collection of tasks to be executed on our custom threads.</summary>
private readonly BlockingCollection<Task> _blockingTaskQueue;
2.定義CancellationTokenSource變數,用於釋放。通常就是調用 CancellationToken.ThrowIfCancellationRequested() ,拋出一個 “OperationCanceledException”的異常,使正在執行的Task任務停止。
3.創建Thread數組,用於存儲創建出的Thread
/// <summary>The threads used by the scheduler to process work.</summary>
private readonly Thread[] _threads;
4.自定義一個類QueuedTaskScheduler,繼承 “TaskScheduler”,“IDisposable”
public class QueuedTaskScheduler: System.Threading.Tasks.TaskScheduler, IDisposable
實現構造函數
public QueuedTaskScheduler(int threadCount)
{
_threadCount = threadCount;
_blockingTaskQueue = new BlockingCollection<Task>();
// create threads
_threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
_threads[i] = new Thread(ThreadBasedDispatchLoop)
{
Priority = ThreadPriority.Normal,
IsBackground = true,
Name = $"threadName ({i})"
};
}
// start
foreach (var thread in _threads)
{
thread.Start();
}
}
在構造函數中創建,並啟動“Thread”,構造函數接收一個“線程數量的參數”,控制開啟的線程數。
Thread中實現的委托為“ThreadBasedDispatchLoop”,其表達意思是“基於迴圈的調度”。
5.重點來了,具體看下“ThreadBasedDispatchLoop”方法的實現
ThreadBasedDispatchLoop實現
/// <summary>The dispatch loop run by all threads in this scheduler.</summary>
private void ThreadBasedDispatchLoop()
{
_taskProcessingThread.Value = true;
try
{
// If a thread abort occurs, we'll try to reset it and continue running.
while (true)
{
try
{
// For each task queued to the scheduler, try to execute it.
foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token))
{
TryExecuteTask(task);
}
}
catch (ThreadAbortException)
{
// If we received a thread abort, and that thread abort was due to shutting down
// or unloading, let it pass through. Otherwise, reset the abort so we can
// continue processing work items.
if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload())
{
#pragma warning disable SYSLIB0006
Thread.ResetAbort();
#pragma warning restore SYSLIB0006
}
}
}
}
catch (OperationCanceledException)
{
// If the scheduler is disposed, the cancellation token will be set and
// we'll receive an OperationCanceledException. That OCE should not crash the process.
}
finally
{
_taskProcessingThread.Value = false;
}
}
在外層套一層try catch捕獲 CancellationTokenSource 變數,取消操作(CancellationTokenSource.Cancel())產生的異常,並且忽略該異常。
其中使用while(true),無限迴圈執行,?????奇了怪了,為什麼以前寫代碼時,while(true)寫了,會直接把CPU吃滿,程式搞奔潰呢????
關鍵點就在於
當_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)執行時,如果_blockingTaskQueue容器中沒有元素時,執行就會被“阻塞”,這種阻塞不會造成或者造成很小的資源浪費。
當_blockingTaskQueue有值時,阻塞就會停止,_blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)執行,返回一個Task對象,然後開始執行 TryExecuteTask(task) ,執行Task。
6.繼承 “TaskScheduler”後需要實現的幾個方法
GetScheduledTasks
protected override IEnumerable<Task>? GetScheduledTasks()
{
return _blockingTaskQueue.ToList();
}
GetScheduledTasks 返回需要被調度的 Tasks
QueueTask
protected override void QueueTask(Task task)
{
// QueuedTaskScheduler 釋放時,禁止向隊列中添加Task
if (_disposeCancellation.IsCancellationRequested)
{
throw new ObjectDisposedException(GetType().Name);
}
_blockingTaskQueue.Add(task);
}
QueueTask 將排隊等候的Task加入到 “_blockingTaskQueue”隊列變數中
TryExecuteTaskInline
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If we're already running tasks on this threads, enable inlining
return _taskProcessingThread.Value && TryExecuteTask(task);
}
意思是,參數task是否在此線程上運行,請查看ThreadBasedDispatchLoop方法。
“ThreadLocal<bool>” 該類型變數聲明生命周期跟隨 “構造函數”中啟動的線程,且每一個線程單獨一個變數,值存儲線上程上。
自此自定義“Task調度器”完成。
啟動,運行QueuedTaskScheduler
1. 創建 QueuedTaskScheduler ,其中用於執行Task的線程數為 1
2.創建 Task ,並將其加入到指定的 Task調度器中。
3.調試一下
A. 創建 QueuedTaskScheduler ,創建 線程 “Thread” ,並啟動線程
B. 調試過程
當 _blockingTaskQueue沒有Task時,執行到 _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token) 就會阻塞。
自此 自定義TaskScheduler完成。
我的源代碼 https://github.com/qiqiqiyaya/Learning-Case/tree/main/TaskScheduler/AspNet6TaskScheduler
本文來自博客園,作者:youliCC,轉載請註明原文鏈接:https://www.cnblogs.com/youlicc/p/17403429.html