讀寫鎖 /// /// 提供非同步獨占和併發執行支持 /// public sealed class AsyncReaderWriter { /// /// 在當前實例中保護所有共用狀態的鎖 /// private readonly object _lock = new object(); /// / ...
讀寫鎖
/// <summary>
/// 提供非同步獨占和併發執行支持
/// </summary>
public sealed class AsyncReaderWriter
{
/// <summary>
/// 在當前實例中保護所有共用狀態的鎖
/// </summary>
private readonly object _lock = new object();
/// <summary>
/// 併發讀任務等待執行的隊列
/// </summary>
private readonly Queue<Task> _waitingCocurrent = new Queue<Task>();
/// <summary>
/// 獨占寫任務等待執行的隊列
/// </summary>
private readonly Queue<Task> _waitingExclusive = new Queue<Task>();
/// <summary>
/// 併發讀任務正在執行的數量
/// </summary>
private int _currentConcurrent = 0;
/// <summary>
/// 獨占寫任務是否正在執行
/// </summary>
private bool _currentlyExclusive = false;
/// <summary>
/// 非泛型的任務創建工廠
/// </summary>
private TaskFactory _factory;
/// <summary>
/// 初始化
/// </summary>
public AsyncReaderWriter()
{
this._factory = Task.Factory;
}
/// <summary>
/// 使用指定的<see cref="TaskFactory"/>初始化<see cref="AsyncReaderWriter"/>, 為我們創建所有任務
/// </summary>
/// <param name="factory">用來創建所有任務的<see cref="TaskFactory"/></param>
public AsyncReaderWriter(TaskFactory factory)
{
if (factory == null)
throw new ArgumentNullException("factory");
this._factory = factory;
}
/// <summary>
/// 獲取當前隊列中正在等待的獨占寫任務數量
/// </summary>
public int WaitingExclusive { get { lock (_lock) return this._waitingExclusive.Count; } }
/// <summary>
/// 獲取當前隊列中正在等待的併發讀任務數量
/// </summary>
public int WaitingConcurrent { get { lock (_lock) return this._waitingCocurrent.Count; } }
/// <summary>
/// 獲取併發讀任務正在執行的數量
/// </summary>
public int CurrentConcurrent { get { lock (_lock) return this._currentConcurrent; } }
/// <summary>
/// 獲取當前獨占寫任務是否正在執行
/// </summary>
public bool CurrentlyExclusive { get { lock (_lock) return this._currentlyExclusive; } }
/// <summary>
/// 將獨占寫<see cref="Action"/>入隊到<see cref="AsyncReaderWriter"/>
/// </summary>
/// <param name="action">將要被以獨占寫方式執行的<see cref="Action"/></param>
/// <returns>表示執行提供的<see cref="Action"/>的任務</returns>
public Task QueueExclusiveWriter(Action action)
{
var task = new Task(state =>
{
try
{
//運行提供的action
((Action)state)();
}
finally
{
//確保我們完成後清理
FinishExclusiveWriter();
}
}, action, this._factory.CancellationToken, this._factory.CreationOptions);
lock (_lock)
{
//如果當前有任務正在運行,或者其他的獨占寫任務需要運行, 入隊
//否則,沒有其他正在運行或將要運行的任務,現在就執行當前任務
if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
{
this._waitingExclusive.Enqueue(task);
}
else
{
RunExclusive_RequiresLock(task);
}
}
return task;
}
/// <summary>
/// 將獨占寫<see cref="Func{TResult}"/>入隊到<see cref="AsyncReaderWriter"/>
/// </summary>
/// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封裝的方法的返回值類型。</typeparam>
/// <param name="fun">將要被以獨占寫方式指定的<see cref="Func{TResult}"/></param>
/// <returns>表示執行提供的<see cref="Func{TResult}"/>的任務</returns>
public Task<TResult> QueueExclusiveWriter<TResult>(Func<TResult> fun)
{
var task = new Task<TResult>(state =>
{
try
{
return ((Func<TResult>)state)();
}
finally
{
FinishExclusiveWriter();
}
}, fun, this._factory.CancellationToken, this._factory.CreationOptions);
lock (_lock)
{
//如果當前有任務正在運行,或者其他的獨占寫任務需要運行, 入隊
//否則,沒有其他正在運行或將要運行的任務,現在就執行當前任務
if (this._currentlyExclusive || this._currentConcurrent > 0 || this._waitingExclusive.Count > 0)
{
this._waitingExclusive.Enqueue(task);
}
else
{
RunExclusive_RequiresLock(task);
}
}
return task;
}
/// <summary>
/// 將併發讀<see cref="Action"/>入隊到<see cref="AsyncReaderWriter"/>
/// </summary>
/// <param name="action">將要被以併發讀方式執行的<see cref="Action"/></param>
/// <returns>表示執行提供的<see cref="Action"/>的任務</returns>
public Task QueueConcurrentReader(Action action)
{
var task = new Task(state =>
{
try
{
((Action)state)();
}
finally
{
FinishConcurrentReader();
}
}, action, this._factory.CancellationToken, this._factory.CreationOptions);
lock (_lock)
{
//如果現在有獨占寫任務正在運行或者等待
//將當前任務入隊
if (this._currentlyExclusive || this._waitingExclusive.Count > 0)
{
this._waitingCocurrent.Enqueue(task);
}
else
{
//否則立即調度
RunConcurrent_RequiresLock(task);
}
}
return task;
}
/// <summary>
/// 將併發讀<see cref="Func{TResult}"/>入隊到<see cref="AsyncReaderWriter"/>
/// </summary>
/// <typeparam name="TResult"><see cref="Func{TResult}"/>委托封裝的方法返回值類型</typeparam>
/// <param name="fun">將要被以併發讀方式執行的<see cref="Func{TResult}"/></param>
/// <returns>表示執行提供的<see cref="Func{TResult}"/>的任務</returns>
public Task<TResult> QueueConcurrentReader<TResult>(Func<TResult> fun)
{
var task = new Task<TResult>(state =>
{
try
{
return ((Func<TResult>)state)();
}
finally
{
FinishConcurrentReader();
}
}, fun, this._factory.CancellationToken, this._factory.CreationOptions);
lock (_lock)
{
if (_currentlyExclusive || this._waitingExclusive.Count > 0)
{
this._waitingCocurrent.Enqueue(task);
}
else
{
RunConcurrent_RequiresLock(task);
}
}
return task;
}
#region 私有方法
/// <summary>
/// 開始指定的獨占任務
/// </summary>
/// <param name="exclusive">即將開始的獨占任務</param>
private void RunExclusive_RequiresLock(Task exclusive)
{
this._currentlyExclusive = true;
exclusive.Start(this._factory.Scheduler ?? TaskScheduler.Current);
}
/// <summary>
/// 開始指定的併發任務
/// </summary>
/// <param name="concurrent">即將開始的併發任務</param>
private void RunConcurrent_RequiresLock(Task concurrent)
{
this._currentConcurrent++;
concurrent.Start(this._factory.Scheduler ?? TaskScheduler.Current);
}
/// <summary>
/// 開始併發隊列中的所有任務
/// </summary>
private void RunConcurrent_RequiresLock()
{
while (this._waitingCocurrent.Count > 0)
{
RunConcurrent_RequiresLock(this._waitingCocurrent.Dequeue());
}
}
/// <summary>
/// 完成併發讀任務
/// </summary>
private void FinishConcurrentReader()
{
lock (_lock)
{
//運行到此處,表示一個併發任務已結束
this._currentConcurrent--;
//如果現在正在運行的併發任務數為0, 並且還有正在等待的獨占任務, 執行一個
if (this._currentConcurrent == 0 && this._waitingExclusive.Count > 0)
{
RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
}
//否則, 如果現在沒有等待的獨占任務,而有一些因為某些原因等待的併發任務(它們本應該在添加到隊列的時候就開始了), 運行所有正在等待的併發任務
else if (this._waitingExclusive.Count == 0 && this._waitingCocurrent.Count > 0)
{
RunConcurrent_RequiresLock();
}
}
}
/// <summary>
/// 完成獨占寫任務
/// </summary>
private void FinishExclusiveWriter()
{
lock (_lock)
{
//運行到此處,表示一個獨占任務已結束
this._currentlyExclusive = false;
//如果當前仍有正在等待的獨占任務, 以內聯方式運行下一個
if (this._waitingExclusive.Count > 0)
{
RunExclusive_RequiresLock(this._waitingExclusive.Dequeue());
}
//否則, 如果當前仍有正在等待的併發任務, 運行所有
else if (this._waitingCocurrent.Count > 0)
{
RunConcurrent_RequiresLock();
}
}
}
#endregion
}
使用方式:
var read = new Action(() =>
{
Debug.WriteLine($"讀取:{File.ReadLines(fileName).EmptyToNull()?.Last()}");
});
var write = new Action(() =>
{
var text = $"{DateTime.Now.Ticks.ToString()}\r\n";
File.AppendAllText(fileName, text);
Debug.Write($"\t寫入:{text}");
});
var rw = new AsyncReaderWriter();
for(var i = 0; i < 10; i++)
{
rw.QueueExclusiveWriter(write);
rw.QueueConcurrentReader(read);
}