NET 6 環境開發 實現 線程數量,任務隊列,非核心線程,及核心線程活躍時間的管理。 namespace CustomThreadPool; /// <summary> /// 線程池類 /// </summary> public class ThreadPoolExecutor { /// <s ...
NET 6 環境開發 實現 線程數量,任務隊列,非核心線程,及核心線程活躍時間的管理。
namespace CustomThreadPool; /// <summary> /// 線程池類 /// </summary> public class ThreadPoolExecutor { /// <summary> /// 核心線程的任務隊列 /// </summary> private readonly Queue<WorkTask> tasks = new Queue<WorkTask>(); /// <summary> /// 最大核心線程數 /// </summary> private int coreThreadCount; /// <summary> /// 最大非核心線程數 /// </summary> private int noneCoreThreadCount; /// <summary> /// 當前運行的核心線程的數量 /// </summary> private int runCoreThreadCount; /// <summary> /// 當前運行的非核心線程的數量 /// </summary> private int runNoneCoreThreadCount; /// <summary> /// 核心線程隊列的最大數 /// </summary> private int maxQueueCount; /// <summary> /// 當核心線程空閑時最大活躍時間 /// </summary> private int keepAliveTimeout; /// <summary> /// 設置是否為後臺線程 /// </summary> private bool isBackground; private ThreadPoolExecutor() { } /// <summary> /// /// </summary> /// <param name="CoreThreadCount">核心線程數</param> /// <param name="TotalThreadCount">匯流排程數</param> /// <param name="IsBackground">是否為後臺線程</param> /// <param name="QueueCount">核心隊列的最大數</param> /// <param name="KeepAliveTimeout">當核心線程空閑時最大活躍時間</param> /// <exception cref="ArgumentOutOfRangeException"></exception> /// <exception cref="ArgumentException"></exception> public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0) { if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null); if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than {nameof(CoreThreadCount)}:{CoreThreadCount}"); if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null); if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null); coreThreadCount = CoreThreadCount; noneCoreThreadCount = TotalThreadCount - CoreThreadCount; keepAliveTimeout = KeepAliveTimeout; maxQueueCount = QueueCount; isBackground = IsBackground; } /// <summary> /// 執行任務 /// </summary> /// <param name="task">一個自定義任務</param> /// <exception cref="ArgumentNullException">任務為null時,拋出該錯誤</exception> /// <exception cref="NotSupportedException">當核心任務隊列已滿且非核心線程最大數為0時拋出該錯誤</exception> public void QueueTask(WorkTask task) { if (task == null) throw new ArgumentNullException(nameof(task)); lock (tasks) { tasks.Enqueue(task); if (tasks.Count <= maxQueueCount) { if (runCoreThreadCount < coreThreadCount) { ++runCoreThreadCount; Run(true); } } else { if (noneCoreThreadCount > 0 && runNoneCoreThreadCount < noneCoreThreadCount) { ++runNoneCoreThreadCount; Run(false); } } } } private void Run(bool isCore) { Tuple<int, bool> state = new(keepAliveTimeout, isCore); Thread thread = new(t => Excute(t)) { Name = Guid.NewGuid().ToString("D"), IsBackground = isBackground }; thread.Start(state); } private void Excute(object? state) { if (state == null) return; var parameter = (Tuple<int, bool>)state; bool first = true; DateTime firstTime = DateTime.Now; while (true) { WorkTask? item = null; lock (tasks) { if (tasks.Count > 0) { first = true; item = tasks.Dequeue(); } else { if (parameter.Item2) { if (first) { firstTime = DateTime.Now; first = false; } if ((DateTime.Now - firstTime).TotalMilliseconds > parameter.Item1) { --runCoreThreadCount; break; } } else { --runNoneCoreThreadCount; break; } } } item?.Runsynchronous(); } } }
namespace CustomThreadPool; /// <summary> /// 包裝的任務類 /// </summary> public class WorkTask { public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory(); /// <summary> /// 任務運行結束時觸發該事件 /// </summary> public event Action<WorkTask>? TaskCompleted; /// <summary> /// 任務ID /// </summary> private static int _id = 0; /// <summary> /// 委托給任務不帶執行參數的代碼 /// </summary> private readonly Action? action; /// <summary> /// 委托給任務執行的帶輸入參數代碼 /// </summary> private readonly Action<object?>? actionWithParamter; /// <summary> /// 線程間的同步事件 /// </summary> public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false); /// <summary> /// 執行代碼的參數 /// </summary> public object? State { get; protected set; } /// <summary> /// 接收任務拋出的異常 /// </summary> public WorkTaskException? Exception { get; protected set; } /// <summary> /// 任務是否完成標誌 /// </summary> public bool IsCompleted { get; protected set; } = false; /// <summary> /// 任務知否有異常 /// </summary> public bool IsFaulted { get; protected set; } = false; /// <summary> /// 任務狀態 /// </summary> public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created; public int Id { get { return Interlocked.Increment(ref _id); } } protected WorkTask() { } protected void OnTaskCompleted(WorkTask sender) { TaskCompleted?.Invoke(sender); } public WorkTask(Action action) { this.action = action ?? throw new ArgumentNullException(nameof(action)); } public WorkTask(Action<object?> action, object state) { actionWithParamter = action ?? throw new ArgumentNullException(nameof(action)); this.State = state; } /// <summary> /// 任務的同步方法 /// </summary> public virtual void Runsynchronous() { if (Status != WorkTaskStatus.Created) return; Status = WorkTaskStatus.Running; try { action?.Invoke(); actionWithParamter?.Invoke(State); } catch (Exception ex) { Exception = new WorkTaskException(ex.Message, ex); IsFaulted = true; } finally { OnTaskCompleted(this); WaitHandle.Set(); IsCompleted = true; Status = WorkTaskStatus.RanToCompleted; } } /// <summary> /// 通過調用線程執行的方法 /// </summary> public void Start() { Factory.ThreadPoolExcutor?.QueueTask(this); } /// <summary> /// 通過調用線程執行的方法 /// </summary> /// <param name="executor">線程池管理類</param> public void Start(ThreadPoolExecutor executor) { executor.QueueTask(this); } /// <summary> /// 執行一組任務並等待所有任務完成。 /// </summary> /// <param name="tasks">一組任務</param> /// <returns>所有任務是否都接收到完成的信號。</returns> public static bool WaitAll(WorkTask[] tasks) { var result = true; foreach (var task in tasks) { result = result && task.WaitHandle.WaitOne(); } return result; } /// <summary> /// 執行一組任務並等待任意一個任務完成。 /// </summary> /// <param name="tasks">一組任務</param> /// <returns>返回已完成任務的索引</returns> public static int WaitAny(WorkTask[] tasks) { var index = new Random().Next(0, tasks.Length - 1); tasks[index].WaitHandle.WaitOne(); return index; } } /// <summary> /// 具有返回類型的任務 /// </summary> /// <typeparam name="TResult"></typeparam> public class WorkTask<TResult> : WorkTask { private readonly Func<TResult>? func; private readonly Func<object?, TResult>? funcWithParameter; protected TResult? _result = default(TResult); public TResult? Result { get { if (!isSetSignal) WaitHandle.WaitOne(); return _result; } } public WorkTask(Func<TResult> func) { this.func = func ?? throw new ArgumentNullException(nameof(func)); } public WorkTask(Func<object?, TResult> func, object? state) { this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func)); this.State = state; } private bool isSetSignal = false; public override void Runsynchronous() { if (Status != WorkTaskStatus.Created) return; Status = WorkTaskStatus.Running; try { if (func != null) _result = func(); if (funcWithParameter != null) _result = funcWithParameter(State); } catch (Exception ex) { Exception = new WorkTaskException(ex.Message, ex); IsFaulted = true; } finally { OnTaskCompleted(this); isSetSignal = WaitHandle.Set(); Status = WorkTaskStatus.RanToCompleted; IsCompleted = true; } } } public class WorkTaskException : Exception { public WorkTaskException() { } public WorkTaskException(string Message) : base(Message) { } public WorkTaskException(string Message, Exception InnerException) : base(Message, InnerException) { } } public enum WorkTaskStatus { /// <summary> /// 已創建 /// </summary> Created = 0, /// <summary> /// 正在運行 /// </summary> Running = 1, /// <summary> /// 已完成 /// </summary> RanToCompleted = 2, }
namespace CustomThreadPool; public class WorkTaskFactory { public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; } public WorkTaskFactory(ThreadPoolExecutor excutor) { ThreadPoolExcutor = excutor; } public WorkTaskFactory() : this(new ThreadPoolExecutor(5, 10)) { } public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null) { WorkTask task = new WorkTask(action); ThreadPoolExcutor = executor ?? ThreadPoolExcutor; ThreadPoolExcutor?.QueueTask(task); return task; } public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null) { WorkTask<TResult> task = new WorkTask<TResult>(func, state); ThreadPoolExcutor = executor ?? ThreadPoolExcutor; ThreadPoolExcutor?.QueueTask(task); return task; } }
namespace CustomThreadPool; using System.Threading; using System.Text; using System; using System.Diagnostics; using System.Reflection.Emit; class Program { static void Main(string[] args) { int count = 5; ThreadPoolExecutor poolExcutor = new(5, 6, QueueCount: 5, KeepAliveTimeout: 2000); WorkTask<int?>[] workTasks = new WorkTask<int?>[count]; for (int i = 0; i < count; i++) workTasks[i] = WorkTask.Factory.StartNew(t => Action(t), state: i, executor: poolExcutor); WorkTask<int> task = WorkTask.Factory.StartNew(t => { Thread.Sleep(100); Console.WriteLine("start thread"); return 100; }, state: null, executor: poolExcutor); Console.WriteLine("start main"); WorkTask.WaitAll(workTasks); Console.WriteLine(task.Result); Console.WriteLine(workTasks.Sum(t => t.Result)); } private static int? Action(object? t) { Thread.Sleep(2000); Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}"); return t == null ? default(int?) : (int)t + 1; } }
調用結果