前言 拋開死鎖不談,只聊性能問題,儘管鎖總能粗暴的滿足同步需求,但一旦存在競爭關係,意味著一定會有線程被阻塞,競爭越激烈,被阻塞的線程越多,上下文切換次數越多,調度成本越大,顯然在高併發的場景下會損害性能。在高併發高性能且要求線程安全的述求下,無鎖構造(非阻塞構造)閃亮登場。 參考文檔: C# - ...
前言
拋開死鎖不談,只聊性能問題,儘管鎖總能粗暴的滿足同步需求,但一旦存在競爭關係,意味著一定會有線程被阻塞,競爭越激烈,被阻塞的線程越多,上下文切換次數越多,調度成本越大,顯然在高併發的場景下會損害性能。在高併發高性能且要求線程安全的述求下,無鎖構造(非阻塞構造)閃亮登場。
參考文檔:
一、非阻塞同步
重排序與緩存
我們觀察下麵這個例子:
public class Foo
{
private int _answer;
private bool _complete;
void A() //A 1
{
_answer = 10;
_complete = true;
}
void B() //B 2
{
if (_complete) Console.WriteLine(_answer);
}
}
如果方法A
和B
在不同的線程上併發運行,B
可能會列印 “ 0 “ 嗎?答案是會的,原因如下:
- 編譯器、CLR 或 CPU 可能會對代碼/指令進行重排序(reorder)以提高效率。
- 編譯器、CLR 或 CPU 可能會進行緩存優化,導致其它線程不能馬上看到變數的新值。
請務必重視它們,它們將是幽靈般的存在
int x = 0, y = 0, a = 0, b = 0;
var task1 = Task.Run(() => // A 1
{
a = 1; // 1
x = b; // 2
});
var task2 = Task.Run(() => // B 2
{
b = 2; // 3
y = a; // 4
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);
直覺和經驗告訴我們,程式至頂向下執行:代碼1一定發生在代碼2之前,代碼3一定發生在代碼4之前,然鵝
在一個獨立的線程中,每一個語句的執行順序是可以被保證的,但在不使用lock,waithandle這樣的顯式同步操作時,我們就沒法保證事件在不同的線程中看到的執行順序是一致的了。儘管線程A中一定需要觀察到a=1執行成功之後才會去執行x=b,但它沒法確保自己觀察得到線程B中對b的寫入,所以A還可能會列印出y的一個舊版的值。這就叫指令重排序。
x:0 y:1 #1-2-3-4
x:2 y:0 #3-4-1-2
x:2 y:1 #1-3-2-4
可實際運行時還是有些讓我們驚訝的情況:
x:0 y:0 #??
這就是緩存問題,如果兩個線程在不同的CPU上執行,每一個核心有自己的緩存,這樣一個線程的寫入對於其它線程,在主存同步之前就是不可見的了。
C#編譯器和CLR運行時會非常小心的保證上述優化不會破壞普通的單線程代碼,和正確使用鎖的多線程代碼。但有時,你仍然需要通過顯示的創建記憶體屏障(memory barrier,也稱作記憶體柵欄 (memory fence))來對抗這些優化,限制指令重排序和讀寫緩存產生的影響。
記憶體屏障
處理器支持哪種記憶體重排序(LoadLoad重排序、LoadStore重排序、StoreStore重排序、StoreLoad重排序),就會提供相對應能夠禁止重排序的指令,而這些指令就被稱之為記憶體屏障(LoadLoad屏障、LoadStore屏障、StoreStore屏障、StoreLoad屏障)
屏障名稱 | 示例 | 具體作用 |
---|---|---|
StoreLoad | Store1;Store2;Store3;StoreLoad;Load1;Load2;Load3 | 禁止StoreLoad重排序,確保屏障之前任何一個寫(如Store2)的結果都會在屏障後任意一個讀操作(如Load1)載入之前被寫入 |
StoreStore | Store1;Store2;Store3;StoreStore;Store4;Store5;Store6 | 禁止StoreStore重排序,確保屏障之前任何一個寫(如Store1)的結果都會在屏障後任意一個寫操作(如Store4)之前被寫入 |
LoadLoad | Load1;Load2;Load3;LoadLoad;Load4;Load5;Load6 | 禁止LoadLoad重排序,確保屏障之前任何一個讀(如Load1)的數據都會在屏障後任意一個讀操作(如Load4)之前被載入 |
LoadStore | Load1;Load2;Load3;LoadStore;Store1;Store2;Store3 | 禁止LoadStore重排序,確保屏障之前任何一個讀(如Load1)的數據都會在屏障後任意一個寫操作(如Store1)的結果被寫入高速緩存(或主記憶體)前被載入 |
讀屏障告訴處理器在執行任何的載入前,執行所有已經在失效隊列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令對之後(本核心和其他核心)的指令都是可見的。
Store Memory Barrier:寫屏障,等同於前文的StoreStore Barriers 將store buffer都寫入緩存。
寫屏障告訴處理器在執行這之後的指令之前,執行所有已經在存儲緩存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是對之後的指令可見。
最簡單的記憶體屏障是完全記憶體屏障(full memory barrier,或全柵欄(full fence)),它可以阻止所有跨越柵欄的指令進行重排並提交修改和刷新緩存
。記憶體屏障之前的所有寫操作都要寫入記憶體,並將記憶體中的新值刷到緩存,使得其它CPU核心能夠讀取到最新值,完全保證了數據的強一致性,進而解決CPU緩存帶來的可見性問題。
我們簡單修改一下前面的案例
void A()
{
_answer = 10;
Thread.MemoryBarrier(); // 1
_complete = true;
Thread.MemoryBarrier(); // 3
}
void B()
{
Thread.MemoryBarrier(); // 2
if (_complete)
{
_testOutputHelper.WriteLine(_answer.ToString());
}
}
屏障1,3使得這個例子不可能列印出0,屏障2保證如果B在A之後執行,_complete一定讀到的是true
記憶體屏障離我們並不遙遠,以下方式都會隱式的使用全柵欄:
-
lock語法糖或
Monitor.Enter
/Monitor.Exit
-
Interlocked
類中的所有方法 -
使用線程池的非同步回調,包括非同步委托,APM回調,以及任務延續(task continuations)
-
信號構造的等待/複位
-
任何依賴信號同步的情況,比如啟動或等待Task,因此下麵的代碼也是線程安全的
int x = 0; Task t = Task.Factory.StartNew (() => x++); t.Wait(); Console.WriteLine (x); // 1
volatile
另一個(更高級的)解決這個問題的方法是對_complete
欄位使用volatile
關鍵字。
volatile bool _complete;
volatile
關鍵字通知編譯器在每個讀這個欄位的地方使用一個讀柵欄(acquire-fence),並且在每個寫這個欄位的地方使用一個寫柵欄(release-fence)。
這種“半柵欄(half-fences)”比全柵欄更快,因為它給了運行時和硬體更大的優化空間。
讀柵欄:也就是讀屏障(Store Memory Barrier),等同於前文的LoadLoad Barriers 將Invalidate的 都執行完成。告訴處理器在執行任何的載入前,執行所有已經在失效隊列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令對之後(本核心和其他核心)的指令都是可見的。
寫柵欄:也就是寫屏障(Store Memory Barrier),等同於前文的StoreStore Barriers 將store buffer都寫入主存。
告訴處理器在執行這之後的指令之前,執行所有已經在存儲緩存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是對之後的指令可見。
巧的是,Intel 的 X86 和 X64 處理器總是在讀時使用讀柵欄,寫時使用寫柵欄,無論是否使用
volatile
關鍵字。所以在使用這些處理器的情況下,這個關鍵字對硬體來說是無效的。然而,volatile
關鍵字對編譯器和 CLR 進行的優化是有作用的,以及在 64 位 AMD 和 Itanium 處理器上也是有作用的。這意味著不能因為你的客戶端運行在特定類型的 CPU 上而放鬆警惕。
註意:使用volatile
不能阻止寫-讀被交換
第一條指令 | 第二條指令 | 是否會被交換 |
---|---|---|
讀 | 讀 | 不會 |
讀 | 寫 | 不會 |
寫 | 寫 | 不會(CLR 確保寫-寫操作永遠不會被交換,就算是沒有volatile 關鍵字) |
寫 | 讀 | 會! |
在下麵案例中仍然有可能會列印00的情況(對a的讀取可能發生在寫入前--重排序)
int a = 0, b = 0;
int x = 0, y = 0;
var task1 = Task.Run(() =>
{
Thread.VolatileWrite(ref a, 1);
x = Thread.VolatileRead(ref b);
});
var task2 = Task.Run(() =>
{
Thread.VolatileWrite(ref b, 2);
y = Thread.VolatileRead(ref a);
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);
volatile
關鍵字不能應用於數組元素,不能用在捕獲的局部變數:這些情況下你必須使用VolatileRead
和VolatileWrite
方法
從上面的例子我們可以看出,寫-讀操作可能被重新排序,官方的解釋是:
在多處理器系統上,易失性讀取操作不保證獲取由任何處理器寫入該記憶體位置的最新值。 同樣,易失性寫入操作不保證寫入的值會立即對其他處理器可見。
(我的理解是:
volatile
關鍵字只能解決重排序問題,解決不了多處理器的緩存一致性問題)
註意double
和 long
無法標記為 volatile
,因為對這些類型的欄位的讀取和寫入不能保證是原子的。 若要保護對這些類型欄位的多線程訪問,請使用 Interlocked 類成員或使用 lock
語句保護訪問許可權。
Interlocked
位於System.Threading
,為多個線程共用的變數提供原子操作,這也是DOTNET為數不多的線程安全類型之一。
Interlocked
通過將原子性的需求傳達給操作系統和CLR來進行實現其功能,此類的成員不會引發異常。
可以防止 1.線程上下文切換,2.線程更新可由其他線程訪問的變數時,或者當兩個線程同時在不同的處理器上執行時 可能會出現的錯誤。
場景:
int i = 0;
i ++;
在大多數電腦上,自增並不是原子操作,需要以下步驟:
- 將變數
i
的值載入到寄存器中。 - 計算
i + 1
。 - 將上面的計算結果存儲在變數
i
中。
假設A線程執行完1-2時被搶占,B線程執行1-2-3,當A線程恢復時繼續執行3,此時B線程的值就被覆蓋掉了。
使用Increment
即可解決,123會被打包成一個操作,以原子的方式實現自增
CAS
CAS 操作包含三個操作數 —— 記憶體位置(V)、預期原值(A)和新值(B)。如果記憶體位置的值與預期原值相匹配,那麼處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在 CAS 指令之前返回該位置的值。(在 CAS 的一些特殊情況下將僅返回 CAS 是否成功,而不提取當前值。)CAS 有效地說明瞭“我認為位置 V 應該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置的值即可。”
Interlocked.CompareExchange,實現了CAS:比較兩個值是否相等,如果相等,則替換第一個值,否則什麼都不做,最終返回這個位置的原始值。
Interlocked.CompareExchange(ref _num, 1000, 500);
CAS在保證原子性讀寫的同時,沒有加鎖,保障了程式併發度,但也存在缺陷:
- ABA問題
- 只能保證一個地址的讀寫原子性
- 自旋CAS時間過長,容易給CPU帶來大開銷
二、延遲初始化
面試時候經常問:單例模式中的懶漢模式線程安全問題
場景:某個欄位構造開銷非常大,使得在初始化A
時需要承擔初始化Expensive
的開銷,即使Expensive欄位不會被用到。
public class A
{
public readonly Expensive Expensive = new Expensive();
// ..
}
public class Expensive
{
// 構造開銷非常昂貴
}
自然會想到懶漢模式:按需載入
public class B
{
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
if (_expensive == null) _expensive = new Expensive();
return _expensive;
}
}
新的問題產生:GetExpensiveInstance
是線程安全的嗎?我們可以通過加鎖解決
public class C
{
private readonly object _locker = new object();
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
lock (_locker)
{
if (_expensive == null) _expensive = new Expensive();
return _expensive;
}
}
}
現在面試官繼續問:還有性能更好的版本嗎?..
Lazy
net standard1.0 提供System.Lazy<T>
來幫助你以線程安全且高效的方式(DCL)解決延遲初始化問題,只需
public class D
{
private Lazy<Expensive> _expensive = new Lazy<Expensive>(() => new Expensive(), true);
public Expensive GetExpensiveInstance() => _expensive.Value;
}
第一個參數是一個委托,告知如何構建,第二個參數是boolean類型,傳false
實現的就是上面提到的plain B
非線程安全遲初始化
雙檢鎖 double checked locking會進行一次額外的易失讀(volatile read),在對象已經完成初始化時,能夠避免獲取鎖產生的開銷。
public class E
{
private readonly object _locker = new object();
private volatile Expensive _expensive;
public Expensive GetExpensiveInstance()
{
// 額外的易失讀(volatile read)
if (_expensive == null)
{
lock (_locker)
{
if (_expensive == null) _expensive = new Expensive();
}
}
return _expensive;
}
}
LazyInitializer
LazyInitializer
是一個靜態類,提供EnsureInitialized
方法,第一個參數是需要構造的變數地址,第二個參數是一個委托,告知如何構造
public class F
{
private Expensive _expensive;
public Expensive GetExpensiveInstance()
{
LazyInitializer.EnsureInitialized(ref _expensive,
() => new Expensive());
return _expensive;
}
}
它使用競爭初始化模式的實現,比雙檢鎖更快(在多核心情況下),因為它的實現完全不使用鎖。這是一個很少需要用到的極端優化,並且會帶來以下代價:
- 當參與初始化的線程數大於核心數時,它會更慢。
- 可能會因為進行了多餘的初始化而浪費 CPU 資源。
- 初始化邏輯必須是線程安全的(例如,
Expensive
的構造器對靜態欄位進行寫,就不是線程安全的)。 - 如果初始化的對象是需要進行銷毀的,多餘的對象需要額外的邏輯才能被銷毀。
競爭初始化(race-to-initialize)模式,通過易失性和CAS,實現無鎖構造
public class G
{
private volatile Expensive _expensive;
public Expensive Expensive
{
get
{
if (_expensive == null)
{
var instance = new Expensive();
Interlocked.CompareExchange (ref _expensive, instance, null);
}
return _expensive;
}
}
}
三、線程局部存儲
我們花費了大量篇幅來講併發訪問公共數據問題,前文提到的鎖構造,信號構造,無鎖構造本質上都是使用同步構造,使得多線程在訪問公共數據時能安全的進行,然而有時我們會希望數據線上程間是隔離的,局部變數就能實現這個目的,但他們的生命周期總是那麼短暫(隨代碼塊而釋放),我們期待更大作用域的隔離數據,線程局部變數(thread-local storage,TLS)就可以實現這個目的。
ThreadStatic
被ThreadStatic標記的static欄位不會線上程間共用,每個執行線程都有一個單獨的欄位實例
Note:
- 被標記的必須是static欄位,不能在實例欄位上使用(添加了也無效)
- 請不要給被標記的欄位指定初始值,因為這種初始化只會在類被構造時執行一次,影響一個線程,因此他依賴零值
如果你需要使用實例欄位,或者非零值,請使用ThreadLocal<T>
public class ThreadStatic測試
{
private readonly ITestOutputHelper _testOutputHelper;
[ThreadStatic] private static int _num;
public ThreadStatic測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
void Work()
{
for (int i = 0; i < 100000; i++)
{
_num++;
_testOutputHelper.WriteLine(_num.ToString());
}
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(_num.ToString());
}
}
輸出:
100000
100000
0
LocalDataStoreSlot
封裝記憶體槽以存儲本地數據。 此類不能被繼承。.NET Framework 1.1加入,但在standard2.0+才有。
public sealed class LocalDataStoreSlot
.NET Framework 提供了兩種機制,用於使用線程本地存儲 (TLS) :LocalDataStoreSlot
和ThreadStaticAttribute
LocalDataStoreSlot
比ThreadStaticAttribute
更慢,更尷尬。此外,數據存儲為類型 Object
,因此必須先將其強制轉換為正確的類型,然後再使用它。
有關使用 TLS 的詳細信息,請參閱 線程本地存儲。
同樣,.NET Framework 提供了兩種使用上下文本地存儲的機制:LocalDataStoreSlot
和ContextStaticAttribute
。 上下文相對靜態欄位是用屬性標記的 ContextStaticAttribute 靜態欄位。 請參考註解
// 同一個 LocalDataStoreSlot 對象可以跨線程使用。
LocalDataStoreSlot _slot = Thread.AllocateNamedDataSlot("mySlot");
void Work()
{
for (int i = 0; i < 100000; i++)
{
int num = (int)(Thread.GetData(_slot)??0);
Thread.SetData(_slot, num + 1);
}
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
輸出效果和ThreadStaticAttribute
一樣:
100000
100000
0
使用Thread.FreeNamedDataSlot("mySlot");
可以釋放所有線程上的指定槽,但是只有在所有對該槽的引用都出了其作用域,並且被垃圾回收後才會真正釋放。這確保了只要保持對槽的引用,就能繼續使用槽。
你也可以通過Thread.AllocateDataSlot()
來創建一個無名槽位,與命名槽的區別是無名槽需要自行控製作用域
當然我們也可以對上面複雜的᠍᠍᠍᠍᠍Thread.GetData
,Thread.SetData
進行封裝
LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
int Num
{
get
{
object data = Thread.GetData(_secSlot);
return data == null ? 0 : (int) data; // null 相當於未初始化。
}
set { Thread.SetData (_secSlot, value); }
}
ThreadLocal
ThreadLocal<T>
是 Framework 4.0 加入的,涵蓋在netstandard1.0。它提供了可用於靜態欄位和實例欄位的線程局部存儲,並且允許設置預設值。
public class ThreadLocal測試
{
ThreadLocal<int> _num = new ThreadLocal<int> (() => 3);
private readonly ITestOutputHelper _testOutputHelper;
public ThreadLocal測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
void Work()
{
for (int i = 0; i < 100000; i++)
{
_num.Value++;
}
_testOutputHelper.WriteLine(_num.ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(_num.ToString());
}
}
輸出
100003
100003
3
下麵這個測試非常有意思
[Fact]
void Show()
{
var threadName = new ThreadLocal<string>(() => "Thread" + Thread.CurrentThread.ManagedThreadId);
Parallel.For(0, 13, x =>
{
bool repeat = threadName.IsValueCreated;
_testOutputHelper.WriteLine($"ThreadName = {threadName.Value} {(repeat ? "(repeat)" : "")}");
});
threadName.Dispose(); // 釋放資源
}
你會發現當Parallel.For第二個參數超過你的邏輯內核後,repeat出現了!
ThreadName = Thread5
ThreadName = Thread8
ThreadName = Thread31
ThreadName = Thread29
ThreadName = Thread31 (repeat)
ThreadName = Thread30
ThreadName = Thread18
ThreadName = Thread12
ThreadName = Thread32
ThreadName = Thread28
ThreadName = Thread33
ThreadName = Thread35
ThreadName = Thread34
Random
類不是線程安全的,所以我們要不然在使用Random
時加鎖(這樣限制了併發),如今我們有了ThreadLocal:
var localRandom = new ThreadLocal<Random>(() => new Random());
很輕易的就解決了線程安全問題,但是上面的版本使用的Random
的無參構造方法,會依賴系統時間作為生成隨機數的種子,在大概 10ms 時間內創建的兩個Random
對象可能會使用相同的種子,下邊是解決這個問題的一個辦法:
var localRandom = new ThreadLocal<Random>(() => new Random (Guid.NewGuid().GetHashCode()) );
特別註意,不要以為GUID全局唯一,GUID的HashCode也全局唯一,上面的隨機數仍然不是真隨機
四、Monitor之信號構造
信號構造本質:一個線程阻塞直到收到另一個線程發來的通知。
當多線程Wait
同一對象時,就形成了一個“等待隊列(waiting queue)”,和用於等待獲得鎖的“就緒隊列(ready queue)”不同,每次調用Pulse
時會釋放隊頭線程,它會進入就緒隊列,然後重新獲取鎖。可以把它想象成一個自動停車場,首先你在收費站(等待隊列)排隊驗票,然後在柵欄前(就緒隊列)排隊等待放行。
這個隊列結構天然有序,但是,對於Wait/Pulse
應用通常不重要,在這種場景下把它想象成一個等待線程的“池(pool)”更好理解,每次調用Pulse
都會從池中釋放一個等待線程。
PulseAll
釋放整個等待隊列或者說等待池。收到Pulse
的線程不會完全同時開始執行,而是有序的執行,因為每個Wait
語句都要試圖重新獲取同一把鎖。他們的效果就是,PulseAll
將線程從等待隊列移到就緒隊列中,讓它們可以繼續有序執行。
使用Wait/Pulse
需要註意:
Wait / Pulse
不能lock塊之外使用,否則會拋異常。Pulse
最多釋放一個線程,而PulseAll
釋放所有線程。Wait
會立即釋放當前持有的鎖,然後進入阻塞,等待脈衝- 收到脈衝會立即嘗試重新獲取鎖,如果在指定時間內重新獲取,則返回true,如果在超過指定時間獲取,則返回false,如果沒有獲取鎖,則一直阻塞不會返回
Wait和Pulse
-
定義一個欄位,作為同步對象
private readonly object _locker = new object();
-
定義一個或多個欄位,作為阻塞條件
private bool _ok;
-
當你希望阻塞的時候
Monitor.Wait
在等待脈衝時,同步對象上的鎖會被釋放,並且進入阻塞狀態,直到收到 _locker上的脈衝,收到脈衝後重新獲取 _locker,如果此時 _locker 已經被別的線程占有,則繼續阻塞,直至_獲取 _lockerlock (_locker) { while (!_ok) { Monitor.Wait (_locker); } }
-
當你希望改變阻塞條件時
lock (_locker) { _ok = true; Monitor.Pulse(_locker); // Monitor.PulseAll(_locker); }
Wait
和Pulse
幾乎是萬能的,通過一個bool標識我們就能實現AutoResetEvent/ManualResetEvent的功能,同理使用一個整形欄位,就可以實現CountdownEvent/Semaphore
性能方面,調用Pulse
花費大概約是在等待句柄上調用Set
三分之一的時間。但是,使用Wait
和Pulse
進行信號同步,對比事件等待句柄有以下缺點:
-
Wait / Pulse
不能跨越應用程式域和進程使用。 -
必須通過鎖保護所有信號同步邏輯涉及的變數。
等待超時
調用Wait
方法時,你可以設定一個超時時間,可以是毫秒或TimeSpan
的形式。如果因為超時而放棄了等待,那麼Wait
方法就會返回false
。
public static bool Wait(object obj, TimeSpan timeout)
如果在超時到達時仍然沒有獲得一個脈衝,CLR會主動給它發送一個虛擬的脈衝(virtual pulse),使其能夠重新獲得鎖,然後繼續執行,就像收到一個真實脈衝一樣。
下麵這個例子非常有用,它可以定期的檢查阻塞條件。即使其它線程無法按照預期發送脈衝,例如程式之後被其他人修改,但沒能正確使用Pulse
,這樣也可以在一定程度上免疫 bug。因此在複雜的同步設計中可以給所有Wait
指定超時時間。
lock (_locker)
while (/* <blocking-condition> */)
Monitor.Wait (_locker, /* <timeout> */);
Monitor.Wait
的boolean類型返回值其實還可以這麼理解:其返回值意味著是否獲得了一個“真實的脈衝“。如果”虛擬的脈衝“並不是期待的行為,可以記錄日誌或拋出異常。
Wait
等待一個變數上的脈衝,Pulse
對一個變數發送脈衝。脈衝也是一種信號形式,相對於事件等待句柄那種鎖存(latching)信號,脈衝顧名思義是一種非鎖存或者說易失的信號
雙向信號與競爭狀態
Monitor.Pulse
是一種單向通信機制:發送脈衝的線程不關心發出的脈衝被誰收到了,他沒有返回值,不會阻塞,內部也沒有確認機制。
當一個線程發起一次脈衝:
- 如果等待隊列中沒有任何線程,那麼這次發起的脈衝不會有任何效果。
- 如果等待隊列中有線程,線程發送完脈衝並釋放鎖後,並不能保證接到脈衝信號的等待線程能立即開始工作。
然後我們有一些場景依賴等待線程能夠在收到脈衝後及時的響應,此時,雙向信號出現了,這是一種自定義的確認機制。
在上文的信號構造基礎上改造一個競爭狀態的案例:
public class 競爭狀態測試
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly object _locker = new object();
private bool _ok;
public 競爭狀態測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(() => // Worker
{
for (int i = 0; i < 5; i++)
lock (_locker)
{
while (!_ok) Monitor.Wait(_locker);
_ok = false;
_testOutputHelper.WriteLine("Wassup?");
}
}).Start();
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
_ok = true;
Monitor.Pulse(_locker);
}
}
}
}
我們期待的結果:
Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
實際上這個這個程式可能一次”Wassup?“都不會輸出:主線程可能在工作線程啟動之前完成,這五次Pulse
啥事都沒乾
還記得我們講事件等待句柄時,使用AutoResetEvent
來模擬的雙向信號嗎?現在使用Monitor來實現一個擴展性更好的版本
public class 雙向信號測試
{
private readonly ITestOutputHelper _testOutputHelper;
private readonly object _locker = new();
private bool _entry; // 我是否可以工作了
private bool _ready; // 我是否可以繼續投遞了
public 雙向信號測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(() =>
{
Thread.Sleep(100);
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
_ready = true;
Monitor.PulseAll(_locker);
while (!_entry) Monitor.Wait(_locker);
_entry = false;
_testOutputHelper.WriteLine("Wassup?");
}
}
}).Start();
for (int i = 0; i < 5; i++)
{
lock (_locker)
{
while (!_ready) Monitor.Wait(_locker);
_ready = false;
_entry = true;
Monitor.PulseAll(_locker);
}
}
}
}
我們仍然使用_ready
來作為上游脈衝線程的自旋條件,使用_entry
作為下游等待線程的自旋條件。由於我們的邏輯都在lock語句中,即使之後引入了第三個線程,我們的邏輯仍然不會出問題,_ready
和_entry
的讀寫總是原子的。
升級生產消費隊列
-
這次,我們將允許多個消費者,各自擁有獨立的消費線程。使用一個數組來存放這些線程,並且他們接收的不再是string,而是更加靈活的委托:
private Thread[] _workers; private Queue<Action> _queue = new Queue<Action>();
-
和上次一樣,我們傳遞null來告知消費者線程退出:
foreach (var worker in _workers) { AddTask(null); }
-
在告知消費線程退出後
Join
這些線程,等待未完成的任務被消費:foreach (var worker in _workers) { worker.Join(); }
-
每個工作線程會執行一個名為
Consume
的方法。我們在構造隊列時迴圈創建和啟動這些線程:_workers = new Thread[workerCount]; for (int i = 0; i < workerCount; i++) { _workers[i] = new Thread(Consume); _workers[i].Start(); }
-
消費
Comsume
方法,一個工作線程從隊列中取出並執行一個項目。我們希望工作線程沒什麼事情做的時候,或者說當隊列中沒有任何項目時,它們應該被阻塞。因此,我們的阻塞條件是_queue.Count == 0
:private void Consume() { while (true) { Action task; lock (_locker) { while (_queue.Count == 0) { Monitor.Wait(_locker); // 隊列里沒任務,釋放鎖,進入等待 } // 獲取新任務,重新持有鎖 task = _queue.Dequeue(); } if (task == null) return; // 空任務代表退出 task(); // 執行任務 } }
-
添加一個任務。出於效率考慮,加入一個任務時,我們調用
Pulse
而不是PulseAll
。這是因為每個項目只需要喚醒(至多)一個消費者。如果你只有一個冰激凌,你不會把一個班 30 個正在睡覺的孩子都叫起來排隊獲取它。public void AddTask(Action task) { lock (_locker) { _queue.Enqueue(task); Monitor.Pulse(_locker); } }
模擬等待句柄
在雙向信號中,你可能註意到了一個模式:_flag
在當前線程被作為自旋阻塞條件,在另一線程中被設置為true
,跳出自旋
lock(_locker)
{
while (!_flag) Monitor.Wait(_locker);
_flag = false;
}
ManualResetEvent
事實上它的工作原理就是模仿AutoResetEvent
。如果去掉_flag=false
,就得到了ManualResetEvent
的基礎版本。
private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
lock (_locker)
{
while (!_signal) Monitor.Wait(_locker);
}
}
void Set()
{
lock (_locker)
{
_signal = true;
Monitor.PulseAll(_locker);
}
}
void Reset()
{
lock (_locker) _signal = false;
}
使用PulseAll
,是因為可能存在多個被阻塞的等待線程。而EventWaitHandle.WaitOne()
的通行條件就是:門
是開著的,ManualResetEvent
被放行通過後不會自己關門,只能通過Reset
將門關上,再次期間其它所有阻塞線程都能通行。
AutoResetEvent
實現AutoResetEvent
非常簡單,只需要將WaitOne
方法改為:
lock (_locker)
{
while (!_signal) Monitor.Wait(_locker);
_signal = false; // 添加一條,自己關門
}
然後將Set
方法改為:
lock (_locker)
{
_signal = true;
Monitor.Pulse(_locker); // PulseAll替換成Pulse:
}
Semaphore
把_signal
替換為一個整型欄位可以得到Semaphore
的基礎版本
public class 模擬信號量
{
private readonly object _locker = new object();
private int _count, _initialCount;
public 模擬信號量(int initialCount)
{
_initialCount = initialCount;
}
void WaitOne() // +1
{
lock (_locker)
{
_count++;
while (_count >= _initialCount)
{
Monitor.Wait(_locker);
}
}
}
void Release() // -1
{
lock (_locker)
{
_count --;
Monitor.Pulse(_locker);
}
}
}
模擬CountdownEvent
是不是非常類似信號量?
public class 模擬CountdownEvent
{
private object _locker = new object();
private int _initialCount;
public 模擬CountdownEvent(int initialCount)
{
_initialCount = initialCount;
}
public void Signal() // +1
{
AddCount(-1);
}
public void AddCount(int amount) // +amount
{
lock (_locker)
{
_initialCount -= amount;
if (_initialCount <= 0) Monitor.PulseAll(_locker);
}
}
public void Wait()
{
lock (_locker)
{
while (_initialCount > 0)
Monitor.Wait(_locker);
}
}
}
線程會合
CountdownEvent
利用我們剛剛實現的模擬CountdownEvent
,來實現兩個線程的會和,和同步基礎中提到的WaitHandle.SignalAndWait
一樣。
並且我們也可以通過initialCount
將會和的線程擴展到更多個,顯而易見的強大。
public class 線程會和測試
{
private readonly ITestOutputHelper _testOutputHelper;
private 模擬CountdownEvent _countdown = new 模擬CountdownEvent(2);
public 線程會和測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
public void Show()
{
// 每個線程都睡眠一段隨機時間
Random r = new Random();
new Thread(Mate).Start(r.Next(10000));
Thread.Sleep(r.Next(10000));
_countdown.Signal();
_countdown.Wait();
_testOutputHelper.WriteLine("Mate! ");
}
void Mate(object delay)
{
Thread.Sleep((int)delay);
_countdown.Signal(); //+1
_countdown.Wait();
_testOutputHelper.WriteLine("Mate! ");
}
}
上面例子,每個線程隨機休眠一段時間,然後等待對方,他們幾乎在同時列印”Mate!“,這被稱為線程執行屏障(thread execution barrier)
當你想讓多個線程執行一個系列任務,希望它們步調一致時,可以用到線程執行屏障。然而,我們現在的解決方案有一定限制:我們不能重用同一個Countdown
對象來第二次會合線程,至少在沒有額外信號構造的情況下不能。為解決這個問題,Framework 4.0 提供了一個新的類Barrier
。
Barrier
Framework 4.0 加入的一個信號構造。它實現了線程執行屏障(thread execution barrier),允許多個線程在一個時間點會合。這個類非常快速和高效,它是建立在Wait / Pulse
和自旋鎖基礎上的。
-
實例化它,指定有多少個線程參與會合(可以調用
AddParticipants / RemoveParticipants
來進行更改)。public Barrier(int participantCount)
-
當希望會合時,調用
SignalAndWait
。表示參與者已到達障礙,並等待所有其他參與者到達障礙public void SignalAndWait()
他還實現了協作取消模式
public void SignalAndWait(CancellationToken cancellationToken)
並提供了超時時間的重載,返回一個
bool
類型,true標識在規定的時間,其他參與者到達障礙,false標識沒有全部到達public bool SignalAndWait(TimeSpan timeout)
實例化Barrier
,參數為 3 ,意思是調用SignalAndWait
會被阻塞直到該方法被調用 3 次。但與CountdownEvent
不同,它會自動複位:再調用SignalAndWait
仍會阻塞直到被調用 3 次。這允許你保持多個線程“步調一致”,讓它們執行一個系列任務。
下邊的例子中,三個線程步調一致地列印數字 0 到 4:
private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier測試(ITestOutputHelper testOutputHelper)
{
_testOutputHelper = testOutputHelper;
}
[Fact]
void Show()
{
new Thread(Speak).Start();
new Thread(Speak).Start();
new Thread(Speak).Start();
}
void Speak()
{
for (int i = 0; i < 5; i++)
{
_testOutputHelper.WriteLine(i.ToString());
_barrier.SignalAndWait();
}
}
Barrier
還提供一個非常用有的構造參數,他是一個委托,會在每個會和處執行。不用擔心搶占,因為當它被執行時,所有的參與者都是被阻塞的。
public Barrier(int participantCount, Action<Barrier>? postPhaseAction)
五、拓展
前景回顧:
還記得我們在講同步的時候提到的最小化共用數據和無狀態設計嗎?經過前面的學習,稍加思考,其實引發線程安全的本質是多線程併發下的數據交互問題。如果我們的數據線上程之間沒有交互,或者說我們的數據都是只讀的,那不就天然的線程安全了嗎?
現在你能理解為什麼只讀欄位是天然線程安全的了嗎?
然而有的場景下又需要對公共數據進行讀寫,同步篇中我們通過很簡單的排它鎖來保證線程安全,在這裡,我們不在滿足這種粗暴的粒度(事實上多數時候讀總是多於寫),這時,讀寫鎖出現了。
ReaderWriterLockSlim
ReaderWriterLockSlim
在 Framework 3.5 加入的,被加入了standard 1.0,此類型是線程安全的,用於保護由多個線程讀取的資源。
ReaderWriterLockSlim
出現的目的是為了取締ReaderWriterLock
,他簡化了遞歸規則以及鎖狀態的升級和降級規則。避免了許多潛在的死鎖情況。 另外,他的性能顯著優於ReaderWriterLock
。 建議對所有新開發的項目使用ReaderWriterLockSlim
然而如果與普通的
lock
(Monitor.Enter / Exit
)對比,他還是要慢一倍。
ReaderWriterLockSlim
有三種模式:
-
讀取模式:允許任意多的線程處於讀取模式
-
可升級模式:只允許一個線程處於可升級模式,與讀鎖相容
-
寫入模式:完全互斥,不允許任何模式下的線程獲取任何鎖
ReaderWriterLockSlim
定義瞭如下的方法來獲取和釋放讀 / 寫鎖:
public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();
另外,對應所有EnterXXX
的方法,都有相應的TryXXX
版本,可以接受一個超時參數,與Monitor.TryEnter
類似。
讓我們來看一個案例:
模擬三個讀線程,兩個寫線程,並行執行
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();
讀方法是這樣的
while (true)
{
_rw.EnterReadLock();
foreach (int number in _items)
{
Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
Thread.Sleep(100);
}
_rw.ExitReadLock();
}
寫方法是這樣的
while (true)
{
int number = _rand.Value.Next(100);
_rw.EnterWriteLock();
_items.Add(number);
_rw.ExitWriteLock();
Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
Thread.Sleep(100);
}
隨機數生成方法就是用的TLS講過的
new ThreadLocal<Random>(() => new Random(Guid.NewGuid().GetHashCode()));
需要註意ReaderWriterLockSlim
實現了IDisposable
,用完了請記得釋放
public class ReaderWriterLockSlim : IDisposable
運行結果:
Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...
顯而易見的,併發度變高了
鎖遞歸
ReaderWriterLockSlim
提供一個構造參數LockRecursionPolicy
用於配置鎖遞歸策略
public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
public enum LockRecursionPolicy
{
/// <summary>If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.</summary>
NoRecursion,
/// <summary>A thread can enter a lock recursively. Some classes may restrict this capability.</summary>
SupportsRecursion,
}
預設情況下是使用NoRecursion
策略:不允許遞歸或重入,這與GO的讀寫鎖設計不謀而合,建議使用此預設策略,因為遞歸引入了不必要的複雜性,並使代碼更易於死鎖。
public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)
開啟支持遞歸策略後,以下代碼不會拋出LockRecursionException
異常
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();
遞歸鎖定級別隻能越來越小,級別順序如下:讀鎖,可升級鎖,寫鎖
。下麵代碼會拋出LockRecursionException
異常
void F()
{
var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterWriteLock();
rw.EnterWriteLock();
rw.ExitReadLock();
}
Assert.Throws<LockRecursionException>(F);
可升級鎖例外,把可升級鎖升級為寫鎖是合法的。
var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();
思考一個問題:為什麼只允許一個線程處於可升級模式?
SQL Server | ReaderWriterLockSlim |
---|---|
共用鎖(Share lock) | 讀鎖(Read lock) |
排它鎖(Exclusive lock) | 寫鎖(Write lock) |
更新鎖(Update lock) | 可升級鎖(Upgradeable lock) |
Timer
如果你需要使用規律的時間間隔重覆執行一些方法,這個例子會使得一個線程永遠被占用
while (true)
{
// do something
Thread.Sleep(1000);
}
這時候你會需要Timer
創建計時器時,可以指定在方法首次執行之前等待的時間 dueTime
,以及後續執行之間等待的時間period
。 類 Timer 的解析度與系統時鐘相同。 這意味著,如果period
小於系統時鐘的解析度,委托將以系統時鐘解析度定義的時間間隔執行,在Windows 7 和Windows 8系統上大約為 15 毫秒。
public Timer(TimerCallback callback, object? state, int dueTime, int period)
下麵這個例子首次間隔1s,之後間隔500ms列印tick...
Timer timer = new Timer ((data) =>
{
_testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();
計時器委托是在構造計時器時指定的,不能更改。 該方法不會在創建計時器的線程上執行;而是在線程池(thread pool)執行。
如果計時器間隔
period
小於執行回調所需的時間,或者如果所有線程池線程都在使用,並且回調被多次排隊,則可以在兩個線程池線程上同時執行回調。只要使用 Timer,就必須保留對它的引用。 與任何托管對象一樣,當沒有對其引用時,會受到垃圾回收的約束。 即使 Timer 仍然處於活動狀態也不會阻止它被收集。
不再需要計時器時,請調用 Dispose 釋放計時器持有的資源。請註意,調用 Dispose() 後仍然可能會發生回調,因為計時器將回調排隊供線程池線程執行。可以使用
public bool Dispose(WaitHandle notifyObject)
重載等待所有回調完成。
System.Threading.Timer
是一個普通計時器。 它會回調一個線程池線程(來自工作池)。
System.Timers.Timer
是一個System.ComponentModel.Component
,它包裝System.Threading.Timer
,並提供一些用於在特定線程上調度的附加功能。