C#中那些常見的消費隊列使用(上)

来源:https://www.cnblogs.com/seekdream/archive/2023/02/19/17135350.html
-Advertisement-
Play Games

一 背景 在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進 ...


一 背景

在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進行消費,本篇文章我將介紹一種常見通過包裝C#中Queue的方式實現一個能夠通過外部增加隊列元素,並且由外部進行隊列終止的一種常見的CustomQueue,後面會通過對源碼的講解來一步步加深對基礎原理的理解,最後會通過幾個單元測試來驗證對應的使用。

二 原理講解

2.1 源代碼展示

using System;
using System.Collections.Generic;
using System.Threading;

namespace Pangea.Common.Utility.Buffer
{
    public sealed class ConsumeQueue<T>
    {
        public static int _Counter_Instance;

        public enum ConsumeState
        {
            Idle,
            Consuming,
            Terminated
        }

        private Queue<T> _queue;

        private int _threadCounter = 0;

        private object _lock = new object();
        private Action<T> _consumeAction;
        private Action _onTerminatedNotify;
        private Func<bool> _shouldTerminateConsume;

        public ConsumeState State { get; private set; }

        public int PeedingItemsCount
        {
            get
            {
                lock (_lock)
                {
                    if (_queue == null)
                    {
                        return 0;
                    }
                    else
                    {
                        return _queue.Count;
                    }
                }
            }
        }

        public ConsumeQueue(Action<T> consume, Func<bool> shouldTerminate, Action onTerminated)
        {
            Interlocked.Increment(ref _Counter_Instance);

            _queue = new Queue<T>();

            State = ConsumeState.Idle;

            _consumeAction = consume;
            _shouldTerminateConsume = shouldTerminate;
            _onTerminatedNotify = onTerminated;
        }

        ~ConsumeQueue()
        {
            Interlocked.Decrement(ref _Counter_Instance);
        }

        public void ProduceItem(T item)
        {
            lock (_lock)
            {
                if (State == ConsumeState.Terminated) return;

                _queue.Enqueue(item);

                if (State == ConsumeState.Idle)
                {
                    State = ConsumeState.Consuming;
                    StartConsuming();
                }
            }
        }

        private void StartConsuming()
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                ++_threadCounter;
                while (true)
                {
                    T newData = default(T);
                    lock (_lock)
                    {
                        newData = _queue.Dequeue();
                    }

                    _consumeAction(newData);

                    lock (_lock)
                    {
                        if (_shouldTerminateConsume())
                        {
                            OnTerminated();
                            State = ConsumeState.Terminated;

                            break;
                        }                        
                        else if (_queue.Count == 0)
                        {
                            State = ConsumeState.Idle;

                            break;
                        }                      
                    }
                }
                --_threadCounter;
            });
        }

        private void OnTerminated()
        {
            _queue.Clear();
            _queue = null;

            _consumeAction = null;
            _shouldTerminateConsume = null;

            _onTerminatedNotify?.Invoke();
        }
    }
}

2.2 代碼解析

  1. 增加泛型定義和類類型

首先對於該類作為一個完整的工具類,所以該類設計為禁止被繼承和重寫,所以增加C#關鍵字sealed作為一個密封類,另外對於該類中定義的數據類型並沒有明確的規定,所以該類設計成一個泛型類並且沒有對該類型T做一個明確的類型限制,這個在一定程度上增加了該類的靈活性。

  1. 定義ConsumeQueue中內部執行狀態
    在實際的代碼中通過下麵的一個枚舉類型State來定義內部執行狀態
public enum ConsumeState
        {
            Idle,
            Consuming,
            Terminated
        }
  • A 當CustomQueue初始化或者其內部的消息隊列Queue被清除完畢的時候設置狀態為Idle狀態並退出StartConsuming方法中的消費迴圈中
  • B 當內部的消息隊列中存在未被消費的項目時啟動消費過程,並設置State為Consuming
  • C 當外部傳入的ShouldTerminateConsume觸發時則不論內部的待消費的隊列是否為空都將退出當前消費過程,並調用內部的OnTerminated方法清除所有消費隊列對象

2.3 對應的單元測試

單元測試部分主要是通過模擬隨機產生1000條模擬數據,併在中途產生的數據大於900的時候去模擬終止CustomQueue的行為並斷言最後的結果和行為。這裡需要註意的是ConsumeQueue_TerminatedStatus除了模擬前面中斷的行為以外還通過反射確認threadCounter==0確保當前的消費線程都能夠得到正確的釋放。

