本示例將學習如何創建一個能被多個線程非同步處理的一組任務的例子。 ...
二、 使用ConcurrentQueue來實現非同步處理
本示例將學習如何創建一個能被多個線程非同步處理的一組任務的例子。
一、程式示例代碼如下:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; namespace ThreadCollectionDemo { class Program { const string item = "Dict Name"; public static string CurrentItem; static double time1; static void Main(string[] args) { Console.WriteLine(string.Format("----- ConcurrentQueue 操作----")); Task task = TaskRun1(); task.Wait(); Console.Read(); } private static async Task TaskRun1() { var queue = new ConcurrentQueue<CustomTask>(); var cts = new CancellationTokenSource(); var taskSrc = Task.Run(() => TaskProduct(queue)); Task[] process = new Task[4]; for (int i = 1; i <= 4; i++) { string processId = i.ToString(); process[i - 1] = Task.Run(() => TaskProcess(queue, "Processer " + processId, cts.Token)); } await taskSrc; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(process); } static async Task TaskProduct(ConcurrentQueue<CustomTask> queue) { for (int i = 0; i < 20; i++) { await Task.Delay(50); var workitem = new CustomTask { Id = i }; queue.Enqueue(workitem); Console.WriteLine(string.Format("把{0} 元素添加到ConcurrentQueue",workitem.Id)); } } static async Task TaskProcess(ConcurrentQueue<CustomTask> queue,string name,CancellationToken token) { CustomTask workitem; bool dequeueSuccesfl = false; await GetRandomDely(); do { dequeueSuccesfl = queue.TryDequeue(out workitem); if (dequeueSuccesfl) { Console.WriteLine(string.Format("元素 {0} 從ConcurrentQueue中取出 ,名稱:{1} ", workitem.Id, name)); } await GetRandomDely(); } while (!token.IsCancellationRequested); } static Task GetRandomDely() { int dely = new Random(DateTime.Now.Millisecond).Next(1, 1000); return Task.Delay(dely); } } public class CustomTask { public int Id { get; set; } } }
2.程式運行結果如下圖。
當程式運行時,我們使用ConcurrentQueue集合實現創建了一個任務隊列。然後創建了一個取消標誌,它是用來在我們將任務放入隊列後停止工作 的。接下來啟動了一個單獨的工作線程來將任務放入任務隊列中。這部分分為非同步處理產生了工作 量。
現在定義這個程式中消費任務的部分。我們創建了四個工作 線程,它們會隨機等待一段時間,然後從任務隊列中獲取一個任務,處理這個任務,一直重覆整個過程直到我們發出取消標誌信號。最後,我們啟動產生任務的線程,等待這個線程完成。然後使用取消標誌給消費發信號 我們完成了工作。最後一步將等待所有的消費完成。
我們看到隊列中的任務按從前到後的順序被 處理,但一個後面的任務是有可能會比前面的任務先處理的,因為我們有四個工作 線程獨立地運行,而且任務處理時間並不是恆定的。我們看到 訪問這個隊列是線程安全的,沒有一個元素會被提取兩次。
二、 改變ConcurrentStack非同步處理順序
本示例是對上一面的示例的修改版。我們又一次創建了被多個工作線程非同步處理的一組任務,但是這次使用ConcurrentStack來實現,我們來看看這兩個示例會有什麼不同。
1. 程式的代碼如下圖。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; using System.Threading; namespace ThreadCollectionDemo { class Program { static void Main(string[] args) { Console.WriteLine(string.Format("----- ConcurrentStack 操作----")); Task task = TaskStack(); task.Wait(); Console.Read(); } private static async Task TaskStack() { var stack = new ConcurrentStack<CustomTask>(); var cts = new CancellationTokenSource(); var taskSrc = Task.Run(() => TaskProduct(stack)); Task[] process = new Task[4]; for (int i = 1; i <= 4; i++) { string processId = i.ToString(); process[i - 1] = Task.Run(() => TaskProcess(stack, "Processer " + processId, cts.Token)); } await taskSrc; cts.CancelAfter(TimeSpan.FromSeconds(2)); await Task.WhenAll(process); } static async Task TaskProduct(ConcurrentStack<CustomTask> stack) { for (int i = 0; i < 20; i++) { await Task.Delay(50); var workitem = new CustomTask { Id = i }; stack.Push(workitem); Console.WriteLine(string.Format("把{0} 元素添加到ConcurrentStack",workitem.Id)); } } static async Task TaskProcess(ConcurrentStack<CustomTask> stack,string name,CancellationToken token) { CustomTask workitem; bool popSuccesful = false; await GetRandomDely(); do { popSuccesful = stack.TryPop(out workitem); if (popSuccesful) { Console.WriteLine(string.Format("元素 {0} ConcurrentStack 取出 ,名稱:{1} ", workitem.Id, name)); } await GetRandomDely(); } while (!token.IsCancellationRequested); } static Task GetRandomDely() { int dely = new Random(DateTime.Now.Millisecond).Next(1, 1000); return Task.Delay(dely); } } }
2.程式的運行結果如下圖。
當程式運行時,我們創建了一個ConcurrentStack集合的實例。其餘的代碼與前一示例幾乎一樣,唯一不同之年是我們對併發堆棧使用了Push和TryPop方法,而對併發隊列使用Enqueue和TryDequeue方法。
從上圖結果中可以扯到任務處理的順序被改變了。堆棧是一個LIFO集合,工作線程先處理最近的任務。在併發隊列中,任務被處理的順序與被添加的順序幾乎一樣。這說明根據工作線程的數量,我們將在一定時間內處理先被創建的任務。而在堆棧中,早先創建的任務具有較低的優先順序,而且直到生產者停止向堆棧中放入更多任務後,這個任務才有可能被處理。這行為是確定 的,最好在這種場景下使用隊列。