數據流塊基礎

来源:https://www.cnblogs.com/BigBrotherStone/archive/2020/01/31/12246085.html
-Advertisement-
Play Games

" 返回《C 併發編程》" "1. 簡介" "2. 鏈接數據流塊" "3. 傳遞出錯信息" "4. 斷開鏈接" "5. 限制流量" "6. 數據流塊的並行處理" "7. 創建自定義數據流塊" 1. 簡介 TPL 數據流(dataflow)庫的功能很強大,可用來創建 網格 (mesh)和 管道 (pi ...


>>返回《C# 併發編程》

1. 簡介

TPL 數據流(dataflow)庫的功能很強大,可用來創建網格(mesh)和管道(pipleline), 並通過它們以非同步方式發送數據。

主要命名空間: System.Threading.Tasks.Dataflow

2. 鏈接數據流塊

創建網格時,需要把數據流塊互相連接起來。

public static void LinkBlockRun()
{
    System.Console.WriteLine("Building Block link.");
    TransformBlock<int, int> multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });
    var options = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };
    multiplyBlock.LinkTo(subtractBlock, options);

    System.Console.WriteLine("Builded Block link.");

    var task = Task.Run(async () =>
    {
        System.Console.WriteLine("Posting");

        for (int i = 0; i < 3; i++)
        {
            multiplyBlock.Post(i);
        }

        System.Console.WriteLine("Posted");

        // 第一個塊的完成情況自動傳遞給第二個塊。 
        // Complete 後,再進行 Post 是無效的
        multiplyBlock.Complete();

        await multiplyBlock.Completion;
        // 鏈接使用完了
        System.Console.WriteLine("Block link Ended.");
    });

    task.Wait();
}

輸出為:

Building Block link.
Builded Block link.
Posting
Posted
first block.
first block.
last block.
first block.
last block.
last block.
Block link Ended.

3. 傳遞出錯信息

public static void BlockErrorRun()
{
    Task.Run(async () =>
    {
        try
        {
            //單個塊異常類型
            var block = new TransformBlock<int, int>(item =>
              {
                  if (item == 1)
                      throw new InvalidOperationException("Blech.");
                  return item * 2;
              });
            block.Post(1);
            await block.Completion;

        }
        catch (InvalidOperationException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

        try
        {
            //被連接的塊異常類型
            var multiplyBlock = new TransformBlock<int, int>(item =>
             {
                 if (item == 1)
                     throw new InvalidOperationException("Blech.");
                 return item * 2;
             });
            var subtractBlock = new TransformBlock<int, int>(item => item - 2);
            multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });
            multiplyBlock.Post(1);
            await subtractBlock.Completion;
        }
        catch (AggregateException ex)
        {
            System.Console.WriteLine(ex.GetType().Name);
        }

    }).Wait();
}

輸出為:

InvalidOperationException
AggregateException
  • 對於最簡單的情況,最好是把錯誤傳遞下去,等到最後再作一次性處理。
  • 對於更複雜的網格,在數據流完成後需要檢查每一個數據流塊。

4. 斷開鏈接

public static void BlockDisposeRun()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("first block.");
        Thread.Sleep(500);
        return item * 2;
    });
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine("last block.");
        Thread.Sleep(500);
        return item - 2;
    });

    IDisposable link = multiplyBlock.LinkTo(subtractBlock);
    multiplyBlock.Post(1);
    multiplyBlock.Post(2);
    // 斷開數據流塊的鏈接。
    // 前面的代碼中,數據可能已經通過鏈接傳遞過去,也可能還沒有。 
    // 在實際應用中,考慮使用代碼塊,而不是調用 Dispose。 
    link.Dispose();
    Thread.Sleep(1200);
}

輸出為:

first block.
first block.

5. 限制流量

用數據流塊 的 BoundedCapacity 屬性,來限制目標塊的流量(throttling)。 BoundedCapacity 的預設設置是 DataflowBlockOptions.Unbounded

解決的問題:

  • 防止數據的數據太多太快,導致第一個目標塊在還來不及處理數據時就得對所有數據進行了緩衝
