C#中定義自己的消費隊列(上)

来源:https://www.cnblogs.com/seekdream/archive/2023/02/19/17135350.html
-Advertisement-
Play Games

一 背景 在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進 ...


一 背景

在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進行消費,本篇文章我將介紹一種常見通過包裝C#中Queue的方式實現一個能夠通過外部增加隊列元素,並且由外部進行隊列終止的一種常見的CustomQueue,後面會通過對源碼的講解來一步步加深對基礎原理的理解,最後會通過幾個單元測試來驗證對應的使用。

二 原理講解

2.1 源代碼展示

using System;
using System.Collections.Generic;
using System.Threading;

namespace Pangea.Common.Utility.Buffer
{
    public sealed class ConsumeQueue<T>
    {
        public static int _Counter_Instance;

        public enum ConsumeState
        {
            Idle,
            Consuming,
            Terminated
        }

        private Queue<T> _queue;

        private int _threadCounter = 0;

        private object _lock = new object();
        private Action<T> _consumeAction;
        private Action _onTerminatedNotify;
        private Func<bool> _shouldTerminateConsume;

        public ConsumeState State { get; private set; }

        public int PeedingItemsCount
        {
            get
            {
                lock (_lock)
                {
                    if (_queue == null)
                    {
                        return 0;
                    }
                    else
                    {
                        return _queue.Count;
                    }
                }
            }
        }

        public ConsumeQueue(Action<T> consume, Func<bool> shouldTerminate, Action onTerminated)
        {
            Interlocked.Increment(ref _Counter_Instance);

            _queue = new Queue<T>();

            State = ConsumeState.Idle;

            _consumeAction = consume;
            _shouldTerminateConsume = shouldTerminate;
            _onTerminatedNotify = onTerminated;
        }

        ~ConsumeQueue()
        {
            Interlocked.Decrement(ref _Counter_Instance);
        }

        public void ProduceItem(T item)
        {
            lock (_lock)
            {
                if (State == ConsumeState.Terminated) return;

                _queue.Enqueue(item);

                if (State == ConsumeState.Idle)
                {
                    State = ConsumeState.Consuming;
                    StartConsuming();
                }
            }
        }

        private void StartConsuming()
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                ++_threadCounter;
                while (true)
                {
                    T newData = default(T);
                    lock (_lock)
                    {
                        newData = _queue.Dequeue();
                    }

                    _consumeAction(newData);

                    lock (_lock)
                    {
                        if (_shouldTerminateConsume())
                        {
                            OnTerminated();
                            State = ConsumeState.Terminated;

                            break;
                        }                        
                        else if (_queue.Count == 0)
                        {
                            State = ConsumeState.Idle;

                            break;
                        }                      
                    }
                }
                --_threadCounter;
            });
        }

        private void OnTerminated()
        {
            _queue.Clear();
            _queue = null;

            _consumeAction = null;
            _shouldTerminateConsume = null;

            _onTerminatedNotify?.Invoke();
        }
    }
}

2.2 代碼解析

  1. 增加泛型定義和類類型

首先對於該類作為一個完整的工具類,所以該類設計為禁止被繼承和重寫,所以增加C#關鍵字sealed作為一個密封類,另外對於該類中定義的數據類型並沒有明確的規定,所以該類設計成一個泛型類並且沒有對該類型T做一個明確的類型限制,這個在一定程度上增加了該類的靈活性。

  1. 定義ConsumeQueue中內部執行狀態
    在實際的代碼中通過下麵的一個枚舉類型State來定義內部執行狀態
public enum ConsumeState
        {
            Idle,
            Consuming,
            Terminated
        }
  • A 當CustomQueue初始化或者其內部的消息隊列Queue被清除完畢的時候設置狀態為Idle狀態並退出StartConsuming方法中的消費迴圈中
  • B 當內部的消息隊列中存在未被消費的項目時啟動消費過程,並設置State為Consuming
  • C 當外部傳入的ShouldTerminateConsume觸發時則不論內部的待消費的隊列是否為空都將退出當前消費過程,並調用內部的OnTerminated方法清除所有消費隊列對象

2.3 對應的單元測試

單元測試部分主要是通過模擬隨機產生1000條模擬數據,併在中途產生的數據大於900的時候去模擬終止CustomQueue的行為並斷言最後的結果和行為。這裡需要註意的是ConsumeQueue_TerminatedStatus除了模擬前面中斷的行為以外還通過反射確認threadCounter==0確保當前的消費線程都能夠得到正確的釋放。

using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Reflection;
using System.Threading;

namespace ACM.Framework.Test.Modules.Utils
{
    [TestFixture]
    internal class ConsumeQueueTests
    {
        [Test, Timeout(5000)]
        public void ConsumeQueue_IdleProducing()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 900;

