線程安全集合

来源:https://www.cnblogs.com/BigBrotherStone/archive/2020/02/01/12247969.html
-Advertisement-
Play Games

" 返回《C 併發編程》" "1. 簡介" "2. 不可變棧和隊列" "3. 不可變列表" "4. 不可變Set集合" "5. 不可變字典" "6. 線程安全字典" "7. 阻塞隊列" "8. 阻塞棧和包" "9. 非同步隊列" "10. 非同步棧和包" "11. 阻塞/非同步隊列" 1. 簡介 + 不可 ...


>>返回《C# 併發編程》

1. 簡介

  • 不可變集合
    • 不可變集合之間通常共用了大部分存儲空間,因此其實浪費並不大
    • 因為是無法修改的,所以是線程安全
  • 線程安全集合
    • 可同時被多個線程修改的可變集合
      • 線程安全集合混合使用了細粒度鎖定無鎖技術,以確保線程被阻塞的時間最短
        • 通常情況下是根本不阻塞
    • 對很多線程安全集合進行枚舉操作時,內部創建了該集合的一個快照(snapshot),並對這個快照進行枚舉操作。
    • 線程安全集合的主要優點是多個線程可以安全地對其進行訪問,而代碼只會被阻塞很短的時間,或根本阻塞。

下麵對常用的屬於不可變集合線程安全集合類型的,特定數據結構的集合進行說明。

2. 不可變棧和隊列

不可變集合採用的模式是返回一個修改過的集合,原始的集合引用是不變化的。

  • 這意味著,如果引用了特定的不可變集合的實例,它是不會變化的。
var stack = ImmutableStack<int>.Empty;
stack = stack.Push(13);
var biggerStack = stack.Push(7);
// 先顯示“7”,接著顯示“13”。 
foreach (var item in biggerStack)
    Console.WriteLine($"biggerStack {item}");

// 只顯示“13”。
foreach (var item in stack)
    Console.WriteLine($"stack {item}");

輸出:

biggerStack 7
biggerStack 13
stack 13

兩個棧實際上在內部共用了存儲項目 13 的記憶體。

  • 這種實現方式的效率很高,並且可以很方便地創建當前狀態的快照
  • 每個不可變集合的實例都是絕對線程安全

ImmutableQueue 使用方法類似。

  • 不可變集合的一個實例是永遠不改變的。
  • 因為不會改變,所以是絕對線程安全的。
  • 對不可變集合使用修改方法時,返回修改後的集合。
  • 不可變集合非常適用於共用狀態,但不適合用來做交換數據的通道。

3. 不可變列表

不可變列表的內部是用二叉樹組織數據的。這麼做是為了讓不可變列表的實例之間共用的記憶體最大化。

  • 這導致 ImmutableList<T>List<T> 在常用操作上有性能上的差別(參見下表)。
操 作 List<T> ImmutableList<T>
Add 平攤 O(1) O(log N)
Insert O(N) O(log N)
RemoveAt O(N) O(log N)
Item[index] O(1) O(log N)

不可變列表確實可以使用index獲取數據項,但需要註意性能問題。不能簡單地用它來替代 List<T>

  • 這意味著應該儘量使用 foreach 而不是用 for

4. 不可變Set集合

  • ImmutableHashSet<T>
    • 是一個不含重覆元素的集合
  • ImmutableSortedSet<T>
    • 是一個已排序的不含重覆元素的集合
  • 都有相似的介面
//ImmutableHashSet
var hashSet = ImmutableHashSet<int>.Empty;
hashSet = hashSet.Add(13);
hashSet = hashSet.Add(7);
// 顯示“7”和“13”,次序不確定。 
foreach (var item in hashSet)
    Console.Write(item + " ");

System.Console.WriteLine();
hashSet = hashSet.Remove(7);

//ImmutableSortedSet
var sortedSet = ImmutableSortedSet<int>.Empty;
sortedSet = sortedSet.Add(13);
sortedSet = sortedSet.Add(7);
// 先顯示“7”,接著顯示“13”。 
foreach (var item in sortedSet)
    Console.Write(item + " ");

