本隨筆續接:.NET 同步與非同步 之 警惕閉包(十) 無論之前說的鎖、原子操作 還是 警惕閉包,都是為安全保駕護航,本篇隨筆繼續安全方面的主題:線程安全的集合。 先看一下命名空間:System.Collections.Concurrent,常用的類型有(均為泛型):BlockingCollectio ...
本隨筆續接:.NET 同步與非同步 之 警惕閉包(十)
無論之前說的鎖、原子操作 還是 警惕閉包,都是為安全保駕護航,本篇隨筆繼續安全方面的主題:線程安全的集合。
先看一下命名空間:System.Collections.Concurrent,常用的類型有(均為泛型):BlockingCollection<T>、ConcurrentBag<T>、ConcurrentDictionary<TKey, TValue>、ConcurrentQueue<T>、ConcurrentStack<T> 。
其中:ConcurrentBag<T> 為無序的集合、ConcurrentDictionary<TKey, TValue> 為詞典類型。
ConcurrentQueue<T>、ConcurrentStack<T> 分別為隊列 和 棧,而 BlockingCollection<T> 可以看做是 隊列 和 棧的進一步封裝調用,並提供了阻塞(超時)功能。
本隨筆著重說兩個類型:BlockingCollection<T> 和 ConcurrentDictionary<TKey, TValue>。
一、BlockingCollection<T>
1、先看一下 MSDN 上的Demo
/// <summary> /// MSDN Demo /// BlockingCollection<T>.Add() /// BlockingCollection<T>.CompleteAdding() /// BlockingCollection<T>.TryTake() /// BlockingCollection<T>.IsCompleted /// </summary> public void Demo1() { // Construct and fill our BlockingCollection using (BlockingCollection<int> bc = new BlockingCollection<int>()) { int NUMITEMS = 10000; for (int i = 0; i < NUMITEMS; i++) { bc.Add(i); } bc.CompleteAdding(); int outerSum = 0; // Delegate for consuming the BlockingCollection and adding up all items Action action = () => { int localItem; int localSum = 0; while (bc.TryTake(out localItem)) { localSum += localItem; } Interlocked.Add(ref outerSum, localSum); }; // Launch three parallel actions to consume the BlockingCollection Parallel.Invoke(action, action, action); base.PrintInfo(string.Format("Sum[0..{0}) = {1}, should be {2}", NUMITEMS, outerSum, ((NUMITEMS * (NUMITEMS - 1)) / 2))); base.PrintInfo(string.Format("bc.IsCompleted = {0} (should be true)", bc.IsCompleted)); } }MSDN Demo
從demo中看一下 BlockingCollection<T> 的用法
1)Add 方法, 將項添加到集合中。
2)CompleteAdding 方法,標記當前實例不可以再添加任何項。
3)TryTake 方法,如果可以從當前集合移除一個項,則返回true,否則返回False. 如果該集合為空,則此方法立即返回 false。
刪除了某個項的順序取決於用於創建集合的類型 BlockingCollection<T> 實例。 當您創建 BlockingCollection<T> 對象,您可以指定要使用的集合類型(通過構造函數指定)。 例如,可以指定 ConcurrentQueue<T> 先進先出 (FIFO) 行為的對象或 ConcurrentStack<T> 後進先出 (LIFO) 行為的對象。 可以使用任何集合類來實現 IProducerConsumerCollection<T> 介面。 預設集合類型 BlockingCollection<T> 是 ConcurrentQueue<T>。
4)IsCompleted 屬性,獲取此 BlockingCollection<T> 是否已標記為完成添加(即 調用了 CompleteAdding 方法)並且為空。
2、限制容量
/// <summary> /// 限制容量 /// </summary> public void Demo2() { BlockingCollection<int> blocking = new BlockingCollection<int>(5); Task.Run(() => { for (int i = 0; i < 20; i++) { blocking.Add(i); PrintInfo($"add:({i})"); } blocking.CompleteAdding(); PrintInfo("CompleteAdding"); }); // 等待先生產數據 Task.Delay(500).ContinueWith((t) => { while (!blocking.IsCompleted) { var n = 0; if (blocking.TryTake(out n)) { PrintInfo($"TryTake:({n})"); } } PrintInfo("IsCompleted = true"); }); }限制容量
調研Add方法的時候,如果集合中的項的數量已經達到上限,則Add方法將會被阻塞。
3、在 BlockingCollection 中使用Stack
/// <summary> /// 在 BlockingCollection 中使用Stack /// </summary> public void Demo3() { BlockingCollection<int> blocking = new BlockingCollection<int>(new ConcurrentStack<int>(), 5); Task.Run(() => { for (int i = 0; i < 20; i++) { blocking.Add(i); PrintInfo($"add:({i})"); } blocking.CompleteAdding(); PrintInfo("CompleteAdding"); }); // 等待先生產數據 Task.Delay(500).ContinueWith((t) => { while (!blocking.IsCompleted) { var n = 0; if (blocking.TryTake(out n)) { PrintInfo($"TryTake:({n})"); } } PrintInfo("IsCompleted = true"); }); }在 BlockingCollection 中使用Stack
該Demo和之前的Demo的唯一區別就是:構造函數、指定了 ConcurrentStack<T> 類型為存儲容器。
除此之外、BlockingCollection<T> 還是提供了對超時的控制,例如:TryAdd(T, TimeSpan) 、 TryTake(T, TimeSpan) 等數個重載版本。
二、ConcurrentDictionary<TKey, TValue>。
ConcurrentDictionary<TKey, TValue> 類, 已經實現 IDictionary<TKey, TValue> 介面。也就是說 它也實現了Dictionary 類型的基礎功能。
此外, ConcurrentDictionary<TKey, TValue> 提供了幾種方法中添加或更新鍵/值對在字典中下, 如表中所述。
任務 |
使用此方法 |
用法說明 |
---|---|---|
如果它尚不在字典中存在向字典中,添加新的密鑰 |
如果當前不在字典中存在該鍵,此方法將添加指定的鍵/值對。 該方法返回 true 或 false具體取決於是否已添加新對。 |
|
如果該註冊表項具有特定值,更新為現有鍵在字典中值 |
此方法檢查是否密鑰具有指定的值,如果它存在,則用新值更新該鍵。 它相當於CompareExchange 方法,但它的用於字典的元素。 |
|
無條件地將鍵/值對存儲在字典中,覆蓋已存在的鍵的值 |
索引器的資源庫︰dictionary[key] = newValue |
|
將鍵/值對添加到字典中,或如果鍵已存在,更新基於鍵的現有值的鍵的值 |
AddOrUpdate(TKey, Func<TKey, TValue>, Func<TKey, TValue, TValue>) - 或 - |
AddOrUpdate(TKey, Func<TKey, TValue>, Func<TKey, TValue, TValue>) 接受的鍵和兩個委托。 如果鍵在字典; 中不存在,則使用第一個委托它接受鍵並返回應添加的鍵的值。 如果該鍵不存在; 它使用第二個委托它接受的鍵和其當前值,並返回應為項設置的新值。 AddOrUpdate(TKey, TValue, Func<TKey, TValue, TValue>) 接受鍵、 值要添加,以及更新委托。 這是與以前的重載中,相同之處在於它不使用委托來添加的鍵。 |
獲取此鍵在字典中,向字典中添加值並將其返回如果該鍵不存在的值 |
- 或 - |
這些重載提供延遲初始化為鍵/值對在字典中,添加的值,僅當不存在。 GetOrAdd(TKey, TValue) 採用鍵不存在要添加的值。 GetOrAdd(TKey, Func<TKey, TValue>) 將一個委托,可將生成的值,如果鍵不存在。 |
這些操作是原子性操作,而且都是線程安全的。在 ConcurrentDictionary<TKey, TValue> 類中 唯一的例外是 AddOrUpdate 和 GetOrAdd 方法,它們是使用細粒度鎖定,以確保線程安全。
ConcurrentDictionary<TKey, TValue> 類 在上述的MSDN文檔中 已經介紹的差不多了,不再舉例。 當然還要提一句, 該類型不支持阻塞操作。
三、線程安全警告
在命名空間 System.Collections.Concurrent 中的類型,都遵循如下的線程安全規則:線程安全集合本身提供的方法 是線程安全的,但是通過其類型實現的介面的方法 和 擴展方法 卻不是線程安全的。
隨筆暫告一段落、下一篇隨筆: 因地制宜——CPU密集型操作和IO密集型操作
附,Demo : http://files.cnblogs.com/files/08shiyan/ParallelDemo.zip
參見更多:隨筆導讀:同步與非同步