為什麼編寫TaskSchedulerEx類? 因為.NET預設線程池只有一個線程池,如果某個批量任務一直占著大量線程,甚至耗盡預設線程池,則會嚴重影響應用程式域中其它任務或批量任務的性能。 特點: 1、使用獨立線程池,線程池中線程分為核心線程和輔助線程,輔助線程會動態增加和釋放,且匯流排程數不大於參數 ...
為什麼編寫TaskSchedulerEx類?
因為.NET預設線程池只有一個線程池,如果某個批量任務一直占著大量線程,甚至耗盡預設線程池,則會嚴重影響應用程式域中其它任務或批量任務的性能。
特點:
1、使用獨立線程池,線程池中線程分為核心線程和輔助線程,輔助線程會動態增加和釋放,且匯流排程數不大於參數_maxThreadCount
2、無縫相容Task,使用上和Task一樣,可以用它來實現非同步,參見:C# async await 非同步執行方法封裝 替代 BackgroundWorker
3、隊列中尚未執行的任務可以取消
4、通過擴展類TaskHelper實現任務分組
5、和SmartThreadPool對比,優點是無縫相容Task類,和Task類使用沒有區別,因為它本身就是對Task、TaskScheduler的擴展,所以Task類的ContinueWith、WaitAll等方法它都支持,以及相容async、await非同步編程
6、代碼量相當精簡,TaskSchedulerEx類只有230多行代碼
7、池中的線程數量會根據負載自動增減,支持,但沒有SmartThreadPool智能,為了性能,使用了比較笨的方式實現,不知道大家有沒有既智能,性能又高的方案,我有一個思路,在定時器中計算每個任務執行平均耗時,然後使用公式(線程數 = CPU核心數 * ( 本地計算時間 + 等待時間 ) / 本地計算時間)來計算最佳線程數,然後按最佳線程數來動態創建線程,但這個計算過程可能會犧牲性能
對比SmartThreadPool:
TaskSchedulerEx類代碼(使用BlockingCollection,當線程數大於200時,CPU占用高,線程數小於100時,CPU占用正常):
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { public class TaskSchedulerEx : TaskScheduler, IDisposable { #region 外部方法 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); #endregion #region 變數屬性事件 private BlockingCollection<Task> _tasks = new BlockingCollection<Task>(); List<Thread> _threadList = new List<Thread>(); private int _threadCount = 0; private int _maxThreadCount = 0; private int _timeOut = Timeout.Infinite; private int _extTimeOut = 3000; private Task _tempTask; private int _activeThreadCount = 0; private System.Timers.Timer _timer; private object _lockCreateTimer = new object(); /// <summary> /// 活躍線程數 /// </summary> public int ActiveThreadCount { get { return _activeThreadCount; } } /// <summary> /// 核心線程數 /// </summary> public int CoreThreadCount { get { return _threadCount; } } /// <summary> /// 最大線程數 /// </summary> public int MaxThreadCount { get { return _maxThreadCount; } } #endregion #region 構造函數 public TaskSchedulerEx(int threadCount = 10, int maxThreadCount = 20) { _maxThreadCount = maxThreadCount; CreateThreads(threadCount); } #endregion #region override GetScheduledTasks protected override IEnumerable<Task> GetScheduledTasks() { return _tasks; } #endregion #region override TryExecuteTaskInline protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } #endregion #region override QueueTask protected override void QueueTask(Task task) { _tasks.Add(task); } #endregion #region 資源釋放 /// <summary> /// 資源釋放 /// 如果尚有任務在執行,則會在調用此方法的線程上引發System.Threading.ThreadAbortException,請使用Task.WaitAll等待任務執行完畢後,再調用該方法 /// </summary> public void Dispose() { if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } _timeOut = 100; foreach (Thread item in _threadList) { item.Abort(); Interlocked.Decrement(ref _activeThreadCount); } _threadList.Clear(); GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } #endregion #region 創建線程池 /// <summary> /// 創建線程池 /// </summary> private void CreateThreads(int? threadCount = null) { if (threadCount != null) _threadCount = threadCount.Value; _timeOut = Timeout.Infinite; for (int i = 0; i < _threadCount; i++) { Interlocked.Increment(ref _activeThreadCount); Thread thread = new Thread(new ThreadStart(() => { Task task; while (_tasks.TryTake(out task, _timeOut)) { CreateTimer(); TryExecuteTask(task); } })); thread.IsBackground = true; thread.Start(); _threadList.Add(thread); } } #endregion #region 創建線程 /// <summary> /// 創建線程 /// </summary> private void CreateThread() { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; while (_tasks.TryTake(out task, _extTimeOut)) { TryExecuteTask(task); } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == _threadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } if (thread != null) { thread.Abort(); thread = null; } })); thread.IsBackground = true; thread.Start(); } #endregion #region 創建定時器 private void CreateTimer() { if (_timer == null) //_timer不為空時,跳過,不走lock,提升性能 { lock (_lockCreateTimer) { if (_timer == null) { int interval = 20; _timer = new System.Timers.Timer(); _timer.Interval = 1000; _timer.Elapsed += (s, e) => { if (_timer.Interval != interval) _timer.Interval = interval; if (_activeThreadCount >= _threadCount && _activeThreadCount < _maxThreadCount) { if (_tasks.Count > 0) { CreateThread(); } else { if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } } } }; _timer.Start(); } } } } #endregion #region 全部取消 /// <summary> /// 全部取消 /// 當前正在執行的任務無法取消,取消的只是後續任務,相當於AbortAll /// </summary> public void CancelAll() { while (_tasks.TryTake(out _tempTask)) { } } #endregion } }View Code
TaskSchedulerEx類代碼(使用ConcurrentQueue,測試500個線程,CPU占用0%-2%,正常):
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { public class TaskSchedulerEx : TaskScheduler, IDisposable { #region 外部方法 [DllImport("kernel32.dll", EntryPoint = "SetProcessWorkingSetSize")] public static extern int SetProcessWorkingSetSize(IntPtr process, int minSize, int maxSize); #endregion #region 變數屬性事件 private ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>(); List<Thread> _threadList = new List<Thread>(); private int _threadCount = 0; private int _maxThreadCount = 0; private int _timeOut = Timeout.Infinite; private int _extTimeOut = 20000; private Task _tempTask; private int _activeThreadCount = 0; private System.Timers.Timer _timer; private object _lockCreateTimer = new object(); private bool _run = true; private SpinWait _spinWait = new SpinWait(); /// <summary> /// 活躍線程數 /// </summary> public int ActiveThreadCount { get { return _activeThreadCount; } } /// <summary> /// 核心線程數 /// </summary> public int CoreThreadCount { get { return _threadCount; } } /// <summary> /// 最大線程數 /// </summary> public int MaxThreadCount { get { return _maxThreadCount; } } #endregion #region 構造函數 public TaskSchedulerEx(int threadCount = 10, int maxThreadCount = 20) { _maxThreadCount = maxThreadCount; CreateThreads(threadCount); } #endregion #region override GetScheduledTasks protected override IEnumerable<Task> GetScheduledTasks() { return _tasks; } #endregion #region override TryExecuteTaskInline protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) { return false; } #endregion #region override QueueTask protected override void QueueTask(Task task) { _tasks.Enqueue(task); } #endregion #region 資源釋放 /// <summary> /// 資源釋放 /// 如果尚有任務在執行,則會在調用此方法的線程上引發System.Threading.ThreadAbortException,請使用Task.WaitAll等待任務執行完畢後,再調用該方法 /// </summary> public void Dispose() { _run = false; if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } _timeOut = 100; foreach (Thread item in _threadList) { item.Abort(); Interlocked.Decrement(ref _activeThreadCount); } _threadList.Clear(); GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } #endregion #region 創建線程池 /// <summary> /// 創建線程池 /// </summary> private void CreateThreads(int? threadCount = null) { if (threadCount != null) _threadCount = threadCount.Value; _timeOut = Timeout.Infinite; for (int i = 0; i < _threadCount; i++) { Interlocked.Increment(ref _activeThreadCount); Thread thread = new Thread(new ThreadStart(() => { Task task; while (_run) { if (_tasks.TryDequeue(out task)) { CreateTimer(); TryExecuteTask(task); } else { Thread.Sleep(10); } } })); thread.IsBackground = true; thread.Start(); _threadList.Add(thread); } } #endregion #region 創建線程 /// <summary> /// 創建線程 /// </summary> private void CreateThread() { Interlocked.Increment(ref _activeThreadCount); Thread thread = null; thread = new Thread(new ThreadStart(() => { Task task; DateTime dt = DateTime.Now; while (_run && DateTime.Now.Subtract(dt).TotalMilliseconds < _extTimeOut) { if (_tasks.TryDequeue(out task)) { TryExecuteTask(task); dt = DateTime.Now; } else { Thread.Sleep(100); } } Interlocked.Decrement(ref _activeThreadCount); if (_activeThreadCount == _threadCount) { GC.Collect(); GC.WaitForPendingFinalizers(); if (Environment.OSVersion.Platform == PlatformID.Win32NT) { SetProcessWorkingSetSize(System.Diagnostics.Process.GetCurrentProcess().Handle, -1, -1); } } if (thread != null) { thread.Abort(); thread = null; } })); thread.IsBackground = true; thread.Start(); } #endregion #region 創建定時器 private void CreateTimer() { if (_timer == null) //_timer不為空時,跳過,不走lock,提升性能 { lock (_lockCreateTimer) { if (_timer == null) { int interval = 20; _timer = new System.Timers.Timer(); _timer.Interval = 500; _timer.Elapsed += (s, e) => { if (_timer.Interval != interval) _timer.Interval = interval; if (_activeThreadCount >= _threadCount && _activeThreadCount < _maxThreadCount) { if (_tasks.Count > 0) { CreateThread(); } else { if (_timer != null) { _timer.Stop(); _timer.Dispose(); _timer = null; } } } }; _timer.Start(); } } } } #endregion #region 全部取消 /// <summary> /// 全部取消 /// 當前正在執行的任務無法取消,取消的只是後續任務,相當於AbortAll /// </summary> public void CancelAll() { while (_tasks.TryDequeue(out _tempTask)) { } } #endregion } }View Code
RunHelper類代碼:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Utils { /// <summary> /// 線程工具類 /// </summary> public static class RunHelper { #region 變數屬性事件 #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task Run(this TaskScheduler scheduler, Action<object> doWork, object arg = null, Action<Exception> errorAction = null) { return Task.Factory.StartNew((obj) => { try { doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task Run(this TaskScheduler scheduler, Action doWork, Action<Exception> errorAction = null) { return Task.Factory.StartNew(() => { try { doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task<T> Run<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null) { return Task.Factory.StartNew<T>((obj) => { try { return doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static Task<T> Run<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null) { return Task.Factory.StartNew<T>(() => { try { return doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<object, T> doWork, object arg = null, Action<Exception> errorAction = null) { return await Task.Factory.StartNew<T>((obj) => { try { return doWork(obj); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, arg, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion #region 線程中執行 /// <summary> /// 線程中執行 /// </summary> public static async Task<T> RunAsync<T>(this TaskScheduler scheduler, Func<T> doWork, Action<Exception> errorAction = null) { return await Task.Factory.StartNew<T>(() => { try { return doWork(); } catch (Exception ex) { if (errorAction != null) errorAction(ex); LogUtil.Error(ex, "ThreadUtil.Run錯誤"); return default(T); } }, CancellationToken.None, TaskCreationOptions.None, scheduler); } #endregion } }View Code
TaskHelper擴展類(代碼中LimitedTaskScheduler改為TaskSchedulerEx即可)(這個任務分類有點多,每個任務分類的核心線程一般是不釋放的,一直占著線程,算不算濫用):
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace Utils { /// <summary> /// Task幫助類基類 /// </summary> public class TaskHelper { #region UI任務 private static LimitedTaskScheduler _UITask; /// <summary> /// UI任務(4個線程) /// </summary> public static LimitedTaskScheduler UITask { get { if (_UITask == null) _UITask = new LimitedTaskScheduler(4); return _UITask; } } #endregion #region 菜單任務 private static LimitedTaskScheduler _MenuTask; /// <summary> /// 菜單任務 /// </summary> public static LimitedTaskScheduler MenuTask { get { if (_MenuTask == null) _MenuTask = new LimitedTaskScheduler(2); return _MenuTask; } } #endregion #region 計算任務 private static LimitedTaskScheduler _CalcTask; /// <summary> /// 計算任務(8個線程) /// </summary> public static LimitedTaskScheduler CalcTask { get { if (_CalcTask == null) _CalcTask = new LimitedTaskScheduler(8); return _CalcTask; } } #endregion #region 網路請求 private static LimitedTaskScheduler _RequestTask; /// <summary> /// 網路請求(32個線程) /// </summary> public static LimitedTaskScheduler RequestTask { get { if (_RequestTask == null) _RequestTask = new LimitedTaskScheduler(32); return _RequestTask; } } #endregion #region 資料庫任務 private static LimitedTaskScheduler _DBTask; /// <summary> /// 資料庫任務(32個線程) /// </summary> public static LimitedTaskScheduler DBTask { get { if (_DBTask == null) _DBTask = new LimitedTaskScheduler(32); return _DBTask; } } #endregion #region IO任務 private static LimitedTaskScheduler _IOTask; /// <summary> /// IO任務(8個線程) /// </summary> public static LimitedTaskScheduler IOTask { get { if (_IOTask == null) _IOTask = new LimitedTaskScheduler(8); return _IOTask; } } #endregion #region 首頁任務 private static LimitedTaskScheduler _MainPageTask; /// <summary> /// 首頁任務(16個線程) /// </summary> public static Limit