var smallestItem = sortedSet[0];
// smallestItem == 7
sortedSet = sortedSet.Remove(7);

輸出:

7 13 
7 13 
操 作 ImmutableHashSet<T> ImmutableSortedSet<T>
Add O(log N) O(log N)
Remove O(log N) O(log N)
Item[index] 不可用 O(log N)

ImmutableSortedSet 索引操作的時間複雜度是 O(log N),而不是 O(1),這跟 上節中 ImmutableList<T> 的情況類似。

  • 這意味著它們適用同樣的警告:使用 ImmutableSortedSet<T>時,應該儘量用 foreach 而不是用 for 。

可以先快速地以可變方式構建,然後轉換成不可變集合

5. 不可變字典

  • ImmutableDictionary<TKey,TValue>
  • ImmutableSortedDictionar y<TKey,TValue>
//ImmutableDictionary
var dictionary = ImmutableDictionary<int, string>.Empty;
dictionary = dictionary.Add(10, "Ten");
dictionary = dictionary.Add(21, "Twenty-One");
dictionary = dictionary.SetItem(10, "Diez");
// 顯示“10Diez”和“21Twenty-One”,次序不確定。 
foreach (var item in dictionary)
    Console.WriteLine(item.Key + ":" + item.Value);

var ten = dictionary[10]; // ten == "Diez"
dictionary = dictionary.Remove(21);

//ImmutableSortedDictionary
var sortedDictionary = ImmutableSortedDictionary<int, string>.Empty; sortedDictionary = sortedDictionary.Add(10, "Ten");
sortedDictionary = sortedDictionary.Add(21, "Twenty-One");
sortedDictionary = sortedDictionary.SetItem(10, "Diez");
// 先顯示“10Diez”,接著顯示“21Twenty-One”。 
foreach (var item in sortedDictionary)
    Console.WriteLine(item.Key + ":" + item.Value);

ten = sortedDictionary[10];
// ten == "Diez"
sortedDictionary = sortedDictionary.Remove(21);

輸出:

10:Diez
21:Twenty-One
10:Diez
21:Twenty-One
操 作 I
操 作 ImmutableDictionary<TK,TV> ImmutableSortedDictionary<TK,TV>
Add O(log N) O(log N)
SetItem O(log N) O(log N)
Item[key] O(log N) O(log N)
Remove O(log N) O(log N)

6. 線程安全字典

var dictionary = new ConcurrentDictionary<int, string>(); 
var newValue = dictionary.AddOrUpdate(0,
key => "Zero",
(key, oldValue) => "Zero");

AddOrUpdate 方法有些複雜,這是因為這個方法必須執行多個步驟,具體步驟取決於併發字典的當前內容。

  • 方法的第一個參數是
  • 第二個參數是一個委托,它把鍵(本例中為 0)轉換成添加到字典的值(本例中為“Zero”)
    • 只有當字典中沒有這個鍵時,這個委托才會運行。
  • 第三個參數也是一個委托,它把(0)和原來的值轉換成字典中修改後的值
    (“Zero”)。
    • 只有當字典中已經存在這個鍵時,這個委托才會運行。
  • AddOrUpdate return 這個鍵對應的新值(與其中一個委托返回的值相同)。

AddOrUpdate 可能要多次調用其中一個(或兩個)委托。這種情況很少,但確實會發生。

  • 因此這些委托必須簡單、快速,並且不能有副作用
  • 這些委托只能創建新的值,不能修改程式中其他變數
  • 這個原則適用於所有 ConcurrentDictionary<TKey,TValue> 的方法所使用的委托
// 使用與前面一樣的“字典”。
string currentValue;
bool keyExists = dictionary.TryGetValue(0, out currentValue);

