問題抽象:當某一資源同一時刻允許一定數量的線程使用的時候,需要有個機制來阻塞多餘的線程,直到資源再次變得可用。線程同步方案:Semaphore、SemaphoreSlim、CountdownEvent方案特性:限量供應;除所有者外,其他人無條件等待;先到先得,沒有先後順序 1、Semaphore類 ...
問題抽象:當某一資源同一時刻允許一定數量的線程使用的時候,需要有個機制來阻塞多餘的線程,直到資源再次變得可用。
線程同步方案:Semaphore、SemaphoreSlim、CountdownEvent
方案特性:限量供應;除所有者外,其他人無條件等待;先到先得,沒有先後順序
1、Semaphore類
用於控制線程的訪問數量,預設的構造函數為initialCount和maximumCount,表示預設設置的信號量個數和最大信號量個數。當你WaitOne的時候,信號量自減,當Release的時候,信號量自增,然而當信號量為0的時候,後續的線程就不能拿到WaitOne了,所以必須等待先前的線程通過Release來釋放。
using System; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Thread t1 = new Thread(Run1); t1.Start(); Thread t2 = new Thread(Run2); t2.Start(); Thread t3 = new Thread(Run3); t3.Start(); Console.ReadKey(); } //初始可以授予2個線程信號,因為第3個要等待前面的Release才能得到信號 static Semaphore sem = new Semaphore(2, 10); static void Run1() { sem.WaitOne(); Console.WriteLine("大家好,我是Run1;" + DateTime.Now.ToString("mm:ss")); //兩秒後 Thread.Sleep(2000); sem.Release(); } static void Run2() { sem.WaitOne(); Console.WriteLine("大家好,我是Run2;" + DateTime.Now.ToString("mm:ss")); //兩秒後 Thread.Sleep(2000); sem.Release(); } static void Run3() { sem.WaitOne(); Console.WriteLine("大家好,我是Run3;" + DateTime.Now.ToString("mm:ss")); //兩秒後 Thread.Sleep(2000); sem.Release(); } } }Program
在以上的方法中Release()方法相當於自增一個信號量,Release(5)自增5個信號量。但是,Release()到構造函數的第二個參數maximumCount的值就不能再自增了。
Semaphore可用於進程級交互。
using System; using System.Diagnostics; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { Thread t1 = new Thread(Run1); t1.Start(); Thread t2 = new Thread(Run2); t2.Start(); Console.Read(); } //初始可以授予2個線程信號,因為第3個要等待前面的Release才能得到信號 static Semaphore sem = new Semaphore(3, 10, "命名Semaphore"); static void Run1() { sem.WaitOne(); Console.WriteLine("進程:" + Process.GetCurrentProcess().Id + " 我是Run1" + DateTime.Now.TimeOfDay); } static void Run2() { sem.WaitOne(); Console.WriteLine("進程:" + Process.GetCurrentProcess().Id + " 我是Run2" + DateTime.Now.TimeOfDay); } } }Program
直接運行兩次bin目錄的exe文件,就能發現最多只能輸出3個。
Semaphore可以限制可同時訪問某一資源或資源池的線程數。 Semaphore類在內部維護一個計數器,當一個線程調用Semaphore對象的Wait系列方法時,此計數器減一,只要計數器還是一個正數,線程就不會阻塞。當計數器減到0時,再調用Semaphore對象Wait系列方法的線程將被阻塞,直到有線程調用Semaphore對象的Release()方法增加計數器值時,才有可能解除阻塞狀態。 示例說明:圖書館都配備有若幹台公用電腦供讀者查詢信息,當某日讀者比較多時,必須排隊等候。UseLibraryComputer實例用多線程模擬了多人使用多台電腦的過程
using System; using System.Threading; namespace ConsoleApp1 { class Program { //圖書館擁有的公用電腦 private const int ComputerNum = 3; private static Computer[] LibraryComputers; //同步信號量 public static Semaphore sp = new Semaphore(ComputerNum, ComputerNum); static void Main(string[] args) { //圖書館擁有ComputerNum臺電腦 LibraryComputers = new Computer[ComputerNum]; for (int i = 0; i < ComputerNum; i++) LibraryComputers[i] = new Computer("Computer" + (i + 1).ToString()); int peopleNum = 0; Random ran = new Random(); Thread user; System.Console.WriteLine("敲任意鍵模擬一批批的人排隊使用{0}台電腦,ESC鍵結束模擬……", ComputerNum); //每次創建若幹個線程,模擬人排隊使用電腦 while (System.Console.ReadKey().Key != ConsoleKey.Escape) { peopleNum = ran.Next(0, 10); System.Console.WriteLine("\n有{0}人在等待使用電腦。", peopleNum); for (int i = 1; i <= peopleNum; i++) { user = new Thread(UseComputer); user.Start("User" + i.ToString()); } } } //線程函數 static void UseComputer(Object UserName) { sp.WaitOne();//等待電腦可用 //查找可用的電腦 Computer cp = null; for (int i = 0; i < ComputerNum; i++) if (LibraryComputers[i].IsOccupied == false) { cp = LibraryComputers[i]; break; } //使用電腦工作 cp.Use(UserName.ToString()); //不再使用電腦,讓出來給其他人使用 sp.Release(); } } class Computer { public readonly string ComputerName = ""; public Computer(string Name) { ComputerName = Name; } //是否被占用 public bool IsOccupied = false; //人在使用電腦 public void Use(String userName) { System.Console.WriteLine("{0}開始使用電腦{1}", userName, ComputerName); IsOccupied = true; Thread.Sleep(new Random().Next(1, 2000)); //隨機休眠,以模擬人使用電腦 System.Console.WriteLine("{0}結束使用電腦{1}", userName, ComputerName); IsOccupied = false; } } }Program
2、SemaphoreSlim類 在.net 4.0之前,framework中有一個重量級的Semaphore,可以跨進程同步,SemaphoreSlim輕量級不行,msdn對它的解釋為:限制可同時訪問某一資源或資源池的線程數。
using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { static SemaphoreSlim slim = new SemaphoreSlim(Environment.ProcessorCount, 12); static void Main(string[] args) { for (int i = 0; i < 12; i++) { Task.Factory.StartNew((obj) => { Run(obj); }, i); } Console.Read(); } static void Run(object obj) { slim.Wait(); Console.WriteLine("當前時間:{0}任務 {1}已經進入。", DateTime.Now, obj); //這裡busy3s中 Thread.Sleep(3000); slim.Release(); } } }Program
同樣,防止死鎖的情況,我們需要知道”超時和取消標記“的解決方案,像SemaphoreSlim這種定死的”線程請求範圍“,其實是降低了擴展性,使用需謹慎,在覺得有必要的時候使用它
註:Semaphore類是SemaphoreSlim類的老版本,該版本使用純粹的內核時間(kernel-time)方式。 SemaphoreSlim類不使用Windows內核信號量,而且也不支持進程間同步。所以在跨程式同步的場景下可以使用Semaphore 3、CountdownEvent類 這種採用信號狀態的同步基元非常適合在動態的fork,join的場景,它採用“信號計數”的方式,就比如這樣,一個麻將桌只能容納4個人打麻將,如果後來的人也想搓一把碰碰運氣,那麼他必須等待直到麻將桌上的人走掉一位。好,這就是簡單的信號計數機制,從技術角度上來說它是定義了最多能夠進入關鍵代碼的線程數。 但是CountdownEvent更牛X之處在於我們可以動態的改變“信號計數”的大小,比如一會兒能夠容納8個線程,一下又4個,一下又10個,這樣做有什麼好處呢?比如一個任務需要載入1w條數據,那麼可能出現這種情況。 例如: 載入User表: 根據user表的數據量,我們需要開5個task。 載入Product表: 產品表數據相對比較多,計算之後需要開8個task。 載入order表: 由於我的網站訂單豐富,計算之後需要開12個task。using System; using System.Threading; using System.Threading.Tasks; namespace ConsoleApp1 { class Program { //預設的容納大小為“硬體線程“數 static CountdownEvent cde = new CountdownEvent(Environment.ProcessorCount); static void LoadUser(object obj) { try { Console.WriteLine("ThreadId={0};當前任務:{1}正在載入User部分數據!", Thread.CurrentThread.ManagedThreadId, obj); } finally { cde.Signal(); } } static void LoadProduct(object obj) { try { Console.WriteLine("ThreadId={0};當前任務:{1}正在載入Product部分數據!", Thread.CurrentThread.ManagedThreadId, obj); } finally { cde.Signal(); } } static void LoadOrder(object obj) { try { Console.WriteLine("ThreadId={0};當前任務:{1}正在載入Order部分數據!", Thread.CurrentThread.ManagedThreadId, obj); } finally { cde.Signal(); } } static void Main(string[] args) { //載入User表需要5個任務 var userTaskCount = 5; //重置信號 cde.Reset(userTaskCount); for (int i = 0; i < userTaskCount; i++) { Task.Factory.StartNew((obj) => { LoadUser(obj); }, i); } //等待所有任務執行完畢 cde.Wait(); Console.WriteLine("\nUser表數據全部載入完畢!\n"); //載入product需要8個任務 var productTaskCount = 8; //重置信號 cde.Reset(productTaskCount); for (int i = 0; i < productTaskCount; i++) { Task.Factory.StartNew((obj) => { LoadProduct(obj); }, i); } cde.Wait(); Console.WriteLine("\nProduct表數據全部載入完畢!\n"); //載入order需要12個任務 var orderTaskCount = 12; //重置信號 cde.Reset(orderTaskCount); for (int i = 0; i < orderTaskCount; i++) { Task.Factory.StartNew((obj) => { LoadOrder(obj); }, i); } cde.Wait(); Console.WriteLine("\nOrder表數據全部載入完畢!\n"); Console.WriteLine("\n(*^__^*) 嘻嘻,恭喜你,數據全部載入完畢\n"); Console.Read(); } } }Program
我們看到有兩個主要方法:Wait和Signal。每調用一次Signal相當於麻將桌上走了一個人,直到所有人都搓過麻將wait才給放行,這裡同樣要註意也就是“超時“問題的存在性,尤其是在並行計算中,輕量級別給我們提供了”取消標記“的機制,這是在重量級別中不存在的
註:如果調用Signal()沒有到達指定的次數,那麼Wait()將一直等待,請確保使用每個線程完成後都要調用Signal方法。