--近期有一個需要運用多線程的項目,會有併發概率,所以寫了一份代碼,可能有寫地方還不完善,後續有需求在改 1 /// <summary> 2 /// 併發對象 3 /// </summary> 4 public class MeterAsyncQueue 5 { 6 public MeterAsyn... ...
--近期有一個需要運用多線程的項目,會有併發概率,所以寫了一份代碼,可能有寫地方還不完善,後續有需求在改
1 /// <summary> 2 /// 併發對象 3 /// </summary> 4 public class MeterAsyncQueue 5 { 6 public MeterAsyncQueue() 7 { 8 MeterInfoTask = new MeterInfo(); 9 } 10 11 public MeterInfo MeterInfoTask { get; set; } 12 } 13 public class MeterInfo 14 { 15 public MeterInfo() 16 { 17 18 } 19 public int Id { get; set; } 20 21 }
1 /// <summary> 2 /// 線程通用類 3 /// </summary> 4 public class TaskCommand 5 { 6 CancellationTokenSource tokenSource = new CancellationTokenSource(); 7 ManualResetEvent resetEvent = new ManualResetEvent(true); 8 Thread thread = null; 9 /// <summary> 10 /// 開始任務 11 /// </summary> 12 public void StartData() 13 { 14 tokenSource = new CancellationTokenSource(); 15 resetEvent = new ManualResetEvent(true); 16 17 List<int> Ids = new List<int>(); 18 for (int i = 0; i < 10000; i++) 19 { 20 Ids.Add(i); 21 } 22 thread = new Thread(new ThreadStart(() => StartTask(Ids))); 23 thread.Start(); 24 } 25 /// <summary> 26 /// 暫停任務 27 /// </summary> 28 public void OutData() 29 { 30 //task暫停 31 resetEvent.Reset(); 32 } 33 /// <summary> 34 /// 繼續任務 35 /// </summary> 36 public void ContinueData() 37 { 38 //task繼續 39 resetEvent.Set(); 40 } 41 /// <summary> 42 /// 取消任務 43 /// </summary> 44 public void Cancel() 45 { 46 //釋放對象 47 resetEvent.Dispose(); 48 foreach (var CurrentTask in ParallelTasks) 49 { 50 if (CurrentTask != null) 51 { 52 if (CurrentTask.Status == TaskStatus.Running) { } 53 { 54 //終止task線程 55 tokenSource.Cancel(); 56 } 57 } 58 } 59 thread.Abort(); 60 } 61 /// <summary> 62 /// 執行數據 63 /// </summary> 64 /// <param name="Index"></param> 65 public void Execute(int Index) 66 { 67 //阻止當前線程 68 resetEvent.WaitOne(); 69 70 Console.WriteLine("當前第" + Index + "個線程"); 71 72 Thread.Sleep(1000); 73 74 } 75 //隊列對象 76 private Queue<MeterAsyncQueue> AsyncQueues { get; set; } 77 78 /// <summary> 79 /// 併發任務數 80 /// </summary> 81 private int ParallelTaskCount { get; set; } 82 83 84 /// <summary> 85 /// 並行任務集合 86 /// </summary> 87 private List<Task> ParallelTasks { get; set; } 88 //控制線程並行數量 89 public void StartTask(List<int> Ids) 90 { 91 IsInitTask = true; 92 ParallelTasks = new List<Task>(); 93 AsyncQueues = new Queue<MeterAsyncQueue>(); 94 //獲取併發數 95 ParallelTaskCount = 5; 96 97 //初始化非同步隊列 98 InitAsyncQueue(Ids); 99 //開始執行隊列任務 100 HandlingTask(); 101 102 Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) }); 103 } 104 /// <summary> 105 /// 初始化非同步隊列 106 /// </summary> 107 private void InitAsyncQueue(List<int> Ids) 108 { 109 foreach (var item in Ids) 110 { 111 MeterInfo info = new MeterInfo(); 112 info.Id = item; 113 AsyncQueues.Enqueue(new MeterAsyncQueue() 114 { 115 MeterInfoTask = info 116 }); 117 } 118 } 119 /// <summary> 120 /// 是否首次執行任務 121 /// </summary> 122 private bool IsInitTask { get; set; } 123 //鎖 124 private readonly object _objLock = new object(); 125 126 /// <summary> 127 /// 開始執行隊列任務 128 /// </summary> 129 private void HandlingTask() 130 { 131 lock (_objLock) 132 { 133 if (AsyncQueues.Count <= 0) 134 { 135 return; 136 } 137 138 var loopCount = GetAvailableTaskCount(); 139 //併發處理隊列 140 for (int i = 0; i < loopCount; i++) 141 { 142 HandlingQueue(); 143 } 144 IsInitTask = false; 145 } 146 } 147 /// <summary> 148 /// 獲取隊列鎖 149 /// </summary> 150 private readonly object _queueLock = new object(); 151 152 /// <summary> 153 /// 處理隊列 154 /// </summary> 155 private void HandlingQueue() 156 { 157 CancellationToken token = tokenSource.Token; 158 lock (_queueLock) 159 { 160 if (AsyncQueues.Count > 0) 161 { 162 var asyncQueue = AsyncQueues.Dequeue(); 163 164 if (asyncQueue == null) return; 165 var task = Task.Factory.StartNew(() => 166 { 167 if (token.IsCancellationRequested) 168 { 169 return; 170 } 171 //阻止當前線程 172 resetEvent.WaitOne(); 173 //執行任務 174 Execute(asyncQueue.MeterInfoTask.Id); 175 176 }, token).ContinueWith(t => 177 { 178 HandlingTask(); 179 }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously); 180 ParallelTasks.Add(task); 181 } 182 } 183 } 184 /// <summary> 185 /// 獲取當前有效並行的任務數 186 /// </summary> 187 /// <returns></returns> 188 [MethodImpl(MethodImplOptions.Synchronized)] 189 private int GetAvailableTaskCount() 190 { 191 if (IsInitTask) 192 return ParallelTaskCount; 193 return 1; 194 } 195 }