public static void BlockBoundedCapacityRun()
{
    var sourceBlock = new BufferBlock<int>();
    var options = new DataflowBlockOptions
    {
        BoundedCapacity = 10
        //BoundedCapacity = DataflowBlockOptions.Unbounded
    };
    var targetBlockA = new BufferBlock<int>(options);
    var targetBlockB = new BufferBlock<int>(options);
    sourceBlock.LinkTo(targetBlockA);
    sourceBlock.LinkTo(targetBlockB);

    for (int i = 0; i < 31; i++)
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Post:{i % 10}");
        sourceBlock.Post(i % 10);
    }
    //向水管中註入31個水滴
    //由於分支的限流, targetBlockA 和 targetBlockB 各得到了10各水滴
    var task = Task.Run(() =>
    {
        int i = 0;

        System.Console.WriteLine("先處理 targetBlockA 的水滴,此處迴圈接收會將水滴接乾,但是接不到存在 targetBlockB 中的水滴");
        do
        {
            IList<int> res;
            if (targetBlockA.TryReceiveAll(out res))
            {
                i += res.Count;
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcA:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            Thread.Sleep(100);
        } while (true);

        i = 0;

        System.Console.WriteLine("處理 targetBlockB 的水滴,只剩下緩衝的水滴");
        do
        {
            IList<int> res;
            if (targetBlockB.TryReceiveAll(out res))
            {
                i += res.Count;
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} RevcB:{string.Join(",", res)} {i}");
            }
            else
            {
                break;
            }
            Thread.Sleep(100);
        } while (true);
    });

    task.Wait();
}

輸出為:

40:28.026 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
40:28.038 Post:1
40:28.038 Post:2
40:28.038 Post:3
40:28.038 Post:4
40:28.038 Post:5
40:28.038 Post:6
40:28.038 Post:7
40:28.038 Post:8
40:28.038 Post:9
40:28.038 Post:0
先處理 targetBlockA 的水滴,此處迴圈接收會將水滴接乾,但是接不到存在 targetBlockB 中的水滴
40:28.043 RevcA:0,1,2,3,4,5,6,7,8,9 10
40:28.149 RevcA:0,1,2,3,4,5,6,7,8,9 20
40:28.249 RevcA:0 21
處理 targetBlockB 的水滴,只剩下緩衝的水滴
40:28.350 RevcB:0,1,2,3,4,5,6,7,8,9 10

限流例子: 在用 I/O 操作的數據填充數據流網格時,可以設置數據流塊的 BoundedCapacity 屬性。這樣,在網格來不及處理數據時,就不會讀取過多的 I/O 數據,網格也不會緩存所有數據。

6. 數據流塊的並行處理

public static void BlockParalleRun()
{
    var multiplyBlock = new TransformBlock<int, int>(
    item =>
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block.");
        Thread.Sleep(100);
        return item * 2;
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    }
    );
    var subtractBlock = new TransformBlock<int, int>(item =>
    {
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block.");
        Thread.Sleep(100);
        return item - 2;
    });
    multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true });

    var task = Task.Run(async () =>
    {
        for (int i = 0; i < 7; i++)
        {
            multiplyBlock.Post(i);
        }

        multiplyBlock.Complete();
        await multiplyBlock.Completion;

        var tk = Task.Run(() =>
        {
            IList<int> recvResList;
            //此處延時為了TryReceiveAll獲取所有數據,防止 subtractBlock 還有數據未接收
            Thread.Sleep(1500);
            if (subtractBlock.TryReceiveAll(out recvResList))
            {
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
            }
            else
            {
                System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc null.");
            }
        });
        await tk;
        // multiplyBlock 已經調用完成,subtractBlock 的完成狀態依賴於 Link 參數 PropagateCompletion
        await subtractBlock.Completion;
    });
    task.Wait();
}

輸出為:

44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.023 first block.
44:16.146 last block.
44:16.250 last block.
44:16.351 last block.
44:16.452 last block.
44:16.552 last block.
44:16.652 last block.
44:16.753 last block.
44:17.656 Revc -2,0,2,4,6,8,10.

真正的難點: 找出哪些數據流塊需要並行處理

7. 創建自定義數據流塊

public static void BlockCustomRun()
{
    var block = CreateMyCustomBlock();
    for (int i = 0; i < 7; i++)
    {
        block.Post(i);//target
    }
    var task = Task.Run(async () =>
    {
        var tk = Task.Run(() =>
        {
            List<int> recvResList = new List<int>();
            //此處延時為了TryReceiveAll獲取所有數據,防止 subtractBlock 還有數據未接收

            while (true)
            {
                try
                {
                    var recvRes = block.Receive();//source
                    recvResList.Add(recvRes);
                }
                catch (System.InvalidOperationException)
                {
                    break;
                }
            }
            Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} Revc {string.Join(",", recvResList)}.");
        });
        block.Complete();//target
        await block.Completion;//source
        await tk;
    });
    task.Wait();
}

