並行演算法有可能非常複雜,並且或多或少涵蓋了這些並行集合。線程安全並不是沒有代價的。比起System.Collections和System.Collections.Generic命名空間中的經典列表 、集合和數組來說,併發集合會有更大的開銷,因此,應該只在需要從多個任務中併發訪問集合的時候才使用併發集... ...
編程需要對基本的數據結構和演算法有所瞭解。程式員為併發情況 選擇最合適 的數據結構,那就需要知道演算法運行時間、空間複雜度等。
對於並行計算,我們需要使用適當的數據結構,這些數據結構需要具備可伸縮性,儘可能地避免死鎖,同時還能提供線程安全的訪問。.Net Framework 4.0引入了System.Collections.Concurrent命名空間,其中包含了一些適合併發計算的數據結構。
- ConcurrentQueue,這個集合使用了原子的比較和交換(CAS)操作,使用SpinWait來保證線程安全。它實現了一個先進先出(FIFO)的集合,就是說元素出隊列的順序與加入隊列的順序是一樣的。可以調用enqueue文獻向隊列中加入元素,使用TryDequeue方法試圖取出隊列中的第一個元素,使用TryPeek方法則試圖得到第一個元素但並不從隊列中刪除此元素。
- ConCurrentStack的實現也沒有使用任何鎖,只採用了CAS操作。它是一個後進先出(LIFO)的集合,這意味著最近添加的元素會先返回。可以使用Push和PushRange方法添加元素,使用TryPop和TryPopRange方法獲取元素,以及使用TryPeek方法檢查元素。
- ConcurrentBag是一個支持重覆元素的無序集合。它針對這樣以下情況 進行了優化,即多個線程以這樣的方式工作:每個線程產生和消費自己的任務,極少與其他線程的任務交互(如果要交互則使用鎖)。添加元素使用add方法,檢查元素使用TryPeek方法,獲取元素使用TryTake方法。
- ConcurrentDictionary是一個線程安全的字典集合的實現。對於讀操作無需使用鎖。但是對於寫操作則需要鎖。這個併發字典使用多個鎖,在字典桶之上實現了一個細粒度的鎖模型。使用參數 concurrencyLevel可以在構造函數 中定義鎖的數量,這意味著預估的線程數量將併發地更新這個字典。
註:由於併發字典使用鎖,所以沒有必要請避免使用以下操作:Count、IsEmpty、Keys、Values、CopyTo及ToArray。
5. BlockingCollection是對IProducerConsumerCollection泛型介面的實現 的一個高級封裝。它有很多先進的功能來實現管道場景,即當你有一些步驟需要使用之前步驟運行的結果時。BlockingCollectione類支持如下功能:分場 、調整內部集合容量、取消集合操作、從多個塊集合中獲取元素。
並行演算法有可能非常複雜,並且或多或少涵蓋了這些並行集合。線程安全並不是沒有代價的。比起System.Collections和System.Collections.Generic命名空間中的經典列表 、集合和數組來說,併發集合會有更大的開銷,因此,應該只在需要從多個任務中併發訪問集合的時候才使用併發集合。在中等代碼中使用併發集合沒有意義,因為它們會增加無謂的開銷。下麵我們使用最簡單的例子來學習這些並行集合。
一、 使用ConcurrentDictionary
本示例展示了一個非常簡單的場景,比較在單線程環境中使用通常的字典集合與使用併發字典的性能。
1.程式示例代碼,如下。
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Collections.Concurrent; using System.Diagnostics; namespace ThreadCollectionDemo { class Program { const string item = "Dict Name"; public static string CurrentItem; static double time1; static void Main(string[] args) { Console.WriteLine(string.Format("----- ConcurrentDictionary 操作----")); var concurrentDict = new ConcurrentDictionary<int, string>(); var dict = new Dictionary<int, string>(); var sw = new Stopwatch(); sw.Start(); //普通字典,100萬次迴圈 for (int i = 0; i < 1000000; i++) { lock (dict) { dict[i] = string.Format("{0} {1}", item, i); } } sw.Stop(); Console.WriteLine(string.Format("對普通字典集合(Dictionary) 進行100萬次寫操作共用時間:{0}----",sw.Elapsed)); time1 = sw.Elapsed.TotalMilliseconds; sw.Restart(); for (int i = 0; i < 1000000; i++) { concurrentDict[i] = string.Format("{0} {1}", item, i); } sw.Stop(); Console.WriteLine(string.Format("對並行字典集合(ConcurrentDictionary) 進行100萬次寫操作共用時間:{0}", sw.Elapsed)); Console.WriteLine(string.Format("寫操作 普通字典/並行字典 = {0}", time1/1.0/sw.Elapsed.TotalMilliseconds)); Console.WriteLine(); sw.Restart(); for (int i = 0; i < 1000000; i++) { lock (dict) { CurrentItem = dict[i]; } } sw.Stop(); Console.WriteLine(string.Format("對普通字典集合(Dictionary) 進行100萬次讀操作共用時間:{0}----", sw.Elapsed)); time1 = sw.Elapsed.TotalMilliseconds; sw.Restart(); for (int i = 0; i < 1000000; i++) { CurrentItem= concurrentDict[i]; } sw.Stop(); Console.WriteLine(string.Format("對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}----", sw.Elapsed)); Console.WriteLine(string.Format("讀操作 普通字典/並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); //多線程並行讀取數據 sw.Restart(); Task[] process = new Task[4]; for (int i = 0; i < 4; i++) { process[i ] = Task.Run(() => Get(concurrentDict, i)); } Task.WhenAll(process); sw.Stop(); Console.WriteLine(string.Format("多線程對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}", sw.Elapsed)); Console.WriteLine(string.Format("讀操作 普通字典/多線程讀並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); Console.Read(); } private static void Get(ConcurrentDictionary<int,string> dict,int index) { for (int i = 0; i < 1000000; i += 4) { if (i%4==index) { string s = dict[i]; } } } } }
2.單線程情況下,程式運行結果,如下圖。
當程式啟動時我們創建了兩個集合,其中一個是標準的字典集合,另一個是新的併發字典集合。然後採用鎖的機制向標準的字典中添加元素,並測量完成100萬次的時間。同時採用同樣的場景來測量ConcurrentDictionary的性能 ,最後 比較從兩個集合中獲取值 的性能。
通過上面的比較,我們發現ConcurrentDictionary寫操作比使用鎖的通常字典要慢的多,而讀操作則要快些。因此對字典需要大量的線程安全的讀操作,ConcurrentDictionary是最好的選擇。
最後,我們使用4個線程同時讀ConcurrentDictionary,則會發現這個字典的性能更好。
在main方法中添加一個四線程的讀字典的代碼。如下。
//多線程並行讀取數據 sw.Restart(); Task[] process = new Task[4]; for (int i = 0; i < 4; i++) { process[i ] = Task.Run(() => Get(concurrentDict, i)); } Task.WhenAll(process); sw.Stop(); Console.WriteLine(string.Format("多線程對並行字典集合(ConcurrentDictionary) 進行100萬次讀操作共用時間:{0}", sw.Elapsed)); Console.WriteLine(string.Format("讀操作 普通字典/多線程讀並行字典 = {0}", time1 / 1.0 / sw.Elapsed.TotalMilliseconds)); private static void Get(ConcurrentDictionary<int,string> dict,int index) { for (int i = 0; i < 1000000; i += 4) { if (i%4==index) { string s = dict[i]; } } }
3.多線程情況下,程式運行結果,如下圖。從圖中可以看出,普通字典的操作是多線程讀並行字典的30多倍。這是我機器上的情況。各人的機器配置不一,請自行測試。