BlockingCollection集合是一個擁有阻塞功能的集合,它就是完成了經典生產者消費者的演算法功能。一般情況下,我們可以基於 生產者 - 消費者模式來實現併發。BlockingCollection<T> 類是最好的解決方案 剛結束的物聯網卡項目,我需要調用移動的某個具有批量獲取物聯網卡數據的接 ...
BlockingCollection集合是一個擁有阻塞功能的集合,它就是完成了經典生產者消費者的演算法功能。一般情況下,我們可以基於 生產者 - 消費者模式來實現併發。BlockingCollection<T> 類是最好的解決方案
剛結束的物聯網卡項目,我需要調用移動的某個具有批量獲取物聯網卡數據的介面,其實最主要的數據就是物聯網卡卡號,然後通過這兩個卡號去調用其餘的兩個介面,最後拼接起來,就有了物聯網卡的完整信息。但是問題來了,物聯網卡數量多,而且每次調用介面還需要費上一到兩秒,如果正常的讀取,那不得慢死,所以就用併發來做。我想到的是阻塞隊列+生產者消費者模型,使用的阻塞隊列是.net線程安全集合的BlockingCollection, 具體的可以看《你不能錯過.net 併發解決方案》《深入理解阻塞隊列》《.net framework 4 線程安全概述》。
但是問題來了,MSDN上的例子以及《C# 高級編程第九版》中的管道模型代碼都是基於單個的Task, 在這裡我肯定是用了多個Task去讀取介面,為什麼我要說這點,多線程是不可測得,我如何識別阻塞隊列已滿,如何及時獲取阻塞隊列中的數據,並不重覆的獲取呢?具體的簡單demo,請看《你不能錯過.net 併發解決方案》。我一開始是這麼寫的:
BlockingCollection<string> blockingCollection = new BlockingCollection<string>(); ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>(); var t = new Task[50]; for (int i = 0; i <= 49; i++) { t[i] = Task.Factory.StartNew((obj) => { Thread.Sleep(2500); blockingCollection.Add(obj.ToString()); concurrentQueue.Enqueue(obj.ToString()); Console.WriteLine("Task中的數據: {0}", obj.ToString()); }, i + 1); } Thread.Sleep(5000); while (!blockingCollection.IsCompleted) { foreach (var b in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("開始輸出阻塞隊列: {0}", b); Console.WriteLine(blockingCollection.IsCompleted); Console.WriteLine("併發隊列的數量: {0}", concurrentQueue.Count); } Console.WriteLine("調用GetConsumingEnumerable方法遍歷完之後阻塞隊列的數量: {0}", blockingCollection.Count); if (concurrentQueue.Count == 50) { blockingCollection.CompleteAdding(); } } Console.WriteLine("********"); Console.WriteLine("是否完成添加: {0}", blockingCollection.IsAddingCompleted); Console.Read();
但是結果:
可以看到,這結果有問題啊,按道理來講foreach遍歷完了就會出來啊,但是這是阻塞隊列,肯定不是這樣的,那麼什麼時候能挑出foreach迴圈?這就和BlockingCollection的設計有關了,我查看了下它的源碼,原諒我沒有看懂,也就不貼了。後來,我改了下代碼,就解決問題了。
BlockingCollection<string> blockingCollection = new BlockingCollection<string>(); ConcurrentQueue<string> concurrentQueue = new ConcurrentQueue<string>(); var t = new Task[50]; for (int i = 0; i <= 49; i++) { t[i] = Task.Factory.StartNew((obj) => { Thread.Sleep(2500); blockingCollection.Add(obj.ToString()); concurrentQueue.Enqueue(obj.ToString()); Console.WriteLine("Task中的數據: {0}", obj.ToString()); }, i+1); } Thread.Sleep(5000); while (!blockingCollection.IsCompleted) { foreach (var b in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("開始輸出阻塞隊列: {0}", b); Console.WriteLine(blockingCollection.IsCompleted); Console.WriteLine("併發隊列的數量: {0}", concurrentQueue.Count); if (concurrentQueue.Count == 50) { blockingCollection.CompleteAdding(); } } Console.WriteLine("調用GetConsumingEnumerable方法遍歷完之後阻塞隊列的數量: {0}", blockingCollection.Count); } Console.WriteLine("********"); Console.WriteLine("是否完成添加: {0}", blockingCollection.IsAddingCompleted);
結果:
我沒有寫的很詳細,因為,只是做個筆記,平時學習的時候沒有註意到這些問題,沒有遇到特定情況下的問題,項目開發中遇到了,就記錄下。