一 背景 在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進 ...
一 背景
在我們的工作中我們經常有一種場景就是要使用到隊列,特別是對於這樣的一種情況:就是生產的速度明顯快於消費的速度,而且在多線程的環境下消息的生產由多個線程產生消息的消費則沒有這種限制,通過使用隊列這種方式能夠很大程度上將多線程的問題通過入隊的方式變成單個線程內的消息的聚合,然後通過單獨的線程進行消費,本篇文章我將介紹一種常見通過包裝C#中Queue的方式實現一個能夠通過外部增加隊列元素,並且由外部進行隊列的終止的一種常見的CustomQueue,後面會通過對源碼的講解來一步步加深對基礎原理的理解,最後會通過幾個單元測試來驗證對應的使用。
二 原理講解
2.1 源代碼展示
using System;
using System.Collections.Generic;
using System.Threading;
namespace Pangea.Common.Utility.Buffer
{
public sealed class ConsumeQueue<T>
{
public static int _Counter_Instance;
public enum ConsumeState
{
Idle,
Consuming,
Terminated
}
private Queue<T> _queue;
private int _threadCounter = 0;
private object _lock = new object();
private Action<T> _consumeAction;
private Action _onTerminatedNotify;
private Func<bool> _shouldTerminateConsume;
public ConsumeState State { get; private set; }
public int PeedingItemsCount
{
get
{
lock (_lock)
{
if (_queue == null)
{
return 0;
}
else
{
return _queue.Count;
}
}
}
}
public ConsumeQueue(Action<T> consume, Func<bool> shouldTerminate, Action onTerminated)
{
Interlocked.Increment(ref _Counter_Instance);
_queue = new Queue<T>();
State = ConsumeState.Idle;
_consumeAction = consume;
_shouldTerminateConsume = shouldTerminate;
_onTerminatedNotify = onTerminated;
}
~ConsumeQueue()
{
Interlocked.Decrement(ref _Counter_Instance);
}
public void ProduceItem(T item)
{
lock (_lock)
{
if (State == ConsumeState.Terminated) return;
_queue.Enqueue(item);
if (State == ConsumeState.Idle)
{
State = ConsumeState.Consuming;
StartConsuming();
}
}
}
private void StartConsuming()
{
ThreadPool.QueueUserWorkItem(_ =>
{
++_threadCounter;
while (true)
{
T newData = default(T);
lock (_lock)
{
newData = _queue.Dequeue();
}
_consumeAction(newData);
lock (_lock)
{
if (_shouldTerminateConsume())
{
OnTerminated();
State = ConsumeState.Terminated;
break;
}
else if (_queue.Count == 0)
{
State = ConsumeState.Idle;
break;
}
}
}
--_threadCounter;
});
}
private void OnTerminated()
{
_queue.Clear();
_queue = null;
_consumeAction = null;
_shouldTerminateConsume = null;
_onTerminatedNotify?.Invoke();
}
}
}
2.2 代碼解析
- 增加泛型定義和類類型
首先對於該類作為一個完整的工具類
,所以該類設計為禁止被繼承和重寫,所以增加C#關鍵字sealed
作為一個密封類,另外對於該類中定義的數據類型並沒有明確的規定,所以該類設計成一個泛型類
- 定義ConsumeQueue中內部執行狀態
在實際的代碼中通過下麵的一個枚舉類型State
來定義內部執行狀態
public enum ConsumeState
{
Idle,
Consuming,
Terminated
}
- A 當CustomQueue初始化或者其內部的消息隊列Queue被清除完畢的時候設置狀態為
Idle
狀態並退出StartConsuming方法中的消費迴圈中 - B 當內部的消息隊列中存在未被消費的項目時啟動消費過程,並設置State為
Consuming
- C 當外部傳入的
ShouldTerminateConsume
觸發時則不論內部的待消費的隊列是否為空都將退出當前消費過程,並調用內部的OnTerminated
方法清除所有消費隊列對象
2.3 對應的單元測試
單元測試部分主要是通過模擬隨機產生1000條模擬數據,併在中途產生的數據大於900的時候去模擬終止CustomQueue的行為並斷言最後的結果和行為。這裡需要註意的是ConsumeQueue_TerminatedStatus
除了模擬前面中斷的行為以外還通過反射確認threadCounter==0
確保當前的消費線程都能夠得到正確的釋放。
using NUnit.Framework;
using Pangea.Common.Utility.Buffer;
using System;
using System.Reflection;
using System.Threading;
namespace ACM.Framework.Test.Modules.Utils
{
[TestFixture]
internal class ConsumeQueueTests
{
[Test, Timeout(5000)]
public void ConsumeQueue_IdleProducing()
{
ManualResetEvent mre = new ManualResetEvent(false);
int prevData = -1;
Action<int> consume = data =>
{
Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
prevData = data;
};
Func<bool> func = () => prevData > 900;
Action terminated = () =>
{
mre.Set();
};
ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);
GenerateIntData(data =>
{
queue.ProduceItem(data);
}, false);
mre.WaitOne();
int pendingCount = queue.PeedingItemsCount;
var currentState = queue.State;
Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
}
[Test, Timeout(5000)]
public void ConsumeQueue_ContinueProducing()
{
ManualResetEvent mre = new ManualResetEvent(false);
int prevData = -1;
Action<int> consume = data =>
{
Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
prevData = data;
};
Func<bool> func = () => prevData > 900;
Action terminated = () =>
{
mre.Set();
};
ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);
GenerateIntData(data =>
{
queue.ProduceItem(data);
}, true);
mre.WaitOne();
int pendingCount = queue.PeedingItemsCount;
var currentState = queue.State;
Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
}
[Test, Timeout(5000)]
public void ConsumeQueue_TerminatedStatus()
{
ManualResetEvent mre = new ManualResetEvent(false);
int prevData = -1;
Action<int> consume = data =>
{
Assert.IsTrue(data - prevData == 1, $"prev-{prevData}, current-{data}");
prevData = data;
};
Func<bool> func = () => prevData > 500;
Action terminated = () => mre.Set();
ConsumeQueue<int> queue = new ConsumeQueue<int>(consume, func, terminated);
GenerateIntData(data =>
{
queue.ProduceItem(data);
if(queue.State == ConsumeQueue<int>.ConsumeState.Terminated)
{
Assert.IsTrue(queue.PeedingItemsCount == 0);
}
else
{
Assert.IsTrue(queue.PeedingItemsCount > 0);
}
}, true);
mre.WaitOne();
Thread.Sleep(1000); // wait one second for waiting consuming thread exit
int pendingCount = queue.PeedingItemsCount;
var currentState = queue.State;
int queueThreadNum = (int)queue.GetType().GetField("_threadCounter", BindingFlags.Instance | BindingFlags.NonPublic).GetValue(queue);
Assert.IsTrue(currentState == ConsumeQueue<int>.ConsumeState.Terminated, $"current state : {currentState}");
Assert.IsTrue(pendingCount == 0, $"{pendingCount}");
Assert.IsTrue(queueThreadNum == 0, $"{queueThreadNum}");
}
private void GenerateIntData(Action<int> intData, bool withIdle)
{
ThreadPool.QueueUserWorkItem(state =>
{
int target = 1000;
int index = 0;
while (index < target)
{
intData(index++);
if (withIdle)
{
Thread.Sleep(new Random(Guid.NewGuid().GetHashCode()).Next(1, 5));
}
}
});
}
}
}