.NET Task揭秘(一)

来源:http://www.cnblogs.com/newbier/archive/2016/12/20/6203422.html
-Advertisement-
Play Games

Task為.NET提供了基於任務的非同步模式,它不是線程,它運行線上程池的線程上。本著開源的精神, 本文以解讀基於.NET4.5 Task源碼的方式來揭秘Task的實現原理。 Task的創建 Task的創建方式主要有2種:Task.Run 和Task.Factory.StartNew,各自有不同的ov ...


Task為.NET提供了基於任務的非同步模式,它不是線程,它運行線上程池的線程上。本著開源的精神, 本文以解讀基於.NET4.5 Task源碼的方式來揭秘Task的實現原理。   Task的創建 Task的創建方式主要有2種:Task.RunTask.Factory.StartNew,各自有不同的overload,這裡只解讀其中的一種方式,其他有興趣的請自行解讀。 先來看看Task.Run源碼:
1 public static Task Run(Action action, CancellationToken cancellationToken)
2 {
3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
4 return Task.InternalStartNew((Task) null, (Delegate) action, (object) null, cancellationToken, TaskScheduler.Default, TaskCreationOptions.DenyChildAttach, InternalTaskOptions.None, ref stackMark);
5 }
調用了Task.InternalStartNew,第一個參數為null,並傳入TaskScheduler.DefaultTaskCreationOptions.DenyChildAttach. 再來看看Task.Factory.StartNew源碼:
1 public Task StartNew(Action<object> action, object state, CancellationToken cancellationToken)
2 {
3 StackCrawlMark stackMark = StackCrawlMark.LookForMyCaller;
4 Task internalCurrent = Task.InternalCurrent;
5 return Task.InternalStartNew(internalCurrent, (Delegate) action, state, cancellationToken, this.GetDefaultScheduler(internalCurrent), this.m_defaultCreationOptions, InternalTaskOptions.None, ref stackMark);
6 }
也是調用Task.InternalStartNew,第一個參數為internalCurrent,當前為null,並傳入GetDefaultScheduler(internalCurrent)m_defaultCreationOptions
1 private TaskScheduler GetDefaultScheduler(Task currTask)
2 {
3 if (this.m_defaultScheduler != null)
4 return this.m_defaultScheduler;
5 if (currTask != null && (currTask.CreationOptions & TaskCreationOptions.HideScheduler) == TaskCreationOptions.None)
6 return currTask.ExecutingTaskScheduler;
7 return TaskScheduler.Default;
8 }
如果internalCurrent不為空而且options是TaskCreationOptions.HideScheduler,那麼啟用internalCurrent的TaskScheduler。可惜internalCurrent為null,所以啟用預設的TaskScheduler,跟入代碼發現預設的TaskScheduler是ThreadPoolTaskScheduler,看名字就知道用的是線程池的任務調度,跟“黑盒”傳說的一樣的。m_defaultCreationOptions在Task.Factory的預設無參構造函數里被賦值TaskCreationOptions.None
1 public abstract class TaskScheduler
2 {
3 private static readonly ConditionalWeakTable<TaskScheduler, object> s_activeTaskSchedulers = new ConditionalWeakTable<TaskScheduler, object>();
4 private static readonly TaskScheduler s_defaultTaskScheduler = (TaskScheduler) new ThreadPoolTaskScheduler();
5 ...
6 }
目前來看兩個方法最大的區別在於TaskCreationOption的不同,一個是DenyChildAttach,另一個是None。 接著往下看InternalStartNew
1 internal static Task InternalStartNew(Task creatingTask, Delegate action, object state, CancellationToken cancellationToken, TaskScheduler scheduler, TaskCreationOptions options, InternalTaskOptions internalOptions, ref StackCrawlMark stackMark)
2 {
3 if (scheduler == null)
4 throw new ArgumentNullException("scheduler");
5 Task task = new Task(action, state, creatingTask, cancellationToken, options, internalOptions | InternalTaskOptions.QueuedByRuntime, scheduler);
6 task.PossiblyCaptureContext(ref stackMark);
7 task.ScheduleAndStart(false);
8 return task;
9 }
首先實例化一個Task:
1 internal Task(Delegate action, object state, Task parent, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
2 {
3 if (action == null)
4 throw new ArgumentNullException("action");
5 if ((creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None || (internalOptions & InternalTaskOptions.SelfReplicating) != InternalTaskOptions.None)
6 this.m_parent = parent;
7 this.TaskConstructorCore((object) action, state, cancellationToken, creationOptions, internalOptions, scheduler);
8 }
如果option是AttachToParent,那麼internalCurrent就賦值給m_parent,目前為null,SelfReplicating是用來做並行計算的,會在TPL里詳解。隨後調用TaskConstructorCore
 1 internal void TaskConstructorCore(object action, object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
 2 {
 3 this.m_action = action;
 4 this.m_stateObject = state;
 5 this.m_taskScheduler = scheduler;
 6 if ((creationOptions & ~(TaskCreationOptions.PreferFairness | TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent | TaskCreationOptions.DenyChildAttach | TaskCreationOptions.HideScheduler | TaskCreationOptions.RunContinuationsAsynchronously)) != TaskCreationOptions.None)
 7 throw new ArgumentOutOfRangeException("creationOptions");
 8 if ((creationOptions & TaskCreationOptions.LongRunning) != TaskCreationOptions.None && (internalOptions & InternalTaskOptions.SelfReplicating) != InternalTaskOptions.None)
 9 throw new InvalidOperationException(Environment.GetResourceString("Task_ctor_LRandSR"));
10 int num = (int) (creationOptions | (TaskCreationOptions) internalOptions);
11 if (this.m_action == null || (internalOptions & InternalTaskOptions.ContinuationTask) != InternalTaskOptions.None)
12 num |= 33554432;
13 this.m_stateFlags = num;
14 if (this.m_parent != null && (creationOptions & TaskCreationOptions.AttachedToParent) != TaskCreationOptions.None && (this.m_parent.CreationOptions & TaskCreationOptions.DenyChildAttach) == TaskCreationOptions.None)
15 this.m_parent.AddNewChild();
16 if (!cancellationToken.CanBeCanceled)
17 return;
18 this.AssignCancellationToken(cancellationToken, (Task) null, (TaskContinuation) null);
19 }
如果options不為DenyChildAttach而且m_parent不為空,則把當前task作為child添加到m_parent。也就是說Task.Run不允許把要執行的task作為當前task的child。 Task已創建,接著調用PossiblyCaptureContext來獲取execution context。
 1 internal static ExecutionContext Capture(ref StackCrawlMark stackMark, ExecutionContext.CaptureOptions options)
 2 {
 3 ExecutionContext.Reader executionContextReader = Thread.CurrentThread.GetExecutionContextReader();
 4 if (executionContextReader.IsFlowSuppressed)
 5 return (ExecutionContext) null;
 6 SecurityContext securityContext = SecurityContext.Capture(executionContextReader, ref stackMark);
 7 HostExecutionContext executionContext1 = HostExecutionContextManager.CaptureHostExecutionContext();
 8 SynchronizationContext synchronizationContext = (SynchronizationContext) null;
 9 LogicalCallContext logicalCallContext = (LogicalCallContext) null;
10 if (!executionContextReader.IsNull)
11 {
12 if ((options & ExecutionContext.CaptureOptions.IgnoreSyncCtx) == ExecutionContext.CaptureOptions.None)
13 synchronizationContext = executionContextReader.SynchronizationContext == null ? (SynchronizationContext) null : executionContextReader.SynchronizationContext.CreateCopy();
14 if (executionContextReader.LogicalCallContext.HasInfo)
15 logicalCallContext = executionContextReader.LogicalCallContext.Clone();
16 }
17 Dictionary<IAsyncLocal, object> dictionary = (Dictionary<IAsyncLocal, object>) null;
18 List<IAsyncLocal> asyncLocalList = (List<IAsyncLocal>) null;
19 if (!executionContextReader.IsNull)
20 {
21 dictionary = executionContextReader.DangerousGetRawExecutionContext()._localValues;
22 asyncLocalList = executionContextReader.DangerousGetRawExecutionContext()._localChangeNotifications;
23 }
24 if ((options & ExecutionContext.CaptureOptions.OptimizeDefaultCase) != ExecutionContext.CaptureOptions.None && securityContext == null && (executionContext1 == null && synchronizationContext == null) && ((logicalCallContext == null || !logicalCallContext.HasInfo) && (dictionary == null && asyncLocalList == null)))
25 return ExecutionContext.s_dummyDefaultEC;
26 ExecutionContext executionContext2 = new ExecutionContext();
27 executionContext2.SecurityContext = securityContext;
28 if (executionContext2.SecurityContext != null)
29 executionContext2.SecurityContext.ExecutionContext = executionContext2;
30 executionContext2._hostExecutionContext = executionContext1;
31 executionContext2._syncContext = synchronizationContext;
32 executionContext2.LogicalCallContext = logicalCallContext;
33 executionContext2._localValues = dictionary;
34 executionContext2._localChangeNotifications = asyncLocalList;
35 executionContext2.isNewCapture = true;
36 return executionContext2;
37 }
ExecutionContext包含了SecurityContext,SynchronizationContext以及LogicalCallContext,其中SynchronizationContext需要做CreateCopy,LogicalCallContext需要做clone,所有這一切都是用戶態的,不涉及內核,性能棒棒噠! 接著調用ScheduleAndStart:
 1 internal void ScheduleAndStart(bool needsProtection)
 2 {
 3 if (needsProtection)
 4 {
 5 if (!this.MarkStarted())
 6 return;
 7 }
 8 else
 9 this.m_stateFlags = this.m_stateFlags | 65536;
10 if (Task.s_asyncDebuggingEnabled)
11 Task.AddToActiveTasks(this);
12 if (AsyncCausalityTracer.LoggingOn && (this.Options & (TaskCreationOptions) 512) == TaskCreationOptions.None)
13 AsyncCausalityTracer.TraceOperationCreation(CausalityTraceLevel.Required, this.Id, "Task: " + ((Delegate) this.m_action).Method.Name, 0UL);
14 try
15 {
16 this.m_taskScheduler.InternalQueueTask(this);
17 }
18 catch (ThreadAbortException ex)
19 {
20 this.AddException((object) ex);
21 this.FinishThreadAbortedTask(true, false);
22 }
23 catch (System.Exception ex)
24 {
25 TaskSchedulerException schedulerException = new TaskSchedulerException(ex);
26 this.AddException((object) schedulerException);
27 this.Finish(false);
28 if ((this.Options & (TaskCreationOptions) 512) == TaskCreationOptions.None)
29 this.m_contingentProperties.m_exceptionsHolder.MarkAsHandled(false);
30 throw schedulerException;
31 }
32 }
33  
34 internal void InternalQueueTask(Task task)
35 {
36 task.FireTaskScheduledIfNeeded(this);
37 this.QueueTask(task);
38 }
FireTaskScheduledIfNeeded判斷是否開啟EWT Trace,接著調用ThreadPoolTaskScheduler.QueueTask
 1 private static readonly ParameterizedThreadStart s_longRunningThreadWork = new ParameterizedThreadStart(ThreadPoolTaskScheduler.LongRunningThreadWork);
 2 private static void LongRunningThreadWork(object obj)
 3 {
 4 (obj as Task).ExecuteEntry(false);
 5 }
 6 protected internal override void QueueTask(Task task)
 7 {
 8 if ((task.Options & TaskCreationOptions.LongRunning) != TaskCreationOptions.None)
 9 {
10 new Thread(ThreadPoolTaskScheduler.s_longRunningThreadWork)
11 {
12 IsBackground = true
13 }.Start((object) task);
14 }
15 else
16 {
17 bool forceGlobal = (uint) (task.Options & TaskCreationOptions.PreferFairness) > 0U;
18 ThreadPool.UnsafeQueueCustomWorkItem((IThreadPoolWorkItem) task, forceGlobal);
19 }
20 }
如果options是LongRunning,那麼單獨創建一個線程執行該任務(ExecuteEntry),否則就調用ThreadPool.UnsafeQueueCustomWorkItem,這個方法我們熟,還記得在.net線程池內幕里有講到的global work queue和local work queue嗎?給ThreadPool添加一個任務實際上是在global work queue添加一個任務,而task就是往local work queue里添加任務。 ThreadPoolWorkQueue源碼:
 1 public void Enqueue(IThreadPoolWorkItem callback, bool forceGlobal)
 2 {
 3 ThreadPoolWorkQueueThreadLocals queueThreadLocals = (ThreadPoolWorkQueueThreadLocals) null;
 4 if (!forceGlobal)
 5 queueThreadLocals = ThreadPoolWorkQueueThreadLocals.threadLocals;
 6 if (this.loggingEnabled)
 7 FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject((object) callback);
 8 if (queueThreadLocals != null)
 9 {
10 queueThreadLocals.workStealingQueue.LocalPush(callback);
11 }
12 else
13 {
14 ThreadPoolWorkQueue.QueueSegment comparand = this.queueHead;
15 while (!comparand.TryEnqueue(callback))
16 {
17 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref comparand.Next, new ThreadPoolWorkQueue.QueueSegment(), (ThreadPoolWorkQueue.QueueSegment) null);
18 for (; comparand.Next != null; comparand = this.queueHead)
19 Interlocked.CompareExchange<ThreadPoolWorkQueue.QueueSegment>(ref this.queueHead, comparand.Next, comparand);
20 }
21 }
22 this.EnsureThreadRequested();
23 }
由於線程已經執行過任務(global的也有可能是local的),所以代碼會走到queueThreadLocals.workStealingQueue.LocalPush(callback)
 1 internal volatile IThreadPoolWorkItem[] m_array = new IThreadPoolWorkItem[32];
 2 private SpinLock m_foreignLock = new SpinLock(false);
 3 public void LocalPush(IThreadPoolWorkItem obj)
 4 {
 5 int num1 = this.m_tailIndex;
 6 if (num1 == int.MaxValue)
 7 {
 8 bool lockTaken = false;
 9 try
10 {
11 this.m_foreignLock.Enter(ref lockTaken);
12 if (this.m_tailIndex == int.MaxValue)
13 {
14 this.m_headIndex = this.m_headIndex & this.m_mask;
15 this.m_tailIndex = num1 = this.m_tailIndex & this.m_mask;
16 }
17 }
18 finally
19 {
20 if (lockTaken)
21 this.m_foreignLock.Exit(true);
22 }
23 }
24 if (num1 < this.m_headIndex + this.m_mask)
25 {
26 Volatile.Write<IThreadPoolWorkItem>(ref this.m_array[num1 & this.m_mask], obj);
27 this.m_tailIndex = num1 + 1;
28 }
29 else
30 {
31 bool lockTaken = false;
32 try
33 {
34 this.m_foreignLock.Enter(ref lockTaken);
35 int num2 = this.m_headIndex;
36 int num3 = this.m_tailIndex - this.m_headIndex;
37 if (num3 >= this.m_mask)
38 {
39 IThreadPoolWorkItem[] threadPoolWorkItemArray = new IThreadPoolWorkItem[this.m_array.Length << 1];
40 for (int index = 0; index < this.m_array.Length; ++index)
41 threadPoolWorkItemArray[index] = this.m_array[index + num2 & this.m_mask];
42 this.m_array = threadPoolWorkItemArray;
43 this.m_headIndex = 0;
44 this.m_tailIndex = num1 = num3;
45 this.m_mask = this.m_mask << 1 | 1;
46 }
47 Volatile.Write<IThreadPoolWorkItem>(ref this.m_array[num1 & this.m_mask], obj);
48 this.m_tailIndex = num1 + 1;
49 }
50 finally
51 {
52 if (lockTaken)
53 this.m_foreignLock.Exit(false);
54 }
55 }
56 }
Local work queue(m_array)首先被限死為32,如果queue超過最大數了,則擴大為原來的2倍,以此類推。這裡也使用了自旋鎖和記憶體寫屏障來代替同步鎖提高性能。   至此,task已被創建好,並加入到了ThreadPool的local work queue。那麼task是如何被調度的呢?為什麼LongRunning就要單獨起一個線程去做?請聽下回分解!
您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Linux內核大量使用面向對象的設計思想,通過追蹤源碼,我們甚至可以使用面向對象語言常用的UML類圖來分析Linux設備管理的"類"之間的關係。這裡以4.8.5內核為例從kobject,kset,kobj_type的分析入手,進而一探內核對於設備的管理方式 container_of巨集 這個巨集幾乎是l ...
  • 首先設定loop數量(ubuntu系統) 更改以下內容: 然後更新grub 重啟電腦 之後就可以看到loop設備數量變為64個 設定loop數量有此系統不一樣,archlinux可參考我的另一隨筆http://www.cnblogs.com/LTSperl/p/6195493.html。 然後就是今 ...
  • OAuth(開放授權)是一個開放標準,允許用戶讓第三方應用訪問該用戶在某一網站上存儲的私密的資源(如照片,視頻,聯繫人列表),而無需將用戶名和密碼提供給第三方應用。 OAuth 允許用戶提供一個令牌,而不是用戶名和密碼來訪問他們存放在特定服務提供者的數據.每一個令牌授權一個特定的網站(例如,視頻編輯 ...
  • ASP.NET Core應用中的路由機制實現在RouterMiddleware中間件中,它的目的在於通過路由解析為請求找到一個匹配的處理器,同時將請求攜帶的數據以路由參數的形式解析出來供後續請求處理流程使用。但是具體的路由解析功能其實並沒有直接實現在RouterMiddleware中間件中,而是由一... ...
  • 一,項目背景 現在基本上大大小小的項目都需要和資料庫打交道,自然而然資料庫操作會有很多地方,而使用傳統ADO.NET整個流程有點麻煩,出參都需要手動轉換為對象。基於以上,我們需要一個SQL執行工具,能簡化上訴步驟,而不失對SQL控制權。 二,核心技術 最底層是ADO.NET,基於ADO.NET開發的 ...
  • 格式化數據這東西,主要看需要的運用場景,今天和大家分享的是webapi格式化數據,這裡面的例子主要是輸出json和xml的格式數據,測試用例很接近實際常用情況;希望大家喜歡,也希望各位多多掃碼支持和點贊謝謝: . 自定義一個Action,響應輸出集合數據 . api返回json數據的兩種方式 . j ...
  • NPOI的C# Helper代碼2 1 public static MemoryStream ExportXls(DataTable dt) 2 { 3 HSSFWorkbook wk = new HSSFWorkbook(); 4 ISheet sheet = null; 5 6 string s ...
  • NPOI的C# Helper代碼 1 public static void WriteExcel(DataTable dt, string filePath) 2 { 3 if (!string.IsNullOrEmpty(filePath) && dt.Rows.Count > 0) 4 { 5 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...