Task為.NET提供了基於任務的非同步模式,它不是線程,它運行在線程池的線程上。本著開源的精神, 本文以解讀基於.NET4.5 Task源碼的方式來揭秘Task的實現原理。 Task的創建 Task的創建方式主要有2種:Task.Run 和Task.Factory.StartNew,各自有不同的ov ...
Task為.NET提供了基於任務的非同步模式,它不是線程,它運行在線程池的線程上。本著開源的精神, 本文以解讀基於.NET4.5 Task源碼的方式來揭秘Task的實現原理。 Task的創建 Task的創建方式主要有2種:Task.Run 和Task.Factory.StartNew,各自有不同的overload,這裡只解讀其中的一種方式,其他有興趣的請自行解讀。 先來看看Task.Run源碼:
1 public static Task Run(Action action, CancellationToken cancellationToken) 2 { 3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; 4 return Task.InternalStartNew((Task) null, (Delegate) action, (object) null, cancellationToken, TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackMark); 5 }調用了Task.InternalStartNew,第一個參數為null,並傳入TaskScheduler.Default和TaskCreationOptions.DenyChildAttach. 再來看看Task.Factory.StartNew源碼:
1 public Task StartNew(Action<object> action, object state, CancellationToken cancellationToken) 2 { 3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller; 4 Task internalCurrent = Task.InternalCurrent; 5 return Task.InternalStartNew(internalCurrent, (Delegate) action, state, cancellationToken, this.GetDefaultScheduler(internalCurrent), this.m_defaultCreationOptions, InternalTaskOptions.None, ref stackMark); 6 }也是調用Task.InternalStartNew,第一個參數為internalCurrent,當前為null,並傳入GetDefaultScheduler(internalCurrent)和m_defaultCreationOptions。
1 private TaskScheduler GetDefaultScheduler(Task currTask) 2 { 3 if (this.m_defaultScheduler != null) 4 return this.m_defaultScheduler; 5 if (currTask != null && (currTask.CreationOptions & TaskCreationOptions.HideScheduler) == TaskCreationOptions.None) 6 return currTask.ExecutingTaskScheduler; 7 return TaskScheduler.Default; 8 }如果Task.Factory本身指定了m_defaultScheduler,則優先使用它;否則,如果internalCurrent不為空而且它的CreationOptions不包含TaskCreationOptions.HideScheduler,那麼沿用internalCurrent的ExecutingTaskScheduler。此處internalCurrent為null,所以啟用預設的TaskScheduler,跟入代碼發現預設的TaskScheduler是ThreadPoolTaskScheduler,看名字就知道用的是線程池的任務調度,跟“黑盒”傳說的一樣的。m_defaultCreationOptions在Task.Factory的預設無參構造函數裡被賦值TaskCreationOptions.None。
1 public abstract class TaskScheduler 2 { 3 private static readonly ConditionalWeakTable<TaskScheduler, object> s_activeTaskSchedulers = new ConditionalWeakTable<TaskScheduler, object>(); 4 private static readonly TaskScheduler s_defaultTaskScheduler = (TaskScheduler) new ThreadPoolTaskScheduler(); 5 ... 6 }目前來看兩個方法最大的區別在於TaskCreationOption的不同,一個是DenyChildAttach,另一個是None。 接著往下看InternalStartNew:
1 internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark) 2 { 3 if (scheduler == null) 4 throw new ArgumentNullException("scheduler"); 5 Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler); 6 task.PossiblyCaptureContext(ref stackMark); 7 task.ScheduleAndStart(false); 8 return task; 9 }首先實例化一個Task:
1 internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) 2 { 3 if (action == null) 4 throw new ArgumentNullException("action"); 5 if ((creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None || (internalOptions & InternalTaskOptions.SelfReplicating) != InternalTaskOptions.None) 6 this.m_parent = parent; 7 this.TaskConstructorCore((object) action, state, cancellationToken, creationOptions, internalOptions, scheduler); 8 }如果creationOptions包含AttachedToParent(或internalOptions包含SelfReplicating),那麼parent(即internalCurrent)就賦值給m_parent,目前為null。SelfReplicating是用來做並行計算的,會在TPL裡詳解。隨後調用TaskConstructorCore。
1 internal void TaskConstructorCore(object action, object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler) 2 { 3 this.m_action = action; 4 this.m_stateObject = state; 5 this.m_taskScheduler = scheduler; 6 if ((creationOptions & ~(TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent | TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler | TaskCreationOptions.RunContinuationsAsynchronously)) != TaskCreationOptions.None) 7 throw new ArgumentOutOfRangeException("creationOptions"); 8 if ((creationOptions & TaskCreationOptions.LongRunning) != TaskCreationOptions.None && (internalOptions & InternalTaskOptions.SelfReplicating) != InternalTaskOptions.None) 9 throw new InvalidOperationException(Environment.GetResourceString("Task_ctor_LRandSR")); 10 int num = (int) (creationOptions | (TaskCreationOptions) internalOptions); 11 if (this.m_action == null || (internalOptions & InternalTaskOptions.ContinuationTask) != InternalTaskOptions.None) 12 num |= 33554432; 13 this.m_stateFlags = num; 14 if (this.m_parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None && (this.m_parent.CreationOptions & TaskCreationOptions.DenyChildAttach) == TaskCreationOptions.None) 15 this.m_parent.AddNewChild(); 16 if (!cancellationToken.CanBeCanceled) 17 return; 18 this.AssignCancellationToken(cancellationToken, (Task) null, (TaskContinuation) null); 19 }如果m_parent不為空、當前task的creationOptions包含AttachedToParent,而且m_parent的CreationOptions不包含DenyChildAttach,則把當前task作為child添加到m_parent(調用m_parent.AddNewChild)。也就是說,Task.Run創建的task帶有DenyChildAttach,在它內部創建的task即使指定了AttachedToParent,也不會被添加為它的child。 Task已創建,接著調用PossiblyCaptureContext來獲取execution context。
1 internal static ExecutionContext Capture(ref StackCrawlMark stackMark, ExecutionContext.CaptureOptions options) 2 { 3 ExecutionContext.Reader executionContextReader = Thread.CurrentThread.GetExecutionContextReader(); 4 if (executionContextReader.IsFlowSuppressed) 5 return (ExecutionContext) null; 6 SecurityContext securityContext = SecurityContext.Capture(executionContextReader, ref stackMark); 7 HostExecutionContext executionContext1 = HostExecutionContextManager.CaptureHostExecutionContext(); 8 SynchronizationContext synchronizationContext = (SynchronizationContext) null; 9 LogicalCallContext logicalCallContext = (LogicalCallContext) null; 10 if (!executionContextReader.IsNull) 11 { 12 if ((options & ExecutionContext.CaptureOptions.IgnoreSyncCtx) == ExecutionContext.CaptureOptions.None) 13 synchronizationContext = executionContextReader.SynchronizationContext == null ? (SynchronizationContext) null : executionContextReader.SynchronizationContext.CreateCopy(); 14 if (executionContextReader.LogicalCallContext.HasInfo) 15 logicalCallContext = executionContextReader.LogicalCallContext.Clone(); 16 } 17 Dictionary<IAsyncLocal, object> dictionary = (Dictionary<IAsyncLocal, object>) null; 18 List<IAsyncLocal> asyncLocalList = (List<IAsyncLocal>) null; 19 if (!executionContextReader.IsNull) 20 { 21 dictionary = executionContextReader.DangerousGetRawExecutionContext()._localValues; 22 asyncLocalList = executionContextReader.DangerousGetRawExecutionContext()._localChangeNotifications; 23 } 24 if ((options & ExecutionContext.CaptureOptions.OptimizeDefaultCase) != ExecutionContext.CaptureOptions.None && securityContext == null && (executionContext1 == null && synchronizationContext == null) && ((logicalCallContext == null || !logicalCallContext.HasInfo) && (dictionary == null && asyncLocalList == null))) 25 return ExecutionContext.s_dummyDefaultEC; 26 ExecutionContext executionContext2 = new ExecutionContext(); 27 executionContext2.SecurityContext = securityContext; 28 if 
(executionContext2.SecurityContext != null) 29 executionContext2.SecurityContext.ExecutionContext = executionContext2; 30 executionContext2._hostExecutionContext = executionContext1; 31 executionContext2._syncContext = synchronizationContext; 32 executionContext2.LogicalCallContext = logicalCallContext; 33 executionContext2._localValues = dictionary; 34 executionContext2._localChangeNotifications = asyncLocalList; 35 executionContext2.isNewCapture = true; 36 return executionContext2; 37 }ExecutionContext包含了SecurityContext,SynchronizationContext以及LogicalCallContext,其中SynchronizationContext需要做CreateCopy,LogicalCallContext需要做clone,所有這一切都是用戶態的,不涉及內核,性能棒棒噠! 接著調用ScheduleAndStart:
1 internal void ScheduleAndStart(bool needsProtection) 2 { 3 if (needsProtection) 4 { 5 if (!this.MarkStarted()) 6 return; 7 } 8 else 9 this.m_stateFlags = this.m_stateFlags | 65536; 10 if (Task.s_asyncDebuggingEnabled) 11 Task.AddToActiveTasks(this); 12 if (AsyncCausalityTracer.LoggingOn && (this.Options & (TaskCreationOptions) 512) == TaskCreationOptions.None) 13 AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: " + ((Delegate) this.m_action).Method.Name, 0UL); 14 try 15 { 16 this.m_taskScheduler.InternalQueueTask(this); 17 } 18 catch (ThreadAbortException ex) 19 { 20 this.AddException((object) ex); 21 this.FinishThreadAbortedTask(true, false); 22 } 23 catch (System.Exception ex) 24 { 25 TaskSchedulerException schedulerException = new TaskSchedulerException(ex); 26 this.AddException((object) schedulerException); 27 this.Finish(false); 28 if ((this.Options & (TaskCreationOptions) 512) == TaskCreationOptions.None) 29 this.m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false); 30 throw schedulerException; 31 } 32 } 33 34 internal void InternalQueueTask(Task task) 35 { 36 task.FireTaskScheduledIfNeeded(this); 37 this.QueueTask(task); 38 }FireTaskScheduledIfNeeded判斷是否開啟ETW Trace,接著調用ThreadPoolTaskScheduler.QueueTask。
1 private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(ThreadPoolTaskScheduler.LongRunningThreadWork); 2 private static void LongRunningThreadWork(object obj) 3 { 4 (obj as Task).ExecuteEntry(false); 5 } 6 protected internal override void QueueTask(Task task) 7 { 8 if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None) 9 { 10 new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork) 11 { 12 IsBackground = true 13 }.Start((object) task); 14 } 15 else 16 { 17 bool forceGlobal = (uint) (task.Options & TaskCreationOptions.PreferFairness) > 0U; 18 ThreadPool.UnsafeQueueCustomWorkItem((IThreadPoolWorkItem) task, forceGlobal); 19 } 20 }如果options包含LongRunning,那麼單獨創建一個後臺線程執行該任務(ExecuteEntry),否則就調用ThreadPool.UnsafeQueueCustomWorkItem,這個方法我們熟,還記得在.net線程池內幕裡有講到的global work queue和local work queue嗎?給ThreadPool添加一個任務實際上是在global work queue添加一個任務,而task在沒有指定PreferFairness(forceGlobal為false)時,會優先往local work queue裡添加任務。 ThreadPoolWorkQueue源碼:
1 public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal) 2 { 3 ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null; 4 if (!forceGlobal) 5 queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals; 6 if (this.loggingEnabled) 7 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback); 8 if (queueThreadLocals != null) 9 { 10 queueThreadLocals.workStealingQueue.LocalPush(callback); 11 } 12 else 13 { 14 ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead; 15 while (!comparand.TryEnqueue(callback)) 16 { 17 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null); 18 for (; comparand.Next != null; comparand = this.queueHead) 19 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand); 20 } 21 } 22 this.EnsureThreadRequested(); 23 }如果forceGlobal為false,而且當前線程的threadLocals不為null(即當前線程是已經執行過任務的線程池線程,該任務可能來自global queue,也可能來自local queue),代碼會走到queueThreadLocals.workStealingQueue.LocalPush(callback);否則任務會被加入global work queue。
1 internal volatile IThreadPoolWorkItem[] m_array = new IThreadPoolWorkItem[32]; 2 private SpinLock m_foreignLock = new SpinLock(false); 3 public void LocalPush(IThreadPoolWorkItem obj) 4 { 5 int num1 = this.m_tailIndex; 6 if (num1 == int.MaxValue) 7 { 8 bool lockTaken = false; 9 try 10 { 11 this.m_foreignLock.Enter(ref lockTaken); 12 if (this.m_tailIndex == int.MaxValue) 13 { 14 this.m_headIndex = this.m_headIndex & this.m_mask; 15 this.m_tailIndex = num1 = this.m_tailIndex & this.m_mask; 16 } 17 } 18 finally 19 { 20 if (lockTaken) 21 this.m_foreignLock.Exit(true); 22 } 23 } 24 if (num1 < this.m_headIndex + this.m_mask) 25 { 26 Volatile.Write<IThreadPoolWorkItem>(ref this.m_array[num1 & this.m_mask], obj); 27 this.m_tailIndex = num1 + 1; 28 } 29 else 30 { 31 bool lockTaken = false; 32 try 33 { 34 this.m_foreignLock.Enter(ref lockTaken); 35 int num2 = this.m_headIndex; 36 int num3 = this.m_tailIndex - this.m_headIndex; 37 if (num3 >= this.m_mask) 38 { 39 IThreadPoolWorkItem[] threadPoolWorkItemArray = new IThreadPoolWorkItem[this.m_array.Length << 1]; 40 for (int index = 0; index < this.m_array.Length; ++index) 41 threadPoolWorkItemArray[index] = this.m_array[index + num2 & this.m_mask]; 42 this.m_array = threadPoolWorkItemArray; 43 this.m_headIndex = 0; 44 this.m_tailIndex = num1 = num3; 45 this.m_mask = this.m_mask << 1 | 1; 46 } 47 Volatile.Write<IThreadPoolWorkItem>(ref this.m_array[num1 & this.m_mask], obj); 48 this.m_tailIndex = num1 + 1; 49 } 50 finally 51 { 52 if (lockTaken) 53 this.m_foreignLock.Exit(false); 54 } 55 } 56 }Local work queue(m_array)的初始容量為32,如果queue裡的任務數達到m_mask(快要放滿),則把數組擴大為原來的2倍,以此類推。這裡也使用了自旋鎖和記憶體寫屏障來代替同步鎖提高性能。 至此,task已被創建好,並加入到了ThreadPool的local work queue。那麼task是如何被調度的呢?為什麼LongRunning就要單獨起一個線程去做?請聽下回分解!