我 們知道並行編程模型兩種:一種是基於消息式的,第二種是基於共用記憶體式的。 前段時間項目中遇到了第二種 使用多線程開發並行程式共用資源的問題 ,今天以實際案例出發對.net里的共用記憶體式的線程同步機製做個總結,由於某些類庫的應用屬於基礎,所以本次不對基本使用做出講解,基本使用 MSDN是最好的教程。 ...
我 們知道並行編程模型兩種:一種是基於消息式的,第二種是基於共用記憶體式的。 前段時間項目中遇到了第二種 使用多線程開發並行程式共用資源的問題 ,今天以實際案例出發對.net里的共用記憶體式的線程同步機製做個總結,由於某些類庫的應用屬於基礎,所以本次不對基本使用做出講解,基本使用 MSDN是最好的教程。
一、volatile關鍵字
基本介紹: 封裝了 Thread.VolatileWrite() 和 Thread.VolatileRead()的實現 ,主要作用是強制刷新高速緩存。
使用場景: 適用於在多核多CPU的機器上 解決變數在記憶體和高速緩存同步不及時的問題。
案例:參考下文 二、原子操作的 案例 或者 System.Collections.Concurrent命名空間下的 ConcurrentQueue ,ConcurrentDictionary 等併發集合的實現方式。
二、原子操作(Interlock)
基本介紹: 原 子操作是 實現Spinlock,Monitor,ReadWriterLock鎖的基礎,其實現原理是在電腦匯流排上標誌一個信號來表示資源已經被占用 如果其他指令進行修改則等待本次操作完成後才能進行,因為原子操作是在硬體上實現的 所以速度非常快,大約在50個時鐘周期。其實原子操作也可以看做一種鎖。
使用場景:性 能要求較高的場合,需要對欄位進行快速的同步或者對變數進行原子形式的跟新操作(例如:int b=0; b=b+1 實際分解為多條彙編指令,在多線程情況下 多條彙編指令並行的執行可能導致錯誤的結果,所以要保證執行 b=b+1 生成的彙編指令是一個原子形式執行 ),例如實現一個並行隊列,非同步隊列等。
案例:一個基於事件觸發機制隊列的實現
001.
/// <summary>
002.
/// 表示一個實時處理隊列
003.
/// </summary>
004.
public
class
ProcessQueue<T>
005.
{
006.
#region [成員]
007.
008.
private
ConcurrentQueue<IEnumerable<T>> queue;
009.
010.
private
Action<IEnumerable<T>> PublishHandler;
011.
012.
//指定處理的線程數
013.
private
int
core = Environment.ProcessorCount;
014.
015.
//正在運行的線程數
016.
private
int
runingCore = 0;
017.
018.
public
event
Action<Exception> OnException;
019.
020.
//隊列是否正在處理數據
021.
private
int
isProcessing=0;
022.
023.
//隊列是否可用
024.
private
bool
enabled =
true
;
025.
026.
#endregion
027.
028.
#region 構造函數
029.
030.
public
ProcessQueue(Action<IEnumerable<T>> handler)
031.
{
032.
033.
queue =
new
ConcurrentQueue<IEnumerable<T>>();
034.
035.
PublishHandler = handler;
036.
this
.OnException += ProcessException.OnProcessException;
037.
}
038.
039.
#endregion
040.
041.
#region [方法]
042.
043.
/// <summary>
044.
/// 入隊
045.
/// </summary>
046.
/// <param name="items">數據集合</param>
047.
public
void
Enqueue(IEnumerable<T> items)
048.
{
049.
if
(items !=
null
)
050.
{
051.
queue.Enqueue(items);
052.
}
053.
054.
//判斷是否隊列有線程正在處理
055.
if
(enabled && Interlocked.CompareExchange(
ref
isProcessing, 1, 0) == 0)
056.
{
057.
if
(!queue.IsEmpty)
058.
{
059.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
060.
}
061.
else
062.
{
063.
Interlocked.Exchange(
ref
isProcessing, 0);
064.
}
065.
}
066.
}
067.
068.
/// <summary>
069.
/// 開啟隊列數據處理
070.
/// </summary>
071.
public
void
Start()
072.
{
073.
Thread process_Thread =
new
Thread(PorcessItem);
074.
process_Thread.IsBackground =
true
;
075.
process_Thread.Start();
076.
}
077.
078.
/// <summary>
079.
/// 迴圈處理數據項
080.
/// </summary>
081.
/// <param name="state"></param>
082.
private
void
ProcessItemLoop(
object
state)
083.
{
084.
//表示一個線程遞歸 當處理完當前數據時 則開起線程處理隊列中下一條數據 遞歸終止條件是隊列為空時
085.
//但是可能會出現 隊列有數據但是沒有線程去處理的情況 所有一個監視線程監視隊列中的數據是否為空,如果為空
086.
//並且沒有線程去處理則開啟遞歸線程
087.
088.
if
(!enabled && queue.IsEmpty)
089.
{
090.
Interlocked.Exchange(
ref
isProcessing, 0);
091.
return
;
092.
}
093.
094.
//處理的線程數 是否小於當前CPU核數
095.
if
(Thread.VolatileRead(
ref
runingCore) <= core * 2*)
096.
{
097.
IEnumerable<T> publishFrame;
098.
//出隊以後交給線程池處理
099.
if
(queue.TryDequeue(
out
publishFrame))
100.
{
101.
Interlocked.Increment(
ref
runingCore);
102.
try
103.
{
104.
PublishHandler(publishFrame);
105.
106.
if
(enabled && !queue.IsEmpty)
107.
{
108.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
109.
}
110.
else
111.
{
112.
Interlocked.Exchange(
ref
isProcessing, 0);
113.
}
114.
115.
}
116.
catch
(Exception ex)
117.
{
118.
OnProcessException(ex);
119.
}
120.
121.
finally
122.
{
123.
Interlocked.Decrement(
ref
runingCore);
124.
}
125.
}
126.
}
127.
128.
}
129.
130.
/// <summary>
131.
///定時處理幀 線程調用函數
132.
///主要是監視入隊的時候線程 沒有來的及處理的情況
133.
/// </summary>
134.
private
void
PorcessItem(
object
state)
135.
{
136.
int
sleepCount=0;
137.
int
sleepTime = 1000;
138.
while
(enabled)
139.
{
140.
//如果隊列為空則根據迴圈的次數確定睡眠的時間
141.
if
(queue.IsEmpty)
142.
{
143.
if
(sleepCount == 0)
144.
{
145.
sleepTime = 1000;
146.
}
147.
else
if
(sleepCount == 3)
148.
{
149.
sleepTime = 1000 * 3;
150.
}
151.
else
if
(sleepCount == 5)
152.
{
153.
sleepTime = 1000 * 5;
154.
}
155.
else
if
(sleepCount == 8)
156.
{
157.
sleepTime = 1000 * 8;
158.
}
159.
else
if
(sleepCount == 10)
160.
{
161.
sleepTime = 1000 * 10;
162.
}
163.
else
164.
{
165.
sleepTime = 1000 * 50;
166.
}
167.
sleepCount++;
168.
Thread.Sleep(sleepTime);
169.
}
170.
else
171.
{
172.
//判斷是否隊列有線程正在處理
173.
if
(enabled && Interlocked.CompareExchange(
ref
isProcessing, 1, 0) == 0)
174.
{
175.
if
(!queue.IsEmpty)
176.
{
177.
ThreadPool.QueueUserWorkItem(ProcessItemLoop);
178.
}
179.
else
180.
{
181.
Interlocked.Exchange(
ref
isProcessing, 0);
182.
}
183.
sleepCount = 0;
184.
sleepTime = 1000;
185.
}
186.
}
187.
}
188.
}
189.
190.
/// <summary>
191.
/// 停止隊列
192.
/// </summary>
193.
public
void
Stop()
194.
{
195.
this
.enabled =
false
;
196.
197.
}
198.
199.
/// <summary>
200.
/// 觸發異常處理事件
201.
/// </summary>
202.
/// <param name="ex">異常</param>
203.
private
void
OnProcessException(Exception ex)
204.
{
205.
var tempException = OnException;
206.
Interlocked.CompareExchange(
ref
tempException,
null
,
null
);
207.
208.
if
(tempException !=
null
)
209.
{
210.
OnException(ex);
211.
}
212.
}
213.
214.
#endregion
215.
216.
}
三、自旋鎖(Spinlock)
基本介紹: 在原子操作基礎上實現的鎖,用戶態的鎖,缺點是線程一直不釋放CPU時間片。操作系統進行一次線程用戶態到內核態的切換大約需要500個時鐘周期,可以根據這個進行參考我們的線程是進行用戶等待還是轉到內核的等待.。
使用場景:線程等待資源時間較短的情況下使用。
案例: 和最常用的Monitor 使用方法一樣 這裡就不舉例了,在實際場景中應該優先選擇使用Monitor,除非是線程等待資源的時間特別的短。
四、監視器(Monitor)
基本介紹: 原子操作基礎上實現的鎖,開始處於用戶態,自旋一段時間進入內核態的等待釋放CPU時間片,缺點使用不當容易造成死鎖 c#實現的關鍵字是Lock。
使用場景: 所有需要加鎖的場景都可以使用。
案例: 案例太多了,這裡就不列出了。
五、讀寫鎖(ReadWriterLock)
原理分析: 原子操作基礎上實現的鎖,
使用場景:適用於寫的次數少,讀的頻率高的情況。
案例:一個線程安全的緩存實現(.net 4.0 可以使用基礎類庫中的 ConcurrentDictionary<K,V>) 註意:老版本ReaderWriterLock已經被淘汰,新版的是ReaderWriterLockSlim
01.
class
CacheManager<K, V>
02.
{
03.
#region [成員]
04.
05.
private
ReaderWriterLockSlim readerWriterLockSlim;
06.
07.
private
Dictionary<K, V> containter;
08.
09.
#endregion
10.
11.
#region [構造函數]
12.
13.
public
CacheManager()
14.
{
15.
this
.readerWriterLockSlim =
new
ReaderWriterLockSlim();
16.
this
.containter =
new
Dictionary<K, V>();
17.
}
18.
19.
#endregion
20.
21.
#region [方法]
22.
23.
public
void
Add(K key, V value)
24.
{
25.
readerWriterLockSlim.EnterWriteLock();
26.
27.
try
28.
{
29.
containter.Add(key, value);
30.
}
31.
32.
finally
33.
{
34.
readerWriterLockSlim.ExitWriteLock();
35.
}
36.
}
37.
38.
public
V Get(K key)
39.
{
40.
41.
bool
result =
false
;
42.
V value;
43.
44.
do
45.
{
46.
readerWriterLockSlim.EnterReadLock();
47.
48.
try
49.
{
50.
result = containter.TryGetValue(key,
out
value);
51.
}
52.
53.
finally
54.
{
55.
readerWriterLockSlim.ExitWriteLock();
56.
}
57.
58.
}
while
(!result);
59.
60.
return
value;
61.
}
62.
63.
#endregion
64.
}
.net中還有其他的線程同步機制:ManualResetEventSlim ,AutoResetEvent ,SemaphoreSlim 這裡就逐個進行不介紹 具體在《CLR Via C# 》中解釋的非常詳細,但在具體的實際開發中我還沒有使用到。
最好的線程同步機制是沒有同步,這取決於良好的設計,當然有些情況下無法避免使用鎖。 在性能要求不高的場合基本的lock就能滿足要求,但性能要求比較苛刻的情就需求更具實際場景進行選擇哪種線程同步機制。
免費培訓課:http://www.jinhusns.com/Products/Curriculum/?type=xcj
源碼分享:http://www.jinhusns.com/Products/Download/?type=xcj