// 使用與前面一樣的“字典”。
string removedValue;
bool keyExisted = dictionary.TryRemove(0, out removedValue);
  • 如果多個線程讀寫一個共用集合, 使用 ConcurrentDictrionary<TKey,TValue> 是最合適的
  • 如果不會頻繁修改(很少修改), 那更適合使用 ImmutableDictionary<TKey, TValue>

  • 如果一些線程只添加元素,另一些線程只移除元素,那最好使用生產者/消費者集合

7. 阻塞隊列

  • GetConsumingEnumerable 會阻塞線程
  • CommpleteAdding 方法執行後所有被 GetConsumingEnumerable 阻塞的線程開始執行
  • 每個元素只會被消費一次
private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>();
public static async Task BlockingCollectionSP()
{
    Action consumerAction = () =>
     {
        Console.WriteLine($"started print({Thread.CurrentThread.ManagedThreadId}).");
        // 先顯示“7”,後顯示“13”。
        foreach (var item in _blockingQueue.GetConsumingEnumerable())
        {
             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
        }
        Console.WriteLine($"ended print({Thread.CurrentThread.ManagedThreadId}).");
     };
    Task task1 = Task.Run(consumerAction);
    Task task2 = Task.Run(consumerAction);
    Task task3 = Task.Run(consumerAction);

    _blockingQueue.Add(7);
    System.Console.WriteLine($"added 7.");
    _blockingQueue.Add(13);
    System.Console.WriteLine($"added 13.");
    _blockingQueue.CompleteAdding();
    System.Console.WriteLine("CompleteAdding.");

    try
    {
        _blockingQueue.Add(15);
    }
    catch (Exception ex)
    {
        System.Console.WriteLine($"{ex.GetType().Name}:{ex.Message}");
    }

    await Task.WhenAll(task1, task2, task3);
}

輸出:

started print(4).
started print(3).
started print(6).
added 7.
added 13.
CompleteAdding.
ended print(6).
InvalidOperationException:The collection has been marked as complete with regards to additions.
print(4) 7
ended print(4).
print(3) 13
ended print(3).

8. 阻塞棧和包

  • 在預設情況下,.NET 中的 BlockingCollection<T> 用作阻塞隊列,但它也可以作為任何類型的生產者/消費者集合
  • BlockingCollection<T> 實際上是對線程安全集合進行了封裝, 實現了 IProducerConsumerCollection<T> 介面。
    • 因此可以在創建 BlockingCollection<T> 實例時指明規則
BlockingCollection<int> _blockingStack = new BlockingCollection<int>( new ConcurrentStack<int>());
BlockingCollection<int> _blockingBag = new BlockingCollection<int>( new ConcurrentBag<int>());

替換到阻塞隊列示例代碼中試試。

9. 非同步隊列

public static async Task BufferBlockPS()
{
    BufferBlock<int> _asyncQueue = new BufferBlock<int>();
    Func<Task> concurrentConsumerAction = async () =>
     {
         while (true)
         {
             int item;
             try
             {
                 item = await _asyncQueue.ReceiveAsync();
             }
             catch (InvalidOperationException)
             {
                 System.Console.WriteLine($"exit({Thread.CurrentThread.ManagedThreadId}).");
                 break;
             }
             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");
         }
     };
    Func<Task> consumerAction = async () =>
    {
        try
        {
            // 先顯示“7”,後顯示“13”。 單線程可用
            while (await _asyncQueue.OutputAvailableAsync())
            {
                Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {await _asyncQueue.ReceiveAsync()}");
            }
        }
        catch (Exception ex)
        {
            System.Console.WriteLine($"{ex.GetType().Name}({Thread.CurrentThread.ManagedThreadId}):{ex.Message}");
        }

    };

    Task t1 = consumerAction();
    Task t2 = consumerAction();

    // Task t1 = concurrentConsumerAction();
    // Task t2 = concurrentConsumerAction();

    // 生產者代碼
    await _asyncQueue.SendAsync(7);
    await _asyncQueue.SendAsync(13);
    await _asyncQueue.SendAsync(15);
    System.Console.WriteLine("Added 7 13 15.");
    _asyncQueue.Complete();

    await Task.WhenAll(t1, t2);
}

輸出:

