1.Task概述:Task是對操作系統線程的抽象,目的是使線程池能高效地管理線程的分配和回收,Task使用的底層線程屬於一種共用資源,任務需要互相協作,並及時歸還線程,以便用相同的共用資源(線程)滿足其他請求。 2.Task.AsyncState:獲取在創建 Task 時提供的狀態對象,如果未提供, ...
1.Task概述:Task是對操作系統線程的抽象,目的是使線程池能高效地管理線程的分配和回收,Task使用的底層線程屬於一種共用資源,任務需要互相協作,並及時歸還線程,以便用相同的共用資源(線程)滿足其他請求。
2.Task.AsyncState:獲取在創建 Task 時提供的狀態對象,如果未提供,則為 null。若狀態對象在task內部改變了,AsyncState的數據也是改變後的狀態對象。可查看ThreadApply.TaskAsyncState()方法的代碼。
3.Task.ContinueWith():它的作用是把任務鏈接起來,在父任務完成後會立即執行後續任務。若在父任務的實例上多次調用ContinueWith()方法,在父任務完成後,所添加的後續任務會並行執行。當然可以根據TaskContinuationOptions枚舉來指定根據父任務的執行情況,來執行後續任務。
4.Task的異常:Task執行期間產生的未處理的異常後會被禁止,直到調用某個任務完成(Task)成員,如Wait()、Result、WaitAll()、WaitAny(),上述的成員都會引發在任務執行期間發生的未處理異常。可查看代碼TreadApply.TaskException()。
5.Task的異常2:要在不使用try/catch塊的情況下處理未處理的異常,另一個辦法是使用ContinueWith()任務,利用ContinueWith()委托的task參數,可以評估父任務的Exception屬性,可查看代碼TreadApply.TaskException2()。
6.Task的取消:可查看代碼ThreadApply.TaskCancellationToken()方法,需要用到CancellationTokenSource類,對CancellationTokenSource.Cancel()方法的調用,會在從CancellationTokenSource.Token複製的所有取消標誌上設置IsCancellationRequested屬性。此中需要註意的2點,
(1)取消標誌:CancellationToken(而不是CancellationTokenSource)會在非同步任務中求值,CancellationToken看起來和CancellationTokenSource差不多。但CancellationToken用於監視和響應一個取消請求,而CancellationTokenSource用於取消任務本身。
(2)複製的:CancellationToken 是一個struct,所以調用CancellationTokenSource.Token會創建標誌的一個副本,這樣一來所有取消標誌的實例都是線程安全的。
7.長時間運行的任務:如果開發人員知道一個Task要長時間運行,會長時間“霸占”一個底層線程資源,開發人員應告訴線程池共用線程不會太快交還。這樣一來,線程池更有可能為任務創建一個專用線程(而不是分配其中的一個共用線程),為此,在調用StartNew()時,要使用TaskCreationOption.LongRunning選項。
8.並行迭代:.net4.0中新增了2個並行迭代,分別為Parallel.For()與Parallel.Foreach()。API會判斷同時執行多少個線程才是效率最高的,可查看ThreadApply.ParallelFor()與ThreadApply.ParallelForeach()方法的代碼。
9.並行異常的處理:在並行處理時,在並行的內容中可能出現多個異常,那麼其異常信息會歸到AggregateException異常類型,它是包含了多個內部異常的一個異常。System.Threading.Task命名空間一致使用System.AggregateException對未處理的異常進行分組,因為對於並行操作,經常都可能產生多個異常。如ThreadApply.ParallelForeach()代碼清單。
10.並行迴圈的取消:Task需要一個顯式調用才能阻塞(它的調用線程,並一直等)到它完成,並行迴圈雖然和任務不同,它以並行方式執行迭代,但它仍會阻塞(它的調用線程,並一直等)到整個Parallel.For()或Parallel.ForEach()迴圈結束。所以為了取消並行迴圈,調用取消請求的那個線程通常不能是正在執行並行迴圈的那個線程。如代碼ThreadApply.CancelParallelForeach()清單。
11.並行迭代的中斷:和標準的for迴圈相似,Parallel的迴圈也支持中斷(ParallelLoopState.Break())以退出迴圈並取消進一步迭代的概念。但是,在並行執行的上下文中,中斷迴圈意味著中斷迭代之後的新迭代不應開始,當前正在執行的迭代還是會繼續運行直至完成的。要想知道執行了一次中斷的最低的迭代,並瞭解中斷是否阻止了一個或多個迭代啟動,可查看並行For()/ForEach()方法返回的ParallelLoopResult對象(其含有IsCompleted、LowestBreakIteration屬性)。
12.並行Linq查詢:並行Linq的功能都在System.Linq.ParallelEnumerable類中,該類的方法形式與System.Linq.Enumerable類的方法形式基本一致,ParallelEnumerable類中有對IEnumerable<T>類型,進行轉化為並行的處理類型的擴展方法,即IEnumerable<T>.AsParallel()。只有在執行此轉化後,才能對集合進行並行處理。
13.並行Linq的異常:與並行For和Foreach一樣,PLinq運算也可能因為完全相同的原因返回多個異常(不同的迭代同時執行),幸好,捕獲異常的機制也是一樣。PLinq異常可以通過AggregateException的InnerException屬性來訪問,因此,將一個Plinq查詢包裝到一個try/catch塊中,並捕捉AggregateException類型的異常,就可以處理每一次迭代中的未處理異常。
14.取消PLinq查詢:和並行迴圈相似,取消的PLinq查詢會引發一個OperationCanceledException,另外,PLinq查詢會阻塞調用線程,直到查詢完成。所以應該把並行Linq包裝在Task中。在使用取消PLinq功能前,需要對並行集合執行WithCancellation()方法,如代碼TreadApply.CancelParallelLinq()清單。
15.非同步類使用說明:在選擇要使用的非同步類時,按照優選順序從高到低依次是Task、ThreadPool、Thread。換而言之,首選TPL,如果不合適,就是使用ThreadPool,如果還是不合適就用Thread。
16.AppDomain的未處理異常:在主應用程式上,如果要登記一個回調方法來接收關於未處理的異常通知,只需要嚮應用程式域的UnhandledException事件註冊即可。可查看ThreadApply.AppDomainException()代碼清單,應用程式域的線程(包括主線程)上發生任何未處理的異常,都會觸發UnhandledException回調,這隻是一個通知機制,而不是實際捕捉和處理異常以使應用程式能繼續運行的機制。事件發生之後,應用程式會退出。
public class ThreadApply { public void TaskCreate() { int times = 1000; Task task = new Task(() => { for (int i = 0; i < times; i++) { Console.Write("-"); } }); task.Start(); for (int i = 0; i < times; i++) { Console.Write("."); } task.Wait(); } public void TaskStaticCreate() { Task<string> task = Task.Factory.StartNew(() => { Thread.Sleep(100); return "主神"; }); foreach (char busySymbol in Utility.BusySymbols()) { if (task.IsCompleted) { Console.Write('\b'); break; } Console.Write(busySymbol); } Console.WriteLine(); Console.WriteLine(task.Result); Console.WriteLine(task.Status); Trace.Assert(task.IsCompleted); } public void TaskAsyncState() { Man man = new Man() { Name = "主神", Age = 1 }; Task task = Task.Factory.StartNew((_man) => { Man tempMan = (Man)_man; Console.WriteLine("執行前{0}", tempMan); tempMan.Name = "諸神"; tempMan.Age += 10; Console.WriteLine("執行後{0}", tempMan); }, man); task.Wait(); Console.WriteLine("原始{0},非同步後{1}", man, task.AsyncState); } public void TaskContinueWith() { Task<string> task = Task<string>.Factory.StartNew(() => { Console.WriteLine("主任務"); Thread.Sleep(200); return "紅星"; }); Task faultedTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsFaulted); Console.WriteLine("Task State: Faulted"); }, TaskContinuationOptions.OnlyOnFaulted); Task cancelTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsCanceled); Console.WriteLine("Task State: Canceled"); }, TaskContinuationOptions.OnlyOnCanceled); Task completedTask = task.ContinueWith((antecedentTask) => { Trace.Assert(antecedentTask.IsCompleted); Console.WriteLine("Task Result:{0}", antecedentTask.Result); Console.WriteLine("Task State: Completed"); }, TaskContinuationOptions.OnlyOnRanToCompletion); completedTask.Wait();//這句代碼可以不用,僅僅是為了阻止控制台關閉 } public void TaskException() { //此處代碼的演示了未處理的框架,如何將任務的未處理的異常傳回給主線程,註意異常的數據類型是異常集合AggregateException, Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); try { task.Wait(); } catch (AggregateException ex) { foreach (Exception item in ex.InnerExceptions) { Console.WriteLine("error : {0}", item.Message); } } } public void TaskException2() { /*Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); Task faultedTask = task.ContinueWith(antecedentTask => { Console.WriteLine("父任務執行失敗"); }, TaskContinuationOptions.OnlyOnFaulted); faultedTask.Wait(); if (task.IsFaulted) { Console.WriteLine("Error: {0}", task.Exception.Message);//task的異常類型為AggregateException }*/ Task task = Task.Factory.StartNew(() => { throw new ApplicationException(); }); Task faultedTask = task.ContinueWith(antecedentTask => { foreach (var item in antecedentTask.Exception.InnerExceptions) { Console.WriteLine("Error:{0}", item.Message); } Console.WriteLine("父任務執行失敗"); }, TaskContinuationOptions.OnlyOnFaulted); faultedTask.Wait(); } public void TaskCancellationToken() { string stars = "*".PadRight(50, '*'); Console.WriteLine("Push Enter to exit."); CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); Task task = Task.Factory.StartNew(() => Utility.WriteNaturalNumber(cancellationTokenSource.Token), cancellationTokenSource.Token); Thread.Sleep(2000); Console.WriteLine(); cancellationTokenSource.Cancel(); Console.WriteLine(stars); task.Wait(); Console.WriteLine("Task Status: {0}", task.Status); //在任務中是正常取消的,所以任務的狀態是RanToCompletion。若要狀態為Canceled,可是使用CancellationToken.ThrowIfCancellationRequested()方法,報告異常。 } public void ParallelFor() { Parallel.For(0, 100, (i) => { Console.WriteLine("第{0}次,threadId={1}", i, Thread.CurrentThread.ManagedThreadId); }); } public void ParallelForeach() { IEnumerable<string> files = Directory.GetFiles("D:\\", "*.txt", SearchOption.AllDirectories); try { Parallel.ForEach(files, (fileName) => { //Encrypt(fileName); }); } catch (AggregateException ex) { foreach (var item in ex.InnerExceptions) { Console.WriteLine(item.Message); } } } public void CancelParallelForeach() { IEnumerable<string> files = Directory.GetFiles("D:\\", "*.txt", SearchOption.AllDirectories); CancellationTokenSource cts = new CancellationTokenSource(); ParallelOptions parallelOptions = new ParallelOptions() { CancellationToken = cts.Token }; cts.Token.Register(() => Console.WriteLine("Cancelling...")); Console.WriteLine("Push Enter to exit."); Task task = Task.Factory.StartNew(() => { //註意,在內部,並行迴圈條件通過IsCancellationRequested屬性阻止尚未開始的新迭代開始。 Parallel.ForEach(files, parallelOptions, (fileName, loopState) => { //Encrypt(fileName); }); }); Console.ReadLine(); cts.Cancel(); Console.WriteLine("---------分割線----------"); task.Wait(); } public void ParalleEncrypt(List<string> data) { ParallelQuery<string> result = data.AsParallel().Select(item => Utility.Encrypt(item)); } public void CancelParallelLinq() { IEnumerable<int> data = Enumerable.Range(0, 10000); CancellationTokenSource cts = new CancellationTokenSource(); Console.WriteLine("Push Enter to exit."); Task task = Task.Factory.StartNew(() => { data.AsParallel().WithCancellation(cts.Token).Select(item => { Thread.Sleep(100);//模擬損耗的時間 return item >> 1; }); }, cts.Token); /*CancellationToken除了要傳給WithCancellation(),還要作為StartNew()的第二個參數傳遞,這會造成Task.Wait()引發一個AggregateException, 它的InnerException屬性會被設為一個TaskCanceledAException*/ Console.ReadLine(); cts.Cancel(); Console.WriteLine("------------分割線----------------"); try { task.Wait(); } catch (AggregateException ex) { foreach (Exception item in ex.InnerExceptions) { Console.WriteLine(item.Message); } } } public void AppDomainException() { try { AppDomain.CurrentDomain.UnhandledException += Utility.OnUnHandledException; ThreadPool.QueueUserWorkItem(state => { throw new Exception("不可饒恕"); }); Thread.Sleep(3000); Console.WriteLine("Still Running..."); } finally { Console.WriteLine("Exiting...."); } } public class Man { public string Name { get; set; } public int Age { get; set; } public override string ToString() { return string.Format("Name:{0},Age:{1}", Name, Age); } } } public class Utility { public static IEnumerable<char> BusySymbols() { string busySymbols = @"-\|/-\|/"; int next = 0; while (true) { yield return busySymbols[next]; next = ++next % busySymbols.Length; yield return '\b'; } } /// <summary> /// 輸出自然數 /// </summary> /// <param name="cancellationToken">取消標記</param> public static void WriteNaturalNumber(CancellationToken cancellationToken) { int num = 0; while (!cancellationToken.IsCancellationRequested) { Thread.Sleep(100); Console.Write("{0},", num++); } } public static string Encrypt(string text) { throw new NotImplementedException(); } public static void OnUnHandledException(object sender, UnhandledExceptionEventArgs e) { Exception exception = (Exception)e.ExceptionObject; Console.WriteLine("Error {0}:{1} -->{2}", exception.GetType().Name, exception.Message, exception.InnerException.Message); } }View Code
--------------以上內容根據《C#本質論 第三版》進行整理