三年前寫過基於ConcurrentQueue的非同步隊列,今天在整理代碼的時候發現當時另外一種實現方式-使用BlockingCollection實現,這種方式目前依然在實際項目中使用。關於BlockingCollection的基本使用請查閱MSDN。源碼實現 下麵直接上代碼:(代碼已經放到了我的git ...
三年前寫過基於ConcurrentQueue的非同步隊列,今天在整理代碼的時候發現當時另外一種實現方式-使用BlockingCollection實現,這種方式目前依然在實際項目中使用。關於BlockingCollection的基本使用請查閱MSDN。源碼實現
下麵直接上代碼:(代碼已經放到了我的github上)
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading; using Danny.Infrastructure.Helper; namespace Danny.Infrastructure.Collections { /// <summary> /// 一個基於BlockingCollection實現的多線程的處理隊列 /// </summary> public class ProcessQueue<T> { private BlockingCollection<T> _queue; private CancellationTokenSource _cancellationTokenSource; private CancellationToken _cancellToken; //內部線程池 private List<Thread> _threadCollection; //隊列是否正在處理數據 private int _isProcessing; //有線程正在處理數據 private const int Processing = 1; //沒有線程處理數據 private const int UnProcessing = 0; //隊列是否可用 private volatile bool _enabled = true; //內部處理線程數量 private int _internalThreadCount; public event Action<T> ProcessItemEvent; //處理異常,需要三個參數,當前隊列實例,異常,當時處理的數據 public event Action<dynamic,Exception,T> ProcessExceptionEvent; public ProcessQueue() { _queue=new BlockingCollection<T>(); _cancellationTokenSource = new CancellationTokenSource(); _internalThreadCount = 1; _cancellToken = _cancellationTokenSource.Token; _threadCollection = new List<Thread>(); } public ProcessQueue(int internalThreadCount):this() { this._internalThreadCount = internalThreadCount; } /// <summary> /// 隊列內部元素的數量 /// </summary> public int GetInternalItemCount() { return _queue.Count; } public void Enqueue(T items) { if (items == null) { throw new ArgumentException("items"); } _queue.Add(items); DataAdded(); } public void Flush() { StopProcess(); while (_queue.Count != 0) { T item=default(T); if (_queue.TryTake(out item)) { try { ProcessItemEvent(item); } catch (Exception ex) { OnProcessException(ex,item); } } } } private void DataAdded() { if (_enabled) { if (!IsProcessingItem()) { ProcessRangeItem(); StartProcess(); } } } //判斷是否隊列有線程正在處理 private bool IsProcessingItem() { return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing); } private void ProcessRangeItem() { for (int i = 0; i < this._internalThreadCount; i++) { ProcessItem(); } } private void ProcessItem() { Thread currentThread = new Thread((state) => { T item=default(T); while (_enabled) { try { try { item = _queue.Take(_cancellToken); ProcessItemEvent(item); } catch (OperationCanceledException ex) { DebugHelper.DebugView(ex.ToString()); } } catch (Exception ex) { OnProcessException(ex,item); } } }); _threadCollection.Add(currentThread); } private void StartProcess() { foreach (var thread in _threadCollection) { thread.Start(); } } private void StopProcess() { this._enabled = false; foreach (var thread in _threadCollection) { if (thread.IsAlive) { thread.Join(); } } _threadCollection.Clear(); } private void OnProcessException(Exception ex,T item) { var tempException = ProcessExceptionEvent; Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null); if (tempException != null) { ProcessExceptionEvent(this,ex,item); } } } }
使用方法:
class Program { static void Main(string[] args) { ProcessQueue<int> processQueue = new ProcessQueue<int>(); processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent; processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent; processQueue.Enqueue(1); processQueue.Enqueue(2); processQueue.Enqueue(3); } /// <summary> /// 該方法對入隊的每個元素進行處理 /// </summary> /// <param name="value"></param> private static void ProcessQueue_ProcessItemEvent(int value) { Console.WriteLine(value); } /// <summary> /// 處理異常 /// </summary> /// <param name="obj">隊列實例</param> /// <param name="ex">異常對象</param> /// <param name="value">出錯的數據</param> private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value) { Console.WriteLine(ex.ToString()); } }