Added 7 13 15.
print(4) 7
print(6) 13
print(4) 15
InvalidOperationException(3):The source completed without providing data to receive.

10. 非同步棧和包

Nito.AsyncEx 庫

AsyncCollection<int> _asyncStack = new AsyncCollection<int>( new ConcurrentStack<int>());
AsyncCollection<int> _asyncBag = new AsyncCollection<int>( new ConcurrentBag<int>());

11. 阻塞/非同步隊列

在阻塞隊列中已經介紹了BufferBlock<T>

這裡介紹 ActionBlock<int>

public static async Task ActionBlockPS()
{
    ActionBlock<int> queue = new ActionBlock<int>(u => Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {u}"));

    // 非同步的生產者代碼
    await queue.SendAsync(7);
    await queue.SendAsync(13);
    System.Console.WriteLine("Added async.");
    // 同步的生產者代碼 
    queue.Post(15);
    queue.Post(17);
    System.Console.WriteLine("Added sync.");
    queue.Complete();
    System.Console.WriteLine($"Completed({Thread.CurrentThread.ManagedThreadId}).");
}

輸出:

Added async.
Added sync.
Completed(1).
print(3) 7
print(3) 13
print(3) 15
print(3) 17

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 這次我們可以看看併發中鎖的原理,大概會說到AQS,ReentrantLock,ReentrantReadWriteLock以及JDK8中新增的StampedLock,這些都是在java併發中很重要的東西,慢慢看吧! 一.LockSupport工具類 LockSupport工具類是jdk中rt.jar ...
  • 一、現象描述 筆者在用visual studio2010進行控制台程式進行程式編譯時候,經常會遇到代碼代碼沒有語法錯誤,但是編譯不通過的現象。系統報錯為 這個錯誤總是出現,特別是在每次新裝系統後,很是煩人。 二、出現原因 通過查閱相關資料可知: 出現該問題的原因通常是由於高版本與低版本之間的鏈接文件 ...
  • Redis詳解(一)——RDB 前言 由於 Redis 是一個記憶體資料庫,所謂記憶體資料庫,就是將資料庫中的內容保存在記憶體中,這與傳統的MySQL,Oracle等關係型資料庫直接將內容保存到硬碟中相比,記憶體資料庫的讀寫效率比傳統資料庫要快的多(記憶體的讀寫效率遠遠大於硬碟的讀寫效率)。但是保存在記憶體中也 ...
  • 項目簡介 項目來源於: "https://gitee.com/suimz_admin/BookShop" 一個基於JSP+Servlet+Jdbc的書店系統。涉及技術少,易於理解,適合JavaWeb初學者學習使用。 本人親測可正常啟動。 技術棧 前端技術 基礎:html+css+JavaScript ...
  • 轉發、重定向到其它業務方法 @org.springframework.stereotype.Controller @RequestMapping("/userController") public class UserController{ @RequestMapping("/handler1") ...
  • " 返回《C 併發編程》" "1. 調度到線程池" "2. 任務調度器" "2.1. Default 調度器" "2.2. 捕獲當前同步上下文 調度器" "2.3. ConcurrentExclusiveSchedulerPair 調度器" "3. 調度並行代碼" "4. 用調度器實現數據流的同步" ...
  • 場景 一個對象A,希望它的某些狀態在發生改變時通知到B(或C、D),常見的做法是在A中定義一個事件(或直接用委托),當狀態改變時A去觸發這個事件。而B直接訂閱這個事件 這種設計有點問題B由於要訂閱A的事件,所以B得完全引用A,其實有時候沒必要,因為我只關心A的狀態變化而已狀態變更通知這種場景很多,有 ...
  • " 返回《C 併發編程》" "1. 取消請求" "2. 超時後取消" "3. 取消並行" "4. 取消響應式代碼" "5. 與其他取消體系的互操作" 是一個等同於 預設 的特殊值,表示這個方法是永遠不會被取消的。 實例代碼 輸出: 1. 取消請求 2. 超時後取消 輸出: 只要執行代碼時用到了超時, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...