任務概述 線程(Thread)是創建併發的底層工具,因此有一定的局限性(不易得到返回值(必須通過創建共用域);異常的捕獲和處理也麻煩;同時線程執行完畢後無法再次開啟該線程),這些局限性會降低性能同時影響併發性的實現(不容易組合較小的併發操作實現較大的併發操作,會增加手工同步處理(加鎖,發送信號)的依 ...
任務概述
線程(Thread)是創建併發的底層工具,因此有一定的局限性(不易得到返回值(必須通過創建共用域);異常的捕獲和處理也麻煩;同時線程執行完畢後無法再次開啟該線程),這些局限性會降低性能同時影響併發性的實現(不容易組合較小的併發操作實現較大的併發操作,會增加手工同步處理(加鎖,發送信號)的依賴,容易出現問題)。
線程池的(ThreadPool)的QueueUserWorkItem方法很容發起一次非同步的計算限制操作。但這個技術同樣有著許多限制,最大的問題是沒有內建的機制讓你知道操作在什麼時候完成,也沒有機制在操作完成時獲得返回值。
而Task類可以解決上述所有的問題。
任務(Task)表示一個通過或不通過線程實現的併發操作,任務是可組合的,使用延續(continuation)可將它們串聯在一起,它們可以使用線程池減少啟動延遲,可使用回調方法避免多個線程同時等待I/O密集操作。
基礎任務(Task)
微軟在.NET 4.0 引入任務(Task)的概念。通過System.Threading.Tasks命名空間使用任務。它是在ThreadPool的基礎上進行封裝的。Task預設都是使用池化線程,它們都是後臺線程,這意味著主線程結束時其它任務也會隨之停止。
啟動一個任務有多種方式,如以下示例:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Console.WriteLine("主線程Id:{0}", Thread.CurrentThread.ManagedThreadId); 6 int workerThreadsCount, completionPortThreadsCount; 7 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount); 8 Console.WriteLine("剩餘工作線程數:{0},剩餘IO線程數{1}", workerThreadsCount, completionPortThreadsCount); 9 //第一種:實例化方式Start啟動 10 { 11 Task task = new Task(() => 12 { 13 Test("one-ok"); 14 }); 15 task.Start(); 16 } 17 //第二種:通過Task類靜態方法Run方式進行啟動 18 { 19 Task.Run(() => 20 { 21 Test("two-ok"); 22 }); 23 } 24 //第三種:通過TaskFactory的StartNew方法啟動 25 { 26 TaskFactory taskFactory = new TaskFactory(); 27 taskFactory.StartNew(() => 28 { 29 Test("three-ok"); 30 }); 31 } 32 //第四種:.通過Task.Factory進行啟動 33 { 34 Task taskStarNew = Task.Factory.StartNew(() => 35 { 36 Test("four-ok"); 37 }); 38 } 39 //第五種:通過Task對象的RunSynchronously方法啟動(同步,由主線程執行,會卡主線程) 40 { 41 Task taskRunSync = new Task(() => 42 { 43 Console.WriteLine("線程Id:{0},執行方法:five-ok", Thread.CurrentThread.ManagedThreadId); 44 }); 45 taskRunSync.RunSynchronously(); 46 } 47 Thread.Sleep(1000); 48 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount); 49 Console.WriteLine("剩餘工作線程數:{0},剩餘IO線程數{1}", workerThreadsCount, completionPortThreadsCount); 50 Console.ReadKey(); 51 } 52 static void Test(string o) 53 { 54 Thread.Sleep(2000); 55 Console.WriteLine("線程Id:{0},執行方法:{1}", Thread.CurrentThread.ManagedThreadId, o); 56 } 57 /* 58 * 作者:Jonins 59 * 出處:http://www.cnblogs.com/jonins/ 60 */ 61 }
執行結果:
上面示例中除去使用RunSynchronously方法啟動的是同步任務(由啟用的線程執行任務)外,其它幾種方式內部都由線程池內的工作者線程處理。
說明:
1.事實上Task.Factory類型本身就是TaskFactory(任務工廠),而Task.Run(在.NET4.5引入,4.0版本調用的是後者)是Task.Factory.StartNew的簡寫法,是後者的重載版本,更靈活簡單些。
2.調用靜態Run方法會自動創建Task對象並立即調用Start
3.如Task.Run等方式啟動任務並沒有調用Start,因為它創建的是“熱”任務,相反“冷”任務的創建是通過Task構造函數。
返回值(Task<TResult>)&狀態(Status)
Task有一個泛型子類Task<TResult>,它允許任務返回一個值。調用Task.Run,傳入一個Func<Tresult>代理或相容的Lambda表達式,然後查詢Result屬性獲得結果。如果任務沒有完成,那麼訪問Result屬性會阻塞當前線程,直至任務完成。
1 public static Task<TResult> Run<TResult>(Func<TResult> function);
而任務的Status屬性可用於跟蹤任務的執行狀態,如下所示:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Task<int> task = Task.Run(() => 6 { 7 int total = 0; 8 for (int i = 0; i <= 100; i++) 9 { 10 total += i; 11 } 12 Thread.Sleep(2000); 13 return total; 14 }); 15 Console.WriteLine("任務狀態:{0}",task.Status); 16 Thread.Sleep(1000); 17 Console.WriteLine("任務狀態:{0}", task.Status); 18 int totalCount = task.Result;//如果任務沒有完成,則阻塞 19 Console.WriteLine("任務狀態:{0}", task.Status); 20 Console.WriteLine("總數為:{0}",totalCount); 21 Console.ReadKey(); 22 } 23 }
執行如下:
Reulst屬性內部會調用Wait(等待);
任務的Status屬性是一個TaskStatus枚舉類型:
1 public TaskStatus Status { get; }
說明如下:
枚舉值 | 說明 |
Canceled |
任務已通過對其自身的 CancellationToken 引發 OperationCanceledException 對取消進行了確認,此時該標記處於已發送信號狀態; 或者在該任務開始執行之前,已向該任務的 CancellationToken 發出了信號。 |
Created | 該任務已初始化,但尚未被計劃。 |
Faulted | 由於未處理異常的原因而完成的任務。 |
RanToCompletion | 已完成執行的任務。 |
Running | 任務正在運行,尚未完成。 |
WaitingForActivation | 該任務正在等待 .NET Framework 基礎結構在內部將其激活併進行計劃。 |
WaitingForChildrenToComplete | 該任務已完成執行,正在隱式等待附加的子任務完成。 |
WaitingToRun | 該任務已被計劃執行,但尚未開始執行。 |
任務集合返回值(WhenAll&WhenAny)
Task中有非常方便的對並行運行的任務集合獲取返回值的方式,比如WhenAll和WhenAny。
1.WhenAll
WhenAll:等待提供的所有 Task 對象完成執行過程(所有任務全部完成)。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 List<Task<int>> taskList = new List<Task<int>>();//聲明一個任務集合 6 TaskFactory taskFactory = new TaskFactory(); 7 for (int i = 0; i < 5; i++) 8 { 9 int total = i; 10 Task<int> task = taskFactory.StartNew(() => Test(total)); 11 taskList.Add(task);//將任務放進集合中 12 } 13 Console.WriteLine("主線程Id:{0},繼續執行A.....", Thread.CurrentThread.ManagedThreadId); 14 Task<int[]> taskReulstList = Task.WhenAll(taskList);//創建一個任務,該任務將集合中的所有 Task 對象都完成時完成 15 for (int i = 0; i < taskReulstList.Result.Length; i++)//這裡調用了Result,所以會阻塞線程,等待集合內所有任務全部完成 16 { 17 Console.WriteLine("返回值:{0}", taskReulstList.Result[i]);//遍歷任務集合內Task返回的值 18 } 19 Console.WriteLine("主線程Id:{0},繼續執行B.....", Thread.CurrentThread.ManagedThreadId); 20 Console.ReadKey(); 21 } 22 private static int Test(int o) 23 { 24 Console.WriteLine("線程Id:{0},Task執行成功,參數為:{1}", Thread.CurrentThread.ManagedThreadId, o); 25 Thread.Sleep(500 * o); 26 return o; 27 } 28 }
執行結果:
2.WhenAny
WhenAny:等待提供的任一 Task 對象完成執行過程(只要有一個任務完成)。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 List<Task<int>> taskList = new List<Task<int>>();//聲明一個任務集合 6 TaskFactory taskFactory = new TaskFactory(); 7 for (int i = 0; i < 5; i++) 8 { 9 int total = i; 10 Task<int> task = taskFactory.StartNew(() => Test(total)); 11 taskList.Add(task);//將任務放進集合中 12 } 13 Console.WriteLine("主線程Id:{0},繼續執行A.....", Thread.CurrentThread.ManagedThreadId); 14 Task<Task<int>> taskReulstList = Task.WhenAny(taskList);//創建一個任務,該任務將在集合中的任意 Task 對象完成時完成 15 Console.WriteLine("返回值:{0}", taskReulstList.Result.Result);//得到任務集合內最先完成的任務的返回值 16 Console.WriteLine("主線程Id:{0},繼續執行B.....", Thread.CurrentThread.ManagedThreadId); 17 Console.ReadKey(); 18 } 19 private static int Test(int o) 20 { 21 Console.WriteLine("線程Id:{0},Task執行成功,參數為:{1}", Thread.CurrentThread.ManagedThreadId, o); 22 Thread.Sleep(500 * o); 23 return o; 24 } 25 }
執行結果(這裡返回值肯定會是0,因為休眠最短):
等待(Wait)&執行方式(TaskCreationOptions)
1.任務等待(Wait)
調用任務的Wait方法可以阻塞任務直至任務完成,類似於線程的join。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Task task = Task.Run(() => 6 { 7 Console.WriteLine("線程執行Begin"); 8 Thread.Sleep(2000); 9 Console.WriteLine("線程執行End"); 10 }); 11 Console.WriteLine("任務是否完成:{0}", task.IsCompleted); 12 task.Wait();//阻塞,直至任務完成 13 Console.WriteLine("任務是否完成:{0}", task.IsCompleted); 14 Console.ReadKey(); 15 } 16 }
執行如下:
註意:
線程調用Wait方法時,系統檢測線程要等待的Task是否已經開始執行。如果是線程則會阻塞直到Task運行結束為止。但如果Task還沒有開始執行任務,系統可能(取決於TaskScheduler)使用調用Wait的線程來執行Task,這種情況下調用Wait的線程不會阻塞,它會執行Task並立即返回。好處在於沒有線程會被阻塞,所以減少了資源占用。不好的地方在於加入線程在調用Wait前已經獲得了一個線程同步鎖,而Task試圖獲取同一個鎖,就會造成死鎖的線程。
2.任務執行方式(TaskCreationOptions)
我們知道為了創建一個Task,需要調用構造函數並傳遞一個Action或Action<object>委托,如果傳遞的是期待一個Object的方法,還必須向Task的構造函數穿都要傳給操作的實參。還可以選擇向構造器傳遞一些TaskCreationOptions標記來控制Task的執行方式。
TaskCreationOptions為枚舉類型
枚舉值 | 說明 |
None | 預設。 |
PreferFairness | 儘可能公平的方式安排任務,即先進先執行。 |
LongRunning | 指定任務將是長時間運行的,會新建線程執行,不會使用池化線程。 |
AttachedToParent | 指定將任務附加到任務層次結構中的某個父級 |
DenyChildAttach | 任務試圖和這個父任務連接將拋出一個InvalidOperationException |
HideScheduler | 強迫子任務使用預設調度而非父級任務調度 |
在預設情況下,Task內部是運行在池化線程上,這種線程會非常適合執行短計算密集作業。如果要執行長阻塞操作,則要避免使用池化線程。
在池化線程上運行一個長任務問題不大,但是如果要同時運行多個長任務(特別是會阻塞的任務),則會對性能產生影響。最好使用:TaskCreationOptions.LongRunning。
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 int workerThreadsCount, completionPortThreadsCount; 6 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount); 7 Console.WriteLine("剩餘工作線程數:{0},剩餘IO線程數{1},主線程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId); 8 Task task = Task.Factory.StartNew(() => 9 { 10 Console.WriteLine("長任務執行,線程Id:{0}", Thread.CurrentThread.ManagedThreadId); 11 Thread.Sleep(2000); 12 }, TaskCreationOptions.LongRunning); 13 Thread.Sleep(1000); 14 ThreadPool.GetAvailableThreads(out workerThreadsCount, out completionPortThreadsCount); 15 Console.WriteLine("剩餘工作線程數:{0},剩餘IO線程數{1},主線程Id:{2}", workerThreadsCount, completionPortThreadsCount, Thread.CurrentThread.ManagedThreadId); 16 Console.ReadKey(); 17 } 18 }
執行結果如下:
註意:
如果使運行I/O密集任務,則可以使用TaskCompletionSource和非同步函數(asynchronous functions),通過回調(延續)實現併發性,而是不通過線程實現。
如果使運行計算密集性任務,則可以使用一個生產者/消費者隊列,控制這些任務的併發數量,避免出現線程和進程阻塞的問題。
延續(continuation)&延續選項(TaskContinuationOptions)
延續(continuation)會告訴任務在完成後繼續執行下麵的操作。延續通常由一個回調方法實現,它會在操作完成之後執行一次。給一個任務附加延續的方法有兩種
1.GetAwaiter
任務的方法GetAwaiter是Framework 4.5新增加的,而C# 5.0的非同步功能使用了這種方法,因此它非常重要。給一個任務附加延續如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Task<int> task = Task.Run(() => 6 { 7 int total = 0; 8 for (int i = 0; i <= 100; i++) 9 { 10 total += i; 11 } 12 Thread.Sleep(2000); 13 return total; 14 }); 15 var awaiter = task.GetAwaiter(); 16 awaiter.OnCompleted(() => 17 { 18 int result = awaiter.GetResult();//在延續中獲取Task的執行結果 19 Console.WriteLine(result); 20 }); 21 Console.ReadKey(); 22 } 23 }
執行結果控制台會列印:5050。
調用GetAwaiter會返回一個等待者(awaiter)對象,它會讓先導(antecedent)任務在任務完成(或出錯)之後執行一個代理。已經完成的任務也可以附加一個延續,這事延續會馬上執行。
註意:
1.等待者(awaiter)可以是任意對象,但必須包含特定的兩個方法和一個Boolean類型屬性。
1 public struct TaskAwaiter<TResult> : ICriticalNotifyCompletion, INotifyCompletion 2 { 3 public bool IsCompleted { get; } 4 public TResult GetResult(); 5 public void OnCompleted(Action continuation); 6 }
2.先導任務出現錯誤,那麼當延續代碼調用awaiter.GetResult()時就會重新拋出異常。我們可以需要調用GetResult,而是直接訪問先導任務的Result屬性(task.Result)。
GetResult的好處是,當先導任務出現錯誤時,異常可以直接拋出而不封裝在AggregateException中。
3.如果出現同步上下文,那麼會自動捕捉它,然後延續提交到這個上下文中。在無需同步上下文的情況下通常不採用這種方法,使用ConfigureAwait代替它。它通常會使延續運行在先導任務所在的線程上,從而避免不必要的過載。
1 var awaiter = task.ConfigureAwait(false).GetAwaiter();
2.ContinueWith
另一種附加延續的方法是調用任務的ContinueWith方法:
1 static void Main(string[] args) 2 { 3 Task<int> task = Task.Run(() => 4 { 5 int total = 0; 6 for (int i = 0; i <= 100; i++) 7 { 8 total += i; 9 } 10 Thread.Sleep(2000); 11 return total; 12 }); 13 task.ContinueWith(continuationAction => 14 { 15 int result = continuationAction.Result; 16 Console.WriteLine(result); 17 }); 18 Console.ReadKey(); 19 }
ContinueWith本身會返回一個Task,它非常適用於添加更多的延續。然後如果任務出現錯誤,我們必須直接處理AggregateException。
如果想讓延續運行在統一個線程上,必須指定 TaskContinuationOptions.ExecuteSynchronously;否則它會彈回線程池。ContinueWith特別適用於並行編程場景。
3.延續選項(TaskContinuationOptions)
在使用ContinueWith時可以指定任務的延續選項即TaskContinuationOptions,它的前六個枚舉類型與之前說的TaskCreationOptions枚舉提供的標誌完全一樣,補充後續幾個枚舉值:
枚舉值 | 說明 |
LazyCancellation | 除非先導任務完成,否則禁止延續任務完成(取消)。 |
NotOnRanToCompletion | 指定不應在延續任務前面的任務已完成運行的情況下安排延續任務。 |
NotOnFaulted | 指定不應在延續任務前面的任務引發了未處理異常的情況下安排延續任務。 |
NotOnCanceled | 指定不應在延續任務前面的任務已取消的情況下安排延續任務。 |
OnlyOnCanceled | 指定只應在延續前面的任務已取消的情況下安排延續任務。 |
OnlyOnFaulted | 指定只有在延續任務前面的任務引發了未處理異常的情況下才應安排延續任務。 |
OnlyOnRanToCompletion | 指定只有在延續任務前面的任務引發了未處理異常的情況下才應安排延續任務。 |
ExecuteSynchronously | 指定希望由先導任務的線程執行,先導任務完成後線程繼續執行延續任務。 |
ExecuteSynchronously是指同步執行,兩個任務都在同一個=線程一前一後的執行。
ContinueWith結合TaskContinuationOptions使用的示例:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 Task<int> task = Task.Run(() => 6 { 7 int total = 0; 8 for (int i = 0; i <= 100; i++) 9 { 10 total += i; 11 } 12 if (total == 5050) 13 { 14 throw new Exception("錯誤");//這段代碼可以註釋或開啟,用於測試 15 } 16 return total; 17 }); 18 //指定先導任務無報錯的延續任務 19 task.ContinueWith(continuationAction => 20 { 21 int result = continuationAction.Result; 22 Console.WriteLine(result); 23 }, TaskContinuationOptions.NotOnFaulted); 24 //指定先導任務報錯時的延續任務 25 task.ContinueWith(continuationAction => 26 { 27 foreach (Exception ex in continuationAction.Exception.InnerExceptions)//有關AggregateException異常處理後續討論 28 { 29 Console.WriteLine(ex.Message); 30 } 31 }, TaskContinuationOptions.OnlyOnFaulted); 32 Console.ReadKey(); 33 } 34 }
執行結果會列印:報錯,如果註釋掉拋出異常的代碼則會列印5050。
TaskCompletionSource
另一種創建任務的方法是使用TaskCompletionSource。它允許創建一個任務,並可以任務分發給使用者,並且這些使用者可以使用該任務的任何成員。它的實現原理是通過一個可以手動操作的“附屬”任務,用於指示操作完成或出錯的時間。
TaskCompletionSource的真正作用是創建一個不綁定線程的任務(手動控制任務工作流,可以使你把創建任務和完成任務分開)。
這種方法非常適合I/O密集作業:可以利用所有任務的優點(它們能夠生成返回值、異常和延續),但不會在操作執行期間阻塞線程。
例如,假設一個任務需要等待2秒,然後返回10,我們的方法會返回在一個2秒後完成的任務,通過給任務附加一個延續就可以在不阻塞任何線程的前提下列印這個結果,如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 var awaiter = Demo(2000).GetAwaiter();//得到任務通過延續輸出返回值 6 awaiter.OnCompleted(() => 7 { 8 Console.WriteLine(awaiter.GetResult()); 9 }); 10 Console.WriteLine("主線程繼續執行...."); 11 Console.ReadKey(); 12 } 13 static Task<int> Demo(int millis) 14 { 15 //創建一個任務完成源 16 TaskCompletionSource<int> taskCompletionSource = new TaskCompletionSource<int>(); 17 var timer = new System.Timers.Timer(millis) { AutoReset = false }; 18 timer.Elapsed += delegate 19 { 20 timer.Dispose(); taskCompletionSource.SetResult(10);//寫入返回值 21 }; 22 timer.Start(); 23 return taskCompletionSource.Task;//返回任務 24 } 25 }
執行結果:
註意:如果多次調用SetResult、SetException或SetCanceled,它們會拋出異常,而TryXXX會返回false。
任務取消(CancellationTokenSource)
一些情況下,後臺任務可能運行很長時間,取消任務就非常有用了。.NET提供了一種標準的任務取消機制可用於基於任務的非同步模式。
取消基於CancellationTokenSource類,該類可用於發送取消請求。請求發送給引用CancellationToken類的任務,其中CancellationToken類與CancellationTokenSource類相關聯。
使用示例如下:
1 class Program 2 { 3 static void Main(string[] args) 4 { 5 //構造函數 指定延遲2秒後自動取消任務 6 CancellationTokenSource source = new CancellationTokenSource(2000); 7 //註冊一個任務取消後執行的委托 8 source.Token.Register(() => 9 { 10 Console.WriteLine("線程Id:{0} 任務被取消後的業務邏輯正在運行", Thread.CurrentThread.ManagedThreadId); 11 }); 12 //啟動任務,將取消標記源帶入參數 13 Task.Run(() => 14 { 15 while (!source.IsCancellationRequested)//IsCancellationRequested為True時取消任務 16 { 17 Thread.Sleep(100); 18 Console.WriteLine("