            Action terminated = () =>
            {
                mre.Set();
            };

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
            }, false);

            mre.WaitOne();
            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
        }

        [Test, Timeout(5000)]
        public void ConsumeQueue_ContinueProducing()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 900;

            Action terminated = () =>
            {
                mre.Set();
            };

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
            }, true);

            mre.WaitOne();
            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
        }

        [Test, Timeout(5000)]
        public void ConsumeQueue_TerminatedStatus()
        {
            ManualResetEvent mre = new ManualResetEvent(false);

            int prevData = -1;
            Action<int> consume = data =>
            {
                Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
                prevData = data;
            };

            Func<bool> func = () => prevData > 500;
            Action terminated = () => mre.Set();

            ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);

            GenerateIntData(data =>
            {
                queue.ProduceItem(data);
                if(queue.State == ConsumeQueue<int>.ConsumeState.Terminated)
                {
                    Assert.IsTrue(queue.PeedingItemsCount == 0);
                }
                else
                {
                    Assert.IsTrue(queue.PeedingItemsCount > 0);
                }
            }, true);

            mre.WaitOne();

            Thread.Sleep(1000); // wait one second for waiting consuming thread exit

            int pendingCount = queue.PeedingItemsCount;
            var currentState = queue.State;
            int queueThreadNum = (int)queue.GetType().GetField("_threadCounter", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(queue);
            Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
            Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
            Assert.IsTrue(queueThreadNum == 0, $"{queueThreadNum}");
        }


        private void GenerateIntData(Action<int> intData, bool withIdle)
        {
            ThreadPool.QueueUserWorkItem(state =>
            {
                int target = 1000;
                int index = 0;
                while (index < target)
                {
                    intData(index++);

                    if (withIdle)
                    {
                        Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 5));
                    }
                }
            });
        }
    }
}


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 原本也沒深究過這個,用的多了,完全憑藉經驗辦事,理論差的一塌糊塗,最近不流行那個openai,於是在偉大的人工智慧輔導下好好梳理一遍理論知識 初步理論認知 async 和 await 是 C# 語言中用於非同步編程的關鍵字,主要作用是讓代碼在等待非同步操作完成的時候繼續執行,從而達到不會阻塞線程的效果 ...
  • 如何優化線上WebAssembly WebAssembly部署使用 HTTPS : 為什麼?我可以通過一個案例查看 ,下麵我們會通過masa docs站點進行測試 打開 http://docs.masastack.com/blazor/getting-started/installation 網站 ...
  • 如何使用命名行指令去運行和打包.net6項目 前言 之前發佈了一個.net webApi入門項目,項目文章,在文章中我推薦的是Docker部署,只說明瞭如何打包,但是沒有說怎麼運行,考慮到很多人寫代碼不是用的Visual Studio。這裡講一下控制台怎麼去管理項目。 準備工作 安裝.net6 SD ...
  • 接前上一篇:平臺調用 (P/Invoke):DllImport特性說明 首先,我們知道C#和C/C++都是跨平臺的,但是原理上他們是不一樣的: C#首先編譯成一種中間語言(IL)的程式集,然後將這種程式集放到不同平臺下的解釋器裡面去執行,這就是說一次編譯到處運行 C/C++是針對不同的平臺直接編譯, ...
  • 1.說說.NET7中 _ViewImports文件的作用? 2.什麼是Razor頁面? 3.說說.NET5中 __ViewStart文件的作用? 4.如何在Razor頁面中實現數據模型綁定? 5.如何在Controller中註入service? 6.描述一下依賴註入後的服務生命周期? 7.說說ASP ...
  • 今年開年,最火的莫過於ChatGPT的相關討論,這個提供了非常強大的AI處理,並且整個平臺也提供了很多對應的API進行接入的處理,使得我們可以在各種程式上無縫接入AI的後端處理,從而實現智能AI的各種應用。ChatGPT的API可以在前端,以及一些後端進行API的接入,本篇隨筆主要介紹基於ChatG... ...
  • 一、什麼時候需要用到NuGet私有伺服器 很多公司中架構師會搭建一個統一的項目基礎架構模板,然後全部新項目都會拿這個基礎架構來開發新的項目,那架構中就會有很多的中間件,比喻公司內部的封裝好的Redis中間件,訪問資料庫的中間件,MQ中間件,小程式中間件等等。 現在很多項目都用這個模板開發了,然後其中 ...
  • 概述 使用原型實例指定創建對象的種類,並且通過拷貝這些原型創建新的對象。 在軟體系統開發中,有時候會遇到這樣的情況:我們需要用到多個相同實例,最簡單直接的方法是通過多次調用new方法來創建相同的實例。 student s=new student(); student s1=new student() ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...