using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Reflection;
using System.Threading;

namespace ACM.Framework.Test.Modules.Utils
{
    [TestFixture]
    internal class ConsumeQueueTests
    {
        [Test, Timeout(5000)]
        public void ConsumeQueue_IdleProducing()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 900;

            Action terminated = () =>
            {
                mre.Set();
            };

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
            }, false);

            mre.WaitOne();
            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
        }

        [Test, Timeout(5000)]
        public void ConsumeQueue_ContinueProducing()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 900;

            Action terminated = () =>
            {
                mre.Set();
            };

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
            }, true);

            mre.WaitOne();
            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
        }

        [Test, Timeout(5000)]
        public void ConsumeQueue_TerminatedStatus()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 500;
            Action terminated = () => mre.Set();

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
                if(queue.State == ConsumeQueue<int>.ConsumeState.Terminated)
                {
                    Assert.IsTrue(queue.PeedingItemsCount == 0);
                }
                else
                {
                    Assert.IsTrue(queue.PeedingItemsCount > 0);
                }
            }, true);

            mre.WaitOne();

            Thread.Sleep(1000); // wait one second for waiting consuming thread exit

            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            int queueThreadNum = (int)queue.GetType().GetField("_threadCounter", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(queue);
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
            Assert.IsTrue(queueThreadNum == 0, $"{queueThreadNum}");
        }


        private void GenerateIntData(Action<int> intData, bool withIdle)
        {
            ThreadPool.QueueUserWorkItem(state =>
            {
                int target = 1000;
                int index = 0;
                while (index < target)
                {
                    intData(index++);

                    if (withIdle)
                    {
                        Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 5));
                    }
                }
            });
        }
    }
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 背景 golang可以獲取命令執行的輸出結果,但要執行完才能夠獲取。 如果執行的命令是ssh,我們要實時獲取,並執行相應的操作呢? 示例 func main() { user := "root" host := "172.16.116.133" //獲取執行命令 cmd := exec.Comman ...
  • IDEA如何使用Maven不通過模板創建javaWeb項目 1.創建項目 進入IDEA,點擊“項目”>“新建項目”,填寫項目信息,最後點擊“創建”。 點擊“創建”後,自動進入新創建的項目。 2.給項目配置Web框架 點擊 “文件”>“項目結構”,自動跳轉到項目結構。 點擊 “模塊” > “+” > ...
  • 給大家推薦一個比Redis性能更強的數據:KeyDB KeyDB是Redis的高性能分支,側重於多線程、記憶體效率和高吞吐量。除了性能改進外,KeyDB還提供主動複製、快閃記憶體和子密鑰過期等功能。KeyDB具有MVCC架構,允許您在不阻塞資料庫和降低性能的情況下執行密鑰和掃描等查詢。 KeyDB與Redi ...
  • ​ 函數的調用、定義、參數 ​編輯 #######命名關鍵字參數沒完 abs()函數:絕對值 >>> abs(100) 100 >>> abs(-20) 20 max()函數:接收任意多個參數,並返回最大的那個 >>> max(1, 2) 2 >>> max(2, 3, 1, -5) 3 數據類型轉 ...
  • 介紹 棧(stack)又名堆棧,它是一種運算受限的線性表。限定僅在表尾進行插入和刪除操作的線性表。這一端被稱為棧頂,相對地,把另一端稱為棧底。向一個棧插入新元素又稱作進棧、入棧或壓棧,它是把新元素放到棧頂元素的上面,使之成為新的棧頂元素;從一個棧刪除元素又稱作出棧或退棧,它是把棧頂元素刪除掉,使其相 ...
  • leetcode《圖解數據結構》劍指 Offer 34. 二叉樹中和為某一值的路徑(java解題)的解題思路和java代碼,並附上java中常用數據結構的功能函數。 ...
  • 運算符 算術運算符 算數運算符: + - * / // % ** # + var1 = 7 var2 = 90 res = var1 + var2 print(res) # 97 # - var1 = 7 var2 = 90 res = var1 - var2 print(res) # 83 # * ...
  • synchronized 是 Java 語言的一個關鍵字,它允許多個線程同時訪問共用的資源,以避免多線程編程中的競爭條件和死鎖問題。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...