線程的創建 Thread Sleep/ SpinWait Sleep與SpinWait的區別: 使用Thread.Sleep()會導致等待過於進行切換,等待時間不准確,而且會由用戶模式切換到內核模式;使用SpinWait(一個輕量同步類型(結構體))來進行等待的處理,等待過程中會使用自旋等待,從而避 ...
線程的創建
Thread
1 var thread = new Thread(() => 2 { 3 Console.WriteLine("thread start:" + Thread.CurrentThread.ManagedThreadId); //ManagedThreadId為線程的id 4 Thread.Sleep(10000); 5 Console.WriteLine("thread end:" + Thread.CurrentThread.ManagedThreadId); 6 }); 7 //設置是否為後臺線程: 8 // 前臺線程:所有前臺線程執行結束後,該進程才會關閉退出(主線程和通過Thread類創建的線程預設是前臺線程) 9 // 後臺線程:所有前臺結束後,後臺線程就會立即結束(不管是否執行完成都會結束) 10 thread.IsBackground = true; 11 thread.Start();//開啟線程,不傳遞參數 12 13 //傳遞參數的 14 var thread1 = new Thread(param => 15 { 16 Thread.Sleep(3000); 17 Console.WriteLine(param); 18 }); 19 thread1.Start("val"); 20 thread1.Join(); //等待線程執行完成(使當前調用Join的線程阻塞) 21 //暫停和恢複線程都標誌為已過時了,不建議使用 22 //thread1.Suspend(); 23 //thread1.Resume(); 24 //設置線程的優先順序,註意:在NT內核的Windows平臺上建議不使用優先順序來影響線程優先調度的行為,因為根本沒法預期一個高優先順序的線程必然會先於一個低優先順序的線程執行,所以也就失去了控制線程調度的價值 25 //thread1.Priority = ThreadPriority.Highest; 26 //thread1.Abort(); //暴力的終止線程,一般不建議使用
Sleep/ SpinWait
Sleep與SpinWait的區別:
使用Thread.Sleep()會導致等待過於進行切換,等待時間不准確,而且會由用戶模式切換到內核模式;使用SpinWait(一個輕量同步類型(結構體))來進行等待的處理,等待過程中會使用自旋等待,從而避免線程頻繁的用戶模式和內核模式切換,一般用於短時的等待操作:
1 //參數一為Func<bool>,就是自旋時的迴圈體,直到返回true或者過時為止 2 SpinWait.SpinUntil(() => 3 { 4 Console.WriteLine("Spin Waiting"); 5 return false; 6 }, 1000); 7 SpinWait.SpinUntil(() => false, 1000); //返回false會進入等待狀態,類似於Thread.Sleep()等待,但是會盤旋CPU周期,在短期內等待事件準確度都高於Sleep 8 SpinWait.SpinUntil(() => true, 1000); //返回true會自動跳出等待狀態,不再休眠,繼續執行下麵的代碼
使用SpinWait做一些多線程的流程式控制制
1 int i = 0; 2 Task.Run(() => 3 { 4 Thread.Sleep(1000); //模擬一些操作 5 Interlocked.Increment(ref i); 6 }); 7 Task.Run(() => 8 { 9 Thread.Sleep(1000); //模擬一些操作 10 SpinWait.SpinUntil(() => i == 1); //等待1完成 11 Thread.Sleep(1000); //模擬一些操作 12 Interlocked.Increment(ref i); 13 }); 14 SpinWait.SpinUntil(() => i == 2); //等待所有流程完成 15 Console.WriteLine("Completed!");
ThreadPool
通過線程池創建線程,池中的線程都是後臺線程
使用線程更應該使用線程池來創建:比如一個伺服器需要處理成千上萬個客戶端鏈接,並處理不同的請求時,這種情況下如果簡單通過Thread來創建線程處理,那麼就是需要創建成千上萬個線程了,那麼多線程會頻繁的調度切換,資源浪費嚴重、性能十分低下,因此需要線程池來維護多線程(會動態調整線程數量)
1 ThreadPool.QueueUserWorkItem(param => 2 { 3 Console.WriteLine(param); //val,param為傳遞過來的參數 4 }, "val");
Task
通過Task來創建線程(線程也是由線程池維護,也是後臺線程),比ThreadPool更加靈活方便
1 var tasks = new List<Task>(); 2 tasks.Add(Task.Factory.StartNew(param => 3 { 4 Thread.Sleep(5000); 5 Console.WriteLine(param); 6 }, "val")); 7 tasks.Add(Task.Run(() => Console.WriteLine(Thread.CurrentThread.ManagedThreadId))); 8 Task.WaitAny(tasks.ToArray()); //等待(阻塞)只要有一個Task執行完畢就不再等待了 9 Task.WaitAll(tasks.ToArray()); //等待(阻塞)所有Task執行結束 10 11 //帶返回值的 12 var task = Task.Run<string>(() => 13 { 14 Thread.Sleep(3000); 15 return "rtn Val"; 16 }); 17 //task.Wait(); //等待執行結束 18 Console.WriteLine(task.Result); //獲取返回的結果,調用Result就會等待Task執行結束返回結果,因此也會造成阻塞
ConfigureAwait
1 Task.Run(() => 2 { 3 Thread.Sleep(1000); 4 Console.WriteLine("Async"); 5 6 //ConfigureAwait為false發生異常的時候不會回取捕捉原始Context(上下文), 7 //這樣子就是線上程池中運行,而不是在ASP.NET/UI的Context的上下文線程中運 8 //行了,這樣子性能上提高了 9 }).ConfigureAwait(false);
1 // Thread.Sleep是同步延遲, Task.Delay非同步延遲; 2 // Thread.Sleep不能取消,Task.Delay可以。 3 Task.Run(async () => 4 { 5 //將任務延遲1000毫秒後運行,如果無限等待那麼指定為-1 6 await Task.Delay(1000); 7 Console.WriteLine("Task Start"); 8 //CancellationToken設置為true就是標誌Task任務取消,為false和 await Task.Delay(1000)一樣將任務延遲3000毫秒後運行 9 await Task.Delay(1000, new CancellationToken(true)); 10 Console.WriteLine("這裡不會被執行,因為任務取消了~"); 11 });
Task與async/await
1 public class TaskTest 2 { 3 public Task DoAsync(string param) 4 { 5 return Task.Run(() => 6 { 7 //調用Result會阻塞直到獲取到返回值 8 NextDo(LongTimeDoAsync(param).Result); 9 }); 10 } 11 12 public async Task Do1Async(string param) 13 { 14 //對比上面的DoAsync方法,執行結果一樣,但是使用async/await配合Task使用,節省了代碼量,而且也方便外部的調用和等待處理等等 15 NextDo(await LongTimeDoAsync(param)); 16 } 17 18 async Task<object> LongTimeDoAsync(string param) 19 { 20 return await Task.Run<object>(() => 21 { 22 //執行一些耗時的操作 23 Thread.Sleep(10000); 24 return param + " ok"; 25 }); 26 } 27 28 void NextDo(object result) 29 { 30 Console.WriteLine(result); 31 } 32 }
調用:
1 var test = new TaskTest(); 2 test.DoAsync("DoAsync"); 3 test.Do1Async("Do1Async");
併發集合
在System.Collections.Concurrent下有集合,都是寫多線程安全集合,而ConcurrentXXX為併發集合,有不少方法帶有Try首碼,這些方法在多線程下執行過程中可能會失敗返回false,因此不要相信這些操作會完成任務,需要判斷返回的結果;還有BlockingCollection<T>是阻塞集合,就是添加/獲取元素的時候會阻塞線程直到操作完成。
ConcurrentDictionary
1 ConcurrentDictionary<string, string> dict = new ConcurrentDictionary<string, string>(); 2 dict.TryAdd("key1", "val1"); 3 string val; 4 dict.TryGetValue("key1", out val); 5 dict.TryUpdate("key1", "val2", val);//最後參數為比較的值,值不同才會更新 6 dict.TryRemove("key1", out val); 7 Console.WriteLine(val); //val2 8 9 val = dict.GetOrAdd("key1", "val3"); 10 val = dict.GetOrAdd("key1", "val4"); 11 Console.WriteLine(val); //val3 12 13 dict["key1"] = null; 14 //對於AddOrUpdate方法,如果指定的key已經存在,那麼調用第三個參數進行UpdateValue 15 //如果不存在,那麼調用第二個參數進行AddValue 16 val = dict.AddOrUpdate("key1", "val5", (key, oldVal) => 17 { 18 Console.WriteLine(oldVal); //null 19 return "val6"; 20 }); 21 Console.WriteLine(val); //val6 22 23 val = dict.AddOrUpdate("key2", key => 24 { 25 return "val7"; 26 }, (key, oldVal) => 27 { 28 Console.WriteLine(oldVal); 29 return "val8"; 30 }); 31 Console.WriteLine(val); //val7
ConcurrentQueue
1 ConcurrentQueue<string> q = new ConcurrentQueue<string>(); 2 q.Enqueue("val1"); 3 q.Enqueue("val2"); 4 string val; 5 q.TryPeek(out val); 6 Console.WriteLine(val); //val1 7 q.TryDequeue(out val); 8 Console.WriteLine(val); //val1
ConcurrentStack
1 ConcurrentStack<string> s = new ConcurrentStack<string>(); 2 s.Push("val1"); 3 s.Push("val2"); 4 string val; 5 s.TryPeek(out val); 6 Console.WriteLine(val); //val2 7 s.TryPop(out val); 8 Console.WriteLine(val); //val2
ConcurrentBag
1 //ConcurrentBag:無序的併發集合(相同元素可重覆添加) 2 ConcurrentBag<object> bag = new ConcurrentBag<object>(); 3 var obj = new object(); 4 bag.Add(obj); 5 bag.Add(obj); 6 Console.WriteLine(bag.Count); //2 7 while (!bag.IsEmpty) //判斷集合是否為空 8 { 9 bag.TryTake(out obj); //獲取 10 }
並行計算
Parallel
For
1 //並行計算,調用的線程會等待直到並行執行完畢 2 Parallel.For(2, 10, i => 3 { 4 //i的值為[2, 10)(不包括10),就是執行次數為8次 5 Console.WriteLine(i); 6 });
1 //MaxDegreeOfParallelism為指定並行計算的最大線程數 2 Parallel.For(1, 10, new ParallelOptions { MaxDegreeOfParallelism = 3 }, i => 3 { 4 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 5 });
1 int result = 0; 2 Parallel.For(0, 100, new ParallelOptions { MaxDegreeOfParallelism = 4 }, 3 //初始化localState 4 () => 0, 5 //並行迴圈體(i為[0, 100),也就是會執行100次) 6 (i, loop, localState) => 7 { 8 //localState從0開始,不斷累加i的值 9 return localState + i; //迴圈體中返回的結果會在下麵的回調中進行值的合併(結果的合併必須在下麵進行) 10 }, 11 //合併計算的結果 12 localState => Interlocked.Add(ref result, localState) 13 ); 14 Console.WriteLine("真實結果: {0}. 預期結果:4950.", result);
ForEach
1 int aCount = 0; 2 //並行計算,會等待(阻塞)直到執行完成 3 Parallel.ForEach("aaaabbbbbcccc", 4 //設置並行計算的最大線程數 5 new ParallelOptions { MaxDegreeOfParallelism = 4 }, 6 c => 7 { 8 //計算'a'的個數 9 if (c == 'a') 10 { 11 Interlocked.Increment(ref aCount); 12 } 13 }); 14 Console.WriteLine(aCount); //4
1 //Partitioner為設置策略分區:例如值範圍為[0, 100],每個區域的大小為4 2 Parallel.ForEach(Partitioner.Create(0, 10, 4), 3 val => 4 { 5 Console.WriteLine(val); //val是一個Tuple<int, int>,分成的區間值有:(0, 4),(4, 8),(8, 10) 6 }); 7 8 int result = 0; 9 Parallel.ForEach(Partitioner.Create(1, 101, 10), 10 val => 11 { 12 for (int i = val.Item1; i < val.Item2; i++) 13 { 14 Interlocked.Add(ref result, i); 15 } 16 }); 17 Console.WriteLine(result); //輸出:5050
1 int[] vals = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 int sum = 0; 3 Parallel.ForEach( 4 vals, 5 //localSum的初始值 6 () => 0, 7 //並行執行的迴圈體 8 (val, loopState, localSum) => 9 { 10 //val為集合vals中的值 11 //這裡的操作是並行計算集合中值的總和 12 localSum += val; 13 return localSum; //迴圈體中返回的結果會在下麵的回調中進行值的合併(結果的合併必須在下麵進行) 14 }, 15 //合併計算的結果 16 (localSum) => Interlocked.Add(ref sum, localSum) 17 ); 18 Console.WriteLine(sum); //55
Invoke
1 int i = 0; 2 Action action = () => Interlocked.Increment(ref i); 3 Action action1 = () => Interlocked.Add(ref i, 2); 4 Action action2 = () => Interlocked.Add(ref i, 3); 5 //並行調用Action,調用的線程會等待直到並行執行完畢 6 Parallel.Invoke(action, action1, action2); 7 //Parallel.Invoke(new ParallelOptions { MaxDegreeOfParallelism = 3 }, action, action1, action2); 8 Console.WriteLine(i); //輸出:6
PLINQ
1 var list = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; 2 //PLINQ,進行並行計算,但是PLINQ不能限定並行計算時的最大線程數 3 list.AsParallel().ForAll(l => 4 { 5 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 6 }); 7 8 Console.WriteLine(list.AsParallel().Where(l => l > 5).Sum()); //40 9 Console.WriteLine(list.AsParallel().Aggregate((sum, val) => 10 { 11 return val + sum + 1; 12 })); //64 13 14 var list1 = new int[] { 1, 1, 1, 2, 2, 2, 3 }; 15 Console.WriteLine(list1.AsParallel().GroupBy(l => l).Count()); //3
線程同步
lock(Monitor) / SpinLock
lock
lock使用起來很簡單,為Monitor封裝的語法糖:
1 lock (obj) 2 { 3 //同步操作. 4 }
鎖的對象不要為類型Type,因為性能上會損失大:lock(typeof(Class))
Monitor
1 Monitor.Enter(obj); 2 try 3 { 4 //同步操作 5 } 6 finally 7 { 8 Monitor.Exit(obj); 9 }
使用Monitor的主要優點就是可能設置等待的超時值:
1 bool lockTaken = false; 2 Monitor.TryEnter(obj, 1000, ref lockTaken); 3 if (lockTaken) 4 { 5 try 6 { 7 //同步操作 8 } 9 finally 10 { 11 Monitor.Exit(obj); 12 } 13 } 14 else 15 { 16 Console.WriteLine("超時了"); 17 }
SpinLock自旋鎖/細粒度鎖
自旋鎖:就是在等待的過程中會做自旋等待,避免線程頻繁的用戶模式和內核模式切換
msdn中的說明:
自旋鎖可用於葉級鎖定,此時在大小方面或由於垃圾回收壓力,使用Monitor(lock)所隱含的對象分配消耗過多。自旋鎖非常有助於避免阻塞,但是如果預期有大量阻塞,由於旋轉過多,您可能不應該使用自旋鎖。當鎖是細粒度的並且數量巨大(例如鏈接的列表中每個節點一個鎖)時以及鎖保持時間總是非常短時,旋轉可能非常有幫助。通常,在保持一個旋鎖時,應避免任何這些操作:
- 阻塞,
- 調用本身可能阻塞的任何內容,
- 一次保持多個自旋鎖,
- 進行動態調度的調用(介面和虛方法)
- 在某一方不擁有的任何代碼中進行動態調度的調用,或
- 分配記憶體。
簡單封裝:
1 public class SpinLockEx 2 { 3 SpinLock _slock = new SpinLock(); 4 public void Lock(Action action) 5 { 6 bool lockTaken = false; 7 try 8 { 9 _slock.Enter(ref lockTaken); 10 action(); 11 } 12 finally 13 { 14 if(lockTaken) _slock.Exit(); 15 } 16 } 17 }
使用:
1 int ival1 = 0, ival2 = 0; 2 List<Task> list = new List<Task>(); 3 var slock = new SpinLockEx(); 4 for (int i = 0; i < 10000; i++) 5 { 6 list.Add(Task.Run(() => 7 { 8 slock.Lock(() => 9 { 10 ival1++; //註意:這裡只是模擬多線程操作共用資源,對於數值操作應該使用Interlocked 11 }); 12 })); 13 list.Add(Task.Run(() => 14 { 15 ival2++; 16 })); 17 } 18 Task.WaitAll(list.ToArray()); 19 Console.WriteLine(ival1); //值計算準確:10000 20 Console.WriteLine(ival2); //值計算可能會不准確,因為沒有做多線程安全
Mutex
Mutex互斥鎖(互斥對象)的使用作用和Monitor(lock)差不多,但是Mutex是內核對象,可以跨進程共用的,不過性能方面Monitor比較高,因為Mutex控制需要從用戶模式到內核模式,而Monitor是用戶模式下控制的。
1 bool isNew; 2 //參數一:主調線程是否初始擁有互斥對象 3 //參數二:定義互斥對象的名稱(命名互斥對象跨進程共用) 4 //參數三:該命名的互斥對象是否為新創建的 5 var m = new Mutex(false, "Tom123", out isNew); 6 if (m.WaitOne()) //等待互斥對象擁有權(一個線程擁有了,另一個線程等待擁有權,直到擁有的線程調用ReleaseMutex釋放) 7 { 8 try 9 { 10 //同步操作 11 Thread.Sleep(3000); 12 Console.WriteLine("do something"); 13 } 14 finally 15 { 16 m.ReleaseMutex(); //釋放擁有權 17 } 18 } 19 else 20 { 21 //等待失敗,如果WaitOne的時候有指定超時值,否則會一直等待 22 } 23 24 bool isNew; 25 //因為命名的互斥對象是跨進程的,因此通過第三個參數判斷互斥對象是否已經存在, 26 //可做一些檢測程式是否已經運行的操作 27 m = new Mutex(false, "Tom123", out isNew); 28 if (!isNew) 29 { 30 Console.WriteLine("該程式已經運行!"); 31 } 32 m.Dispose();//記住需要釋放資源
Event
事件對象也是內核對象,事件對象分為 人工重置 和 自動重置:
AutoResetEvent(自動重置)
1 AutoResetEvent e = new AutoResetEvent(true); //參數為是否初始化為有信號狀態 2 if (e.WaitOne()) //