static IPropagatorBlock<int, int> CreateMyCustomBlock()
{
    var multiplyBlock = new TransformBlock<int, int>(item =>
    {
        int res = item * 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} first block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var addBlock = new TransformBlock<int, int>(item =>
    {
        int res = item + 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} next block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var divideBlock = new TransformBlock<int, int>(item =>
    {
        int res = item / 2;
        System.Console.WriteLine($"{DateTime.Now.ToString("mm:ss.fff")} last block {res}.");
        Thread.Sleep(100);
        return res;
    });
    var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
    multiplyBlock.LinkTo(addBlock, flowCompletion);
    addBlock.LinkTo(divideBlock, flowCompletion);
    return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
}

輸出為:

45:00.528 first block 0.
45:00.639 first block 2.
45:00.641 next block 2.
45:00.739 first block 4.
45:00.746 next block 4.
45:00.747 last block 1.
45:00.844 first block 6.
45:00.847 next block 6.
45:00.848 last block 2.
45:00.947 first block 8.
45:00.951 next block 8.
45:00.951 last block 3.
45:01.049 first block 10.
45:01.055 next block 10.
45:01.056 last block 4.
45:01.152 first block 12.
45:01.159 next block 12.
45:01.160 last block 5.
45:01.264 next block 14.
45:01.265 last block 6.
45:01.365 last block 7.
45:01.472 Revc 1,2,3,4,5,6,7.

DataflowBlock.Encapsulate 只會封裝只有一個輸入塊和一個輸出塊的網格。如果一個可重用的網格帶有多個輸入或輸出,就應該把它封裝進一個自定義對象


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 轉發、重定向到其它業務方法 @org.springframework.stereotype.Controller @RequestMapping("/userController") public class UserController{ @RequestMapping("/handler1") ...
  • " 返回《C 併發編程》" "1. 調度到線程池" "2. 任務調度器" "2.1. Default 調度器" "2.2. 捕獲當前同步上下文 調度器" "2.3. ConcurrentExclusiveSchedulerPair 調度器" "3. 調度並行代碼" "4. 用調度器實現數據流的同步" ...
  • 場景 一個對象A,希望它的某些狀態在發生改變時通知到B(或C、D),常見的做法是在A中定義一個事件(或直接用委托),當狀態改變時A去觸發這個事件。而B直接訂閱這個事件 這種設計有點問題B由於要訂閱A的事件,所以B得完全引用A,其實有時候沒必要,因為我只關心A的狀態變化而已狀態變更通知這種場景很多,有 ...
  • " 返回《C 併發編程》" "1. 取消請求" "2. 超時後取消" "3. 取消並行" "4. 取消響應式代碼" "5. 與其他取消體系的互操作" 是一個等同於 預設 的特殊值,表示這個方法是永遠不會被取消的。 實例代碼 輸出: 1. 取消請求 2. 超時後取消 輸出: 只要執行代碼時用到了超時, ...
  • " 返回《C 併發編程》" "1. 簡介" "2. 不可變棧和隊列" "3. 不可變列表" "4. 不可變Set集合" "5. 不可變字典" "6. 線程安全字典" "7. 阻塞隊列" "8. 阻塞棧和包" "9. 非同步隊列" "10. 非同步棧和包" "11. 阻塞/非同步隊列" 1. 簡介 + 不可 ...
  • 匿名方法:通過匿名委托 、lamada表達式定義的函數具體操作並複製給委托類型;匿名委托:委托的一種簡單化聲明方式通過delegate關鍵字聲明;內置泛型委托:系統已經內置的委托類型主要是不帶返回值的Action和帶返回值的Func實例代碼(運行環境netcoreapp3.1)class demoF... ...
  • " 返回《C 併發編程》" "1. 用 async 代碼封裝非同步方法與 Completed 事件" "2. 用 async 代碼封裝 Begin/End 方法" "3. 用 async 代碼封裝並行代碼" "4. 用 async 代碼封裝 Rx Observable 對象" "5. 用 Rx Obs ...
  • " 返回《C 併發編程》" "1. 轉換.NET事件" "1.1. 進度通知" "1.2. 定時器示例" "1.3. 錯誤傳遞" "2. 發通知給上下文" "3. 用視窗和緩衝對事件分組" "4. 用限流和抽樣抑制事件流" "4.1. Throttle" "4.2. Sample" "5. 超時" ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...