在C#中,List集合是一種泛型集合,可以存儲任何類型的對象。克隆一個List集合可以通過以下幾種方式實現: 使用List的構造函數 使用List的構造函數可以創建一個新的List對象,並將原始List中的元素複製到新List中。例如: List<int> list1 = new List<int> ...
.net中使用Task可以方便地編寫非同步程式,為了更好地理解Task及其調度機制,接下來模擬Task的實現,目的是搞清楚:
- Task是什麼
- Task是如何被調度的
基本的Task模擬實現
從最基本的Task用法開始
Task.Run(Action action)
這個命令的作用是將action作為一項任務提交給調度器,調度器會安排空閑線程來處理。
我們使用Job來模擬Task
public class Job
{
private readonly Action _work;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
}
public enum JobStatus
{
Created,
Scheduled,
Running,
Completed
}
這裡也定義了同Task一樣的靜態Run方法,使用方式也與Task類似
Job.Run(() => Console.WriteLine($"Job1, thread:{Thread.CurrentThread.ManagedThreadId}"));
作為對比,使用Task時的寫法如下,多了await關鍵字,後文會討論。
await Task.Run(()=>() => Console.WriteLine($"Task1, thread:{Thread.CurrentThread.ManagedThreadId}"));
調用Job.Run方法時,會基於給定的Action創建一個Job,然後執行job.Start(), 但Job沒有立即開始執行,而是通過QueueJob方法提交給了調度器,由調度器來決定Job何時執行,在Job真正被執行時會調用其Invoke方法,此時給定的Action就會被執行了,同時會對應修改Job的狀態,從Running到Completed。簡單來說,.net的Task的基本工作過程與這個粗糙的Job一樣,由此可見,Task/Job代表一項具有某種狀態的操作。
基於線程池的調度
但Task/Job的執行依賴與調度器,這裡用JobScheduler來模擬,.net預設使用基於線程池的調度策略,我們也模擬實現一個ThreadPoolJobScheduler
首先看下JobScheduler,作為抽象基類,其QueueJob方法將有具體的某個調度器(ThreadPoolJobScheduler)來實現:
public abstract class JobScheduler
{
public abstract void QueueJob(Job job);
public static JobScheduler Current { get; set; } = new ThreadPoolJobScheduler();
}
ThreadPoolJobScheduler實現的QueueJob如下:
public class ThreadPoolJobScheduler : JobScheduler
{
public override void QueueJob(Job job)
{
job.Status = JobStatus.Scheduled;
var executionContext = ExecutionContext.Capture();
ThreadPool.QueueUserWorkItem(_ => ExecutionContext.Run(executionContext!,
_ => job.Invoke(), null));
}
}
ThreadPoolJobScheduler會將Job提交給線程池,並將Job狀態設置為Scheduled。
使用指定線程進行調度
JobScheduler的Current屬性預設設置為基於線程的調度,如果有其它調度器也可以更換,但為什麼要更換呢?這要從基於線程的調度的局限說起,對於一些具有較高優先順序的任務,採用這個策略可能會無法滿足需求,比如當線程都忙的時候,新的任務可能遲遲無法被執行。對於這種情況,.net可以通過設置TaskCreationOptions.LongRunning來解決,解析來先用自定義的調度器來解決這個問題:
public class DedicatedThreadJobScheduler : JobScheduler
{
private readonly BlockingCollection<Job> _queues=new();
private readonly Thread[] _threads;
public DedicatedThreadJobScheduler(int threadCount)
{
_threads=new Thread[threadCount];
for(int index=0; index< threadCount; index++)
{
_threads[index] =new Thread(Invoke);
}
Array.ForEach(_threads, thread=>thread.Start());
void Invoke(object? state){
while(true){
_queues.Take().Invoke();
}
}
}
public override void QueueJob(Job job)
{
_queues.Add(job);
}
}
在啟動DedicatedThreadJobScheduler時,會啟動指定數量的線程,這些線程會不停地從隊列中取出任務並執行。
接下來看看.net的TaskCreationOptions.LongRunning怎麼用:
await Task.Factory.StartNew(LongRunningMethod, TaskCreationOptions.LongRunning);
static void LongRunningMethod()
{
// Simulate a long-running operation
Console.WriteLine("Long-running task started on thread {0}.", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(10000);
Console.WriteLine("Long-running task finished on thread {0}.", Thread.CurrentThread.ManagedThreadId);
}
任務順序的編排
在使用Task時,經常會使用await關鍵字,來控制多個非同步任務之間的順序,await實際上是語法糖,在瞭解await之前,先來看看最基本的ContinueWith方法。
var taskA = Task.Run(() => DateTime.Now);
var taskB = taskA.ContinueWith(time => Console.WriteLine(time.Result));
await taskB;
模仿Task,我們給Job也添加ContinueWith方法。
public class Job
{
private readonly Action _work;
private Job? _continue;
public Job(Action work) => _work = work;
public JobStatus Status { get; internal set; }
internal protected virtual void Invoke()
{
Status = JobStatus.Running;
_work();
Status = JobStatus.Completed;
_continue?.Start();
}
public void Start(JobScheduler? scheduler = null)
=> (scheduler ?? JobScheduler.Current).QueueJob(this);
public static Job Run(Action work)
{
var job = new Job(work);
job.Start();
return job;
}
public Job ContinueWith(Action<Job> tobeContinued)
{
if (_continue == null)
{
var job = new Job(() => tobeContinued(this));
_continue = job;
}
else
{
_continue.ContinueWith(tobeContinued);
}
return this;
}
}
這個ContinueWith方法會將下一個待執行的Job放在_continue,這樣多個順序執行的Job就會構成一個鏈表。
在當前Job的Invoke方法執行結束時,會觸發下一個Job被調度。
使用示例:
Job.Run(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
}).ContinueWith(_ =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
進一步使用await關鍵字來控制
要像Task一樣使用await,需要Job支持有GetAwaiter方法。任何一個類型,只要有了這個GetAwaiter方法,就可以對其使用await關鍵字了。
c#的Task類中可以找到GetAwaiter
public TaskAwaiter GetAwaiter();
然後TaskAwaiter繼承了ICriticalNotifyCompletion介面
public readonly struct TaskAwaiter<TResult> : System.Runtime.CompilerServices.ICriticalNotifyCompletion
照貓畫虎,也為Job添加一個最簡單的JobAwaiter
public class Job
{
...
public JobAwaiter GetAwaiter() => new(this);
}
JobAwaiter的定義如下:
public struct JobAwaiter : ICriticalNotifyCompletion
{
private readonly Job _job;
public readonly bool IsCompleted => _job.Status == JobStatus.Completed;
public JobAwaiter(Job job)
{
_job = job;
if (job.Status == JobStatus.Created)
{
job.Start();
}
}
public void GetResult() { }
public void OnCompleted(Action continuation)
{
_job.ContinueWith(_ => continuation());
}
public void UnsafeOnCompleted(Action continuation)
=> OnCompleted(continuation);
}
添加了await後,前面的代碼也可以這樣寫:
await F1();
await F2();
static Job F1() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("11");
});
static Job F2() => new Job(() =>
{
Thread.Sleep(1000);
Console.WriteLine("12");
});
總結
回顧開頭的兩個問題,現在可以嘗試給出答案了。
- Task是什麼,Task是一種有狀態的操作(Created,Scheduled,Running,Completed),是對耗時操作的抽象,就像現實中的一項任務一樣,它的執行需要相對較長的時間,它也有創建(Created),安排(Scheduled),執行(Running),完成(Completed)的基本過程。任務完成當然需要拿到結果的,這裡的Job比較簡單,沒有模擬具體的結果;
- Task是如何被調度的,預設採用基於線程池的調度,即創建好Task後,由線程池中的空閑線程執行,具體什麼時候執行、由哪個線程執行,開發者是不用關心的,在具體執行過程中,
但由於.net全局線程池的局限,對於一些特殊場景無法滿足時(比如需要立即執行Task),此時可以通過TaskCreationOptions更改調度行為;
另外,await是語法糖,它背後的實現是基於GetAwaiter,由其返回ICriticalNotifyCompletion介面的實現,並對ContinueWith做了封裝。