在多線程環境下,使用BlockingCollection以及ConcurrentQueue來消費生產者生產的資源,這是我自己寫的多生產者多消費者的作法,其實也是基於單個task下的阻塞隊列的IsComplete來識別的。 使用阻塞隊列更簡單但是內部的消費者線程比較適合使用單獨的線程不適合使用線程池, ...
在多線程環境下,使用BlockingCollection以及ConcurrentQueue來消費生產者生產的資源,這是我自己寫的多生產者多消費者的作法,其實也是基於單個task下的阻塞隊列的IsComplete來識別的。
使用阻塞隊列更簡單但是內部的消費者線程比較適合使用單獨的線程不適合使用線程池,而且阻塞隊列為空時會阻塞消費者線程,當然阻塞線程池內的線程也沒什麼影響只是不推薦這麼做,而且阻塞的隊列的性能也沒有ConcurrentQueue的性能高。
我在項目中遇到多生產者多消費者問題,多生產者沒有問題,但是如何在多線程下消費生產者的資源,這就是比較麻煩了,不能僅僅通過判斷數量來做,網上也找了一些資源,但是也都是給了個demo,還不全,自己想了個方法,暫時解決了,回頭在研究下別人封裝的基於Thread的作法。其實是在<<.NET 中的阻塞隊列BlockingCollection的正確打開方式>>基礎上做的,也沒有什麼,但是這是個好思路。後續嘗試自己封裝線程標誌來做,不依靠FCL的阻塞隊列。code如下:
ConcurrentDictionary<string, string> dic1 = new ConcurrentDictionary<string, string>(); ConcurrentDictionary<string, string> dic2 = new ConcurrentDictionary<string, string>(); ConcurrentQueue<string> queue = new ConcurrentQueue<string>(); BlockingCollection<string> blockingCollection = new BlockingCollection<string>(); var t = new Task[50]; Console.WriteLine("生產者開始寫入數據.............\r\n"); for(int i=0; i<=49; i++) { t[i] = Task.Factory.StartNew((param) => { Console.WriteLine("生產者中 *** 阻塞隊列輸入: {0}", param.ToString()); blockingCollection.Add(param.ToString()); Console.WriteLine("生產者中 *** 阻塞隊列的數量是: {0}", blockingCollection.Count); Console.WriteLine("生產者中 *** 字典dic1輸入: {0}", param.ToString()); dic1.TryAdd(param.ToString(), param.ToString()); Console.WriteLine("生產者中 *** 字典dic1的數量是: {0}", dic1.Count); Console.WriteLine("生產者中 *** 字典dic2輸入: {0}", param.ToString()); dic2.TryAdd(param.ToString(), param.ToString()); Console.WriteLine("生產者中 *** 字典dic2的數量是: {0}", dic2.Count); Console.WriteLine("生產者中 *** 隊列輸入: {0}", param.ToString()); queue.Enqueue(param.ToString()); Console.WriteLine("生產者中 *** 隊列的數量: {0}", queue.Count); }, i); } //Thread.Sleep(500); Console.WriteLine("\r\n消費者開始讀入數據.............\r\n"); while (!blockingCollection.IsCompleted) { Task tt = Task.Factory.StartNew(() => { foreach (var b in blockingCollection.GetConsumingEnumerable()) { Console.WriteLine("消費者中 *** 字典dic1的數量是: {0}", dic1.Count); Console.WriteLine("消費者中 *** 字典dic2的數量是: {0}", dic2.Count); Console.WriteLine("消費者中 *** 阻塞隊列的數量是: {0}", blockingCollection.Count); string value1 = ""; string value2 = ""; dic1.TryGetValue(b, out value1); dic2.TryGetValue(b, out value2); Console.WriteLine("消費者中 *** 字典dic1的鍵值{0}的value值是: {1}", b, value1); Console.WriteLine("消費者中 *** 字典dic1的鍵值{0}的value值是: {1}", b, value2); Console.WriteLine("消費者中 *** 隊列的數量是: {0}", queue.Count); Console.WriteLine("消費者中 *** 字典的數量是: {0}", dic1.Count); if (queue.Count == 50) { blockingCollection.CompleteAdding(); } } }); } Console.WriteLine("是否完成添加: {0}", blockingCollection.IsCompleted);
參考:
.Net中的並行編程-7.基於BlockingCollection實現高性能非同步隊列