一、CLR 線程池基礎 一般來說如果電腦的 CPU 利用率沒有 100% ,那麼說明很多進程的部分線程沒有運行。可能在等待 文件/網路/資料庫等設備讀取或者寫入數據,又可能是等待按鍵、滑鼠移動等事件。 執行 I/O 限制的操作時,操作系統通過設備驅動程式通知硬體幹活,而 CPU 處於一種空閑狀態。 ...
一、CLR 線程池基礎
一般來說如果電腦的 CPU 利用率沒有 100% ,那麼說明很多進程的部分線程沒有運行。可能在等待 文件/網路/資料庫等設備讀取或者寫入數據,又可能是等待按鍵、滑鼠移動等事件。
執行 I/O 限制的操作時,操作系統通過設備驅動程式通知硬體幹活,而 CPU 處於一種空閑狀態。而在現代應用程式當中,使用線程池來執行計算限制的操作,而不是手動創建線程。
每個 CLR 都有自己獨立的線程池,並且由各自 CLR 控制的所有 AppDomain 所共用。
線程池本身維護了一個請求隊列,當程式需要執行一個非同步操作的時候,會將一個記錄項追加到隊列之中,然後由線程池將該記錄項分派給線程池線程,如果沒有線程則創建一個新的線程。線程任務處理完整之後,將該線程放入線程池中等待以後進行復用。
線程池本身是啟髮式的,結合程式負載,他會自己根據當前線程池內線程的狀態銷毀/新增線程。
二、執行簡單的計算限制操作
通過 ThreadPool
靜態類,我們可以方便地使用線程池中的線程為我們執行一些計算限制的非同步操作。只需要調用 ThreadPool
的 QueueUserWorkItem(WaitCallBack callback)
方法,或者是他的另一個重載方法,接收一個 state 值作為參數。
他的兩個方法都是非阻塞的,調用之後會立即返回。
WaitCallBack
的方法簽名如下:
delegate void WaitCallBack(Object state);
在 CLR 的線程池中,將 callback 委托作為工作項添加到隊列當中,然後由線程池分發線程進行處理。
【註意】
一旦回調方法拋出了未處理的異常,CLR 會立即終止進程。
三、執行上下文
每個線程都有一個執行上下文的數據結構,包含由安全設置,宿主設置和邏輯調用上下文數據(AsyncLocal 與 CallContext)。
當在某個線程(例如主線程)使用了另外一個線程(輔助線程),就會產生執行上下文由調用者線程流向被調用線程。這會對性能造成一定的影響,這是因為執行上下文包含有大量地信息。而如果輔助線程又調用了更多的輔助線程,這個時候執行上下問的複製開銷就非常大。
我們可以通過 ExecutionContext
類控制線程的執行上下文是否流向輔助線程,只有輔助線程不需要訪問執行上下文時可以阻止執行上下文流動。當阻止了執行上下文流動時,輔助線程會使用最後一次與其關聯的任意執行上下文,這個時候對於安全設置等就不可信,不應執行任何依賴於執行上下文的操作。
一般來說在主線程,可以通過 ExecutionContext.SuppressFlow();
方法阻止執行上下文流動,然後再通過 ExecutionContext.RestoreFlow();
恢復流動。
四、協作式取消和超時
.NET 提供了標準的取消操作模式,這個模式是協作式的,也就是你要取消的操作必須顯式聲明自己可以被取消。這是因為用戶在執行某些長耗時的計算限制操作的時候,可能會因為等待時間太長或者其他原因需要取消這個操作。
首先我們通過 System.Threading.CancellationTokenSource
對象管理或者取消對象狀態,使用時直接 new 一個即可,而該對象擁有一個 CancellationToken
對象。
這個 Token 對象用於傳遞給執行計算限制操作的方法,通過該 Token 的 IsCancellationRequested
屬性你可以在方法內部確認任務是否被取消,如果被取消你就可以進行返回操作。
例子如下:
static void Main(string[] args)
{
var tokenSource = new CancellationTokenSource();
ThreadPool.QueueUserWorkItem(z => Calculate(CancellationToken.None, 10000));
Console.ReadKey();
tokenSource.Cancel();
Console.ReadLine();
}
private static void Calculate(CancellationToken token, int count)
{
for (int i = 0; i < count; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine("用戶提前終止操作,退出線程..");
break;
}
Console.WriteLine(count);
Thread.Sleep(200);
}
Console.WriteLine("計數完成.");
}
【註意】
如果你要執行一個不允許被取消的操作,可以為方法傳遞一個
CancellationToken.None
對象,因為該對象沒有 Source 源,則不可能會被調用Cancel()
進行取消。
註冊取消事件
CancellationToken
允許我們通過 Register()
方法註冊多個委托,這些被註冊了的委托會在 TokenSource 調用 Cancel
取消的時候優先調用,其調用的先後順序為註冊時的順序。
【註意】
調用
Register()
方法的時候,他有兩個bool
類型的參數,分別是useSyncContext
與useExecutionContext
。這兩個參數用於指定,是否要用調用線程的同步上下文或者執行上下文來調用回調函數。
同時在註冊成功之後會返回一個 CancellationTokenRegistration
對象,通過調用該對象的 Dispose
方法可以刪除已經註冊的委托回調,這樣在取消時就不會調用該回調。
TokenSource 鏈接
可以通過 CancellationTokenSource.CreateLinkedTokenSource()
鏈接兩個或多個對象,鏈接成功後會返回一個單獨的 TokenSource 對象。
一旦這個新對象鏈接的任何一個 TokenSource 對象被取消的時候,該對象也會被取消掉。
Cancel 的異常處理
在調用 TokenSource 的 Cancel()
方法時(預設為 false),該方法還有另外一個重載傳入 bool
值,如果為 true 的時候,有多個註冊的回調委托,一旦某個出現異常直接會被拋出該異常,不會等待其他回調執行完畢。
如果為 false,則會等到所有回調方法執行完成時,拋出一個 AggregateException
異常,內部的 InnerExceptions
包含有所有在執行過程中產生的異常信息集合。
超時取消
除了直接調用 Cancel()
立即取消操作之外,還有一個延遲取消的方法 CancelAfter()
,通過傳遞具體的延遲時間,我們可以在指定的之間之後取消某個任務。(PS:有點像 Polly 的 TimeOut )
五、任務
為啥使用任務,雖然通過 ThreadPool
可以很方便地發起一次計算限制的操作,但是你不知道你的方法啥時候執行完成,也無法在操作完成之後獲得返回值。
使用任務執行一個計算限制操作有兩種方式,兩者也一樣的可以傳遞 CancellationToken
進行取消操作:
new Task(Sum,20).Start();
Task.Run(()=>Sum(20));
除此之外還可以在構造 Task
時 傳遞一些標誌位,用於任務調度器進行一些特殊處理。
等待任務完成並獲取結果
任務除了標準的無返回值的 Task
類型之外,還有一個包含有泛型參數的 Task<TResult>
類型,其中 TResult 參數就是任務的返回值類型。
在創建好 Task<TResult>
對象之後,可以通過 Task.Wait()
等待任務執行完成,Task 的 Wait()
方法會阻塞調用者線程直到任務執行完成。執行完成之後,可以通過 Task.Reuslt
獲取任務執行之後的返回值。
PS:
這裡獲取
Result
屬性值時,其內部也會調用Wait()
方法。
如果該 Task 內的計算限制操作拋出了未經處理的異常,這個異常會被吞噬掉,調用 Wait()
方法或者使用 Result
屬性的時候,這些異常信息會被包裹在 AggregateException
內部並返回給調用者線程。
【註意】
不推薦直接調用
Wait()
,如果 Task 已經開始執行,該方法會阻塞調用者線程,直到執行完成。第二種情況是任務還沒有開始執行的時候,調用者線程不會被阻塞,Task 立即執行並返回。而調度器可能會使用調用者線程來執行這些操作,這個時候,如果調用者線程獲取了一個線程同步鎖,而 Task 因為也是在調用者線程執行,嘗試獲取鎖的時候,就會產生死鎖。
AggregateException
可能會包含有多個異常,這個時候可以使用該對象的 Handle(Func<Exception,bool> predicate)
方法來為每一個異常進行處理,處理返回 true,未處理返回 false。
在調用了 Handle 方法之後,仍然有異常沒有處理,這些沒有處理的異常又會造成一個新的 AggregateException
被拋出。
【註意】
如果不知道有哪些 Task 內部未處理的異常,可以通過象任務調度器的
UnobservedTaskException
事件登記一個回調方法,如果存在一個沒有處理到的異常,則會調用你的回調方法,並將異常傳遞給你。
除了 Task.Wait()
方法,還有等待一組任務的 Task.WaitAny()
和 Task.WaitAll()
。幾個方法都會阻塞調用者線程,前者當傳遞的一組任務有任意一個完成則立即返回該任務的索引,後者則是要等待這一組任務全部完成之後才會喚醒調用線程。
這兩個方法一旦被取消,都會拋出 OperationCanceledException
異常。
取消任務
可以通過一個 CancellationTokenSource
來取消 Task,一樣的需要傳入的計算限制方法添加 CancellationToken
參數。
只不過呢,在 Task 任務內部我們不通過 IsCancellationRequested
來判斷任務是否取消,而是通過調用 Token 的 ThrowIfCancellationRequested()
方法來拋出異常。
該方法會判斷當前任務是否被取消,如果被取消了,則拋出異常。這是因為與直接通過線程池添加任務不同,線程池無法知道任務何時完成,而任務則可以表示是否完成,並且還能返回值。
任務完成時啟動新任務
之前說過通過調用 Task.Wait()
或者在任務尚未完成的時候調用 Task.Result
屬性,都會造成線程池創建新的線程。而我們可以通過在任務完成之後,立即開啟一個新的任務,這樣我們就可以通過新的任務知道前一個任務是否已經完成了。
創建一個的計算限制任務對象,我們在啟動了該任務對象之後,調用 Task.ContinueWith()
方法來創建一個延續任務,新的延續性任務會有一個 Task 參數,該參數就是最開始的任務。
而在使用 Task.ContinueWith()
時,他還可以傳遞一個標識位。這個標識位用於表明這個延續性任務是在第一個任務什麼情況下才會執行,一般有三種:OnlyOnCanceled(第一個任務取消時才被執行)、OnlyOnFaulted(第一個任務拋出未處理異常時執行)、OnlyOnRanToCompletion(第一個任務順利完成時執行)。
啟動子任務
一個任務在其內部可以創建其子任務,只需要在內部構造 Task 對象的時候,傳遞一個標識位 TaskCreationOptions.AttachedToParent
將其與父任務關聯。這樣的話,除非其所有子任務執行完成,父任務不會被認為已經完成。
延續性任務也可以作為第一個任務的子任務,指定對應的標識位即可。
任務的內部構造
任務主要由以下幾部分構成:
- 任務唯一的 Task Id。
- 調度器的引用。
- 回調方法的引用。
- 執行上下文的引用。
- 其他...
可以看到構造一個 Task 還是需要比較大的開銷的,如果你不需要 Task 的附加特性,完全可以使用 TaskPool.QueueUserworkItem
來獲得更好的性能與效率。
通過 Task 的只讀屬性 Task.Status
,我們可以知道任務目前處於哪種狀態,其最終的狀態主要有 3 種,分別是:RanToCompletion(已完成)、Canceled(被取消)、Faulted(出現異常失敗),這三種狀態都屬於任務完成狀態。
另外值得註意的是,通過 ContinueWith()
、ContinueWhenAll()
、ContinueWhenAny()
等方法創建的任務狀態都為 WaitingForActivation
,這個狀態代表任務會自動開始。
任務工廠
如果你需要在執行多個相同配置的 Task 對象,可以通過 TaskFactory
和 TaskFactory<TResult>
,其大概含義與 Task
的含義相同。
在創建工廠時,可以傳遞一些常用的配置標識位和 CancellationToken 對象,之後我們可以通過 StartNew()
方法來統一執行一堆任務。
任務調度器
任務調度器一般有兩種,第一種是線程池任務調度器,一般用於服務端程式。還有一種是同步上下文任務調度器,一般用於 GUI 程式。
六、Parallel 的 For、Foreach、Invoke
For 與 Foreach 基本用於操作一個集合,然後迴圈處理其值。而如果在某個方法內部需要執行多個方法,則可以通過 Invoke 來進行執行。使用 Parallel 類可以讓 CPU 最大化地利用起來而不會阻塞主線程。
不過一般不會將所有 For 與 Foreach 都替換為並行化的查詢,這是因為某些迴圈會修改共用數據,這個時候使用 Parallel 的操作則會破壞數據,雖然可以通過增加線程同步鎖來解決,不過這樣會造成單線程訪問,無法享受並行處理的好處。
同時 Parallel 方法本身也會有開銷,當然在大量重覆性任務中這種開銷可以忽略不計,但是如果僅為幾十個短耗時的計算限制任務啟用 Parallel 就會得不償失。
這三種操作都接受一個ParallelOptions
對象用於配置最大並行的工作項數目與調度器。
Parallel 的 For 與 Foreach 的一個重載方法允許傳入 3 個委托,他們分別是:
- 任務局部初始化委托(localInit):該委托是在每次工作項處理之前被調用。
- 任務主體委托(body):具體的工作項處理邏輯,參與工作的各個線程都會調用一次。
- 任務局部終結器委托(localFinally):本委托是在每個工作項處理完成之後都會被調用。
從上述邏輯來看,可以看作局部初始化委托為一個父任務,後面兩個為子級連續任務的構造。
實例:
static void Main(string[] args)
{
var files = new List<string>();
files.AddRange(Directory.GetFiles(@"E:\Program Files","*.*",SearchOption.AllDirectories));
files.AddRange(Directory.GetFiles(@"E:\Program Files (x86)","*.*",SearchOption.AllDirectories));
files.AddRange(Directory.GetFiles(@"E:\Project","*.*",SearchOption.AllDirectories));
files.AddRange(Directory.GetFiles(@"E:\Cache","*.*",SearchOption.AllDirectories));
files.AddRange(Directory.GetFiles(@"E:\Windows Kits","*.*",SearchOption.AllDirectories));
files.AddRange(Directory.GetFiles(@"C:\Program Files\dotnet","*.*",SearchOption.AllDirectories));
Console.WriteLine($"總文件數量:{files.Count}");
long allFileCount = 0;
var watch = new Stopwatch();
watch.Start();
Parallel.ForEach<string, long>(files,
localInit: () =>
{
// 初始化文件大小為 0,
// 這裡的參數類型取決於任務返回的參數
return 0;
},
body: (fileName, parallelStatus, index, fileCount) =>
{
// 計算文件大小並返回
long count = 0;
try
{
var info = new FileInfo(fileName);
count = info.Length;
}
catch (Exception e)
{
}
// 這裡因為該任務會被線程池復用,所以要進行累加
return count + fileCount;
},
localFinally: fileCount => { Interlocked.Add(ref allFileCount, fileCount); }
);
watch.Stop();
Console.WriteLine($"並行效率:{watch.ElapsedMilliseconds} ms");
Console.WriteLine($"文件總大小:{allFileCount / 1024 / 1024 / 1024} Gb");
allFileCount = 0;
watch.Reset();
watch.Start();
foreach (var file in files)
{
long count = 0;
try
{
var info = new FileInfo(file);
count = info.Length;
}
catch (Exception e)
{
}
allFileCount+=count;
}
watch.Stop();
Console.WriteLine($"單線程效率:{watch.ElapsedMilliseconds} ms");
Console.WriteLine($"文件總大小:{allFileCount / 1024 / 1024 / 1024} Gb");
Console.ReadLine();
}
性能提升:
通過 Parallel 的 Foreach 與普通的 foreach 遍歷計算,性能總體提升了約 56%,越耗時的操作提升的效果就越明顯。
在 Body 的主體委托當中,傳入了一個 ParallelLoopState
對象,該對象用於每個線程與其他任務進行交互。主要有兩個方法 Stop()
與 Break()
,前者用於停止迴圈,後者用於跳出迴圈,並且跳出迴圈之後,其 LowestBreakIteration
會返回調用過 Break()
方法的最低項。
並且 Parallel 還會返回一個 ParallelLoopResult
對象,該通過該對象我們可以得知這些迴圈是否正常完成。
七、並行語言集成查詢 PLINQ
LINQ 預設查詢的方式是一個線程順序處理數據集合中的所有項,稱之為順序查詢。而 PLINQ 就是將這些操作分散到各個 CPU 並行執行,通過 AsParallel()
擴展方法可以將 IEnumerable<TSource>
轉換為 ParallelQuery<TSource>
。
而從並行操作切換回順序操作,只需要調用 ParallelEnumable
的 AsSequential()
方法即可。
經過 PLINQ 處理後的數據項其結果是無序的,如果需要有序結果,可以調用 AsOrdered()
方法。但是該方法比較損耗性能,一般不推薦使用,如果需要排序應該直接使用與 LINQ 同名的 PLINQ 擴展方法。
PLINQ 一般會自己分析使用最好的查詢方式進行查詢,有時候使用順序查詢性能更好。
WithCancellation()
:允許取消某個 PLINQ 查詢。WithExecutionMode()
:允許配置 PLINQ 執行模式,是否強制並行查詢。WithMergeOptions()
:允許配置結果的緩衝與合併方式。WithDegreeOfParallelism()
:允許配置查詢的最大並行數。
PS:
不建議在多線程環鏡中使用
Console.Write()
進行輸出,因為 Console 類內部會對線程進行同步,確保只有一個線程可以訪問控制台視窗,這樣會損害性能。
八、定時計算限制操作
通過 CLR 提供的 Timer
定時器,我們可以傳入一個回調方法。這樣的話計時器會可以根據傳入的周期,來定時將我們的回調方法通過線程池線程進行調用。
同時計時器還允許傳入一個 dueTime
參數來指定這個計時器首次調用回調方法時需要等待多久(立即執行可以傳入 0),而 period
可以指定 Timer
調用回調方法的周期。
【原理】
線上程池內部所有的
Timer
對象只使用了一個線程,當某個Timer
到期的時候,這個線程就會被喚醒。該線程通過ThreadPool.QueueUserWorkItem()
方法將一個工作項添加到線程池隊列,這樣你的回調方法就會得到執行。
【註意】
如果回調方法執行的時常超過了你設置的周期時常,這樣會造成多個線程都在執行你的回調。因為
Timer
不知道你的回調執行完成沒有,他只會到期執行你的回調方法。解決措施是構造一個
Timer
的時候,為period
指定一個Timeout.Infinite
常量,這樣計時器只會觸發一次。之後在你的回調方法執行完成之後,在其內部通過Timer.Change()
方法指定一個執行周期,並且設置其 dueTime 為立即執行。這樣做了之後,你的
Timer
就會確保你的回調被執行完成之後再開始下一個周期。這一點可以參考 Abp 實現的
AbpTimer
對象。
九、線程池如何管理線程
CLR 允許開發人員設置線程池最大工作者線程數,但是一般不要輕易設置該值,但你可以通過 ThreadPool.GetMaxThreads()
、ThreadPool.GetMinThreads()
、GetAvailableThreads()
方法來獲取一些相關信息。
通過 ThreadPool.QueueUserWorkItem()
方法和 Timer
類處理的工作項總是存儲到 CLR 線程池的 全局隊列 中。工作者線程採用一個 FIFO 演算法將工作項從 全局隊列 取出,因為所有工作者線程都有可能去這個隊列拿去工作項,這個時候會使用 線程同步鎖 以確保工作項只會被工作者線程處理一次。這可能會造成性能瓶頸,對伸縮性和性能會造成某些限制。
預設的任務調度器中,非工作者線程調度 Task 時都是存放在全局隊列,而工作者線程調度 Task 則是存放在他自己的本地隊列。
工作者線程處理 Task 的步驟:
- 首先從本地隊列採用 LIFO 演算法取得一個 Task 進行處理。
- 如果本地隊列沒有 Task,則從其他的工作者線程本地隊列拿一個 Task 自己來處理。(會使用線程同步鎖)
- 所有本地隊列都為空,則工作者線程會使用 FIFO 演算法去全局隊列拿一個 Task 進行處理。
- 如果全局隊列為空,則線程處於休眠狀態,時間過長則銷毀自身。
PS:
結合上下文,說明工作項首先被添加到了全局隊列,然後由工作者線程取到自己的本地隊列進行處理。
線程池會動態地根據工作項的多少動態地調整工作者線程的數量,一般不需要開發人員進行管控。