概述 非同步這個概念在不同語境下有不同的解釋,比如在一個單核CPU里開啟兩個線程執行兩個函數,通常認為這種調用是非同步的,但對於CPU來說它是單核不可能同時運行兩個函數,不過是由系統調度在不同的時間分片中執行。一般來說,如果兩個工作能同時進行,就認為是非同步的。在編程中,它通常代表函數的調用可以在不執行完 ...
概述
非同步這個概念在不同語境下有不同的解釋,比如在一個單核CPU里開啟兩個線程執行兩個函數,通常認為這種調用是非同步的,但對於CPU來說它是單核不可能同時運行兩個函數,不過是由系統調度在不同的時間分片中執行。一般來說,如果兩個工作能同時進行,就認為是非同步的。在編程中,它通常代表函數的調用可以在不執行完的情況下返回,必要時在完成時回調。
有一個概念常常被混淆,多線程和非同步。很多人認為非同步就是多線程的,但是多線程只是實現非同步的其中一種方式,除此之外還有系統中斷,定時器,甚至可以自己寫一個狀態機實現非同步(C# 的非同步實現類似狀態機)。
不同的編程語言有不同非同步編程方法,在C#語言中,常常使用async/await等關鍵字,和Task等類來實現非同步編程。
C#非同步編程用法
class Program
{
static void Main(string[] args)
{
var task = IntTask();
Console.WriteLine("等待中...");
Console.WriteLine($"算完了? 讓我康康! result = {task.Result}");
}
static async Task<int> IntTask()
{
Console.WriteLine("等3秒吧");
await Task.Delay(3000);
return 1;
}
}
Main函數非同步調用IntTask,列印"等三秒吧",隨後返回到Main函數列印“等待中”,在task.Result取值時阻塞,三秒後IntTask返回(此時Task.Result被賦值)列印“result = 1”。看一下用法:
- async: 非同步函數使用async關鍵字修飾
- await: 等待非同步函數返回
- Task
:非同步函數有返回值,且返回值為int類型
上述只是一個極簡的用法,忽略了大量的細節,可以建立一個初步的印象。
async/await和Task簡介
async
用async修飾一個方法,表明這個方法可以非同步執行,其返回值必須是void/Task/Task<T>(T是返回值類型)其中一個,方法內的語句至少包含一個await關鍵字,否則會被同步的方式執行。
await
await只能修飾(返回值是)Task類型變數,此時會返回Task.Result或void而不是Task本身,在上述示例中,Main沒有被async修飾,不能使用await,其返回值就是Task<int>, 而IntTask調用Task.Delay就是直接返回void。await也只能在被async修飾的函數的語句中使用。
Task
源於基於任務的非同步模式(Task-based Asynchronous Pattern,TAP),被作為非同步函數的返回值。非同步函數的返回值有三種:
- void:"fire and forget"(觸發並忘記)不需要知道狀態(是否完成),比如拋出異常、列印日誌時可以使用
- Task:需要知道是否完成(或失敗)等狀態,但是不需要返回值
- Task<T>:在Task的基礎上還想要返回值
其他
- 非同步函數不能使用ref/out修飾參數
實現原理剖析
如果使用反彙編等手段,可以看到上述示例代碼的編譯:
在返回1之前,好像有什麼“奇怪的東西”被調用,編譯器又背著開發者偷偷幹了什麼呢?
實現原理示例
在微軟的開發博客里有一個叫謝爾蓋·傑普利亞科夫(Sergey Tepliakov)的毛子曾提到這部分,來看一下他的示例:
源碼
class StockPrices
{
private Dictionary<string, decimal> _stockPrices;
public async Task<decimal> GetStockPriceForAsync(string companyId)
{
await InitializeMapIfNeededAsync();
_stockPrices.TryGetValue(companyId, out var result);
return result;
}
private async Task InitializeMapIfNeededAsync()
{
if (_stockPrices != null)
return;
await Task.Delay(42);
// Getting the stock prices from the external source and cache in memory.
_stockPrices = new Dictionary<string, decimal> { { "MSFT", 42 } };
}
}
這是他的源代碼,這個類叫做StockPrices(股票價格),其核心業務是根據公司ID查詢股票價格GetStockPriceForAsync,這是一個非同步調用,首先它先非同步調用InitializeMapIfNeededAsync對資料庫進行初始化,初始化完成嘗試從資料庫中獲取該公司的股票價格返回。
上述提到編譯器偷偷自己生成了代碼,如果手動實現大概是怎樣的呢?來看謝爾蓋給出的解:
手動實現
class GetStockPriceForAsync_StateMachine
{
enum State { Start, Step1, }
private readonly StockPrices @this;
private readonly string _companyId;
private readonly TaskCompletionSource<decimal> _tcs;
private Task _initializeMapIfNeededTask;
private State _state = State.Start;
public GetStockPriceForAsync_StateMachine(StockPrices @this, string companyId)
{
this.@this = @this;
_companyId = companyId;
}
public void Start()
{
try
{
if (_state == State.Start)
{
// The code from the start of the method to the first 'await'.
if (string.IsNullOrEmpty(_companyId))
throw new ArgumentNullException();
_initializeMapIfNeededTask = @this.InitializeMapIfNeeded();
// Update state and schedule continuation
_state = State.Step1;
_initializeMapIfNeededTask.ContinueWith(_ => Start());
}
else if (_state == State.Step1)
{
// Need to check the error and the cancel case first
if (_initializeMapIfNeededTask.Status == TaskStatus.Canceled)
_tcs.SetCanceled();
else if (_initializeMapIfNeededTask.Status == TaskStatus.Faulted)
_tcs.SetException(_initializeMapIfNeededTask.Exception.InnerException);
else
{
// The code between first await and the rest of the method
@this._store.TryGetValue(_companyId, out var result);
_tcs.SetResult(result);
}
}
}
catch (Exception e)
{
_tcs.SetException(e);
}
}
public Task<decimal> Task => _tcs.Task;
}
public Task<decimal> GetStockPriceForAsync(string companyId)
{
var stateMachine = new GetStockPriceForAsync_StateMachine(this, companyId);
stateMachine.Start();
return stateMachine.Task;
}
從類名GetStockPriceForAsync_StateMachine可以看到,他為這個非同步調用生成了一個狀態機來實現非同步,先來看下成員變數:
- StockPrices: 原來那個“股票價格”類的引用
- _companyId: 調用方法時的參數公司ID
- _tcs:TaskCompletionSource
創建並完成該任務的來源。 - _initializeMapIfNeededTask:調用初始化數據的非同步任務
- _state:狀態枚舉
- Task:直接就是_tcs.Task,即該任務創建並完成的來源
現在看來這段代碼的邏輯就比較清楚了,在調用非同步查詢股票的介面時,創建了一個狀態機並調用狀態機的Start函數,第一次進入start函數時狀態機的狀態是Start狀態,它給_initializeMapIfNeededTask賦值,把狀態機狀態流轉到Step1,並讓_initializeMapIfNeededTask執行結束末尾再次調用Start函數(ContinueWith)。
_initializeMapIfNeededTask任務在等待了42毫秒後(Task.Delay(42)),末尾時再次調用了Start函數,此時狀態為Step1。首先檢查了Task狀態,符合要求調用_tcs.SetResult(其實是給Task的Result賦值),此時非同步任務完成。
TaskCompletionSource
看官方文檔給的定義:
表示未綁定到委托的 Task<TResult> 的製造者方,並通過 Task 屬性提供對使用者方的訪問
簡單的示例:
static void Main()
{
TaskCompletionSource<int> tcs1 = new TaskCompletionSource<int>();
Task<int> t1 = tcs1.Task;
// Start a background task that will complete tcs1.Task
Task.Factory.StartNew(() =>
{
Thread.Sleep(1000);
tcs1.SetResult(15);
});
}
看的出來這個類就是對Task的包裝,方便創建分發給使用者的任務。其核心就是包裝Task並方便外面設置其屬性和狀態
Task.ContinueWith
創建一個在目標 Task 完成時非同步執行的延續任務
可以傳入一個委托,在Task完成的末尾調用。這是一個典型的續體傳遞風格(continuation-pass style)。
續體傳遞風格
續體傳遞風格(continuation-pass style, CPS),來看維基百科的描述:
A function written in continuation-passing style takes an extra argument: an explicit "continuation"; i.e., a function of one argument. When the CPS function has computed its result value, it "returns" it by calling the continuation function with this value as the argument. That means that when invoking a CPS function, the calling function is required to supply a procedure to be invoked with the subroutine's "return" value. Expressing code in this form makes a number of things explicit which are implicit in direct style. These include: procedure returns, which become apparent as calls to a continuation; intermediate values, which are all given names; order of argument evaluation, which is made explicit; and tail calls, which simply call a procedure with the same continuation, unmodified, that was passed to the caller
大概的意思是,這種風格的函數比起普通的有一個額外的函數指針參數,調用結束(即將return)調用或者函數參數(替代直接return到調用者Caller)。還有一些其他細節,就不多說了,感興趣自行翻譯查看。
來看一個極簡的例子:
int a = b + c + d;
這是一個鏈式運算,是有順序的,在C++中,上述運算其實是:
int a = (b + c) + d; 先計算tmp = b + c(tmp是寄存器上一個臨時的值,也稱將亡值),然後計算 int a = tmp + c
使用續體傳遞來模擬這一過程:
class Program
{
public class Result<T>
{
public T V;
}
static void Main(string[] args)
{
int a = 1;
int b = 2;
int c = 3;
int d = 4;
Result<int> ar = new() { V = a};
Calc3(ar, b, c, d, Calc2);
Console.WriteLine($"a = {ar.V}");
}
static void Calc3(Result<int> ar, int b, int c, int d, Action<Result<int>, int, int> continues)
{
int tmp = b + c;
continues(ar, tmp, d);
}
static void Calc2(Result<int> ar, int tmp, int d)
{
ar.V = tmp + d;
}
}
上述代碼應該很清楚了,稍微看下應該能看明白。
C#編譯器的實現
struct _GetStockPriceForAsync_d__1 : IAsyncStateMachine
{
public StockPrices __this;
public string companyId;
public AsyncTaskMethodBuilder<decimal> __builder;
public int __state;
private TaskAwaiter __task1Awaiter;
public void MoveNext()
{
decimal result;
try
{
TaskAwaiter awaiter;
if (__state != 0)
{
// State 1 of the generated state machine:
if (string.IsNullOrEmpty(companyId))
throw new ArgumentNullException();
awaiter = __this.InitializeLocalStoreIfNeededAsync().GetAwaiter();
// Hot path optimization: if the task is completed,
// the state machine automatically moves to the next step
if (!awaiter.IsCompleted)
{
__state = 0;
__task1Awaiter = awaiter;
// The following call will eventually cause boxing of the state machine.
__builder.AwaitUnsafeOnCompleted(ref awaiter, ref this);
return;
}
}
else
{
awaiter = __task1Awaiter;
__task1Awaiter = default(TaskAwaiter);
__state = -1;
}
// GetResult returns void, but it'll throw if the awaited task failed.
// This exception is catched later and changes the resulting task.
awaiter.GetResult();
__this._stocks.TryGetValue(companyId, out result);
}
catch (Exception exception)
{
// Final state: failure
__state = -2;
__builder.SetException(exception);
return;
}
// Final state: success
__state = -2;
__builder.SetResult(result);
}
void IAsyncStateMachine.SetStateMachine(IAsyncStateMachine stateMachine)
{
__builder.SetStateMachine(stateMachine);
}
}
[AsyncStateMachine(typeof(_GetStockPriceForAsync_d__1))]
public Task<decimal> GetStockPriceFor(string companyId)
{
_GetStockPriceForAsync_d__1 _GetStockPriceFor_d__;
_GetStockPriceFor_d__.__this = this;
_GetStockPriceFor_d__.companyId = companyId;
_GetStockPriceFor_d__.__builder = AsyncTaskMethodBuilder<decimal>.Create();
_GetStockPriceFor_d__.__state = -1;
var __t__builder = _GetStockPriceFor_d__.__builder;
__t__builder.Start<_GetStockPriceForAsync_d__1>(ref _GetStockPriceFor_d__);
return _GetStockPriceFor_d__.__builder.Task;
}
比較一下C#編譯器生成的狀態機:
- __this:StockPrices“股票價格”類的引用
- companyId:公司ID參數
- __builder:AsyncTaskMethodBuilder
類型的表示返回任務的非同步方法生成器 - __state:狀態索引
- __task1Awaiter:TaskAwaiter類型,提供等待非同步任務完成的對象
上述成員有一些和之前手擼的狀態機不太一樣,等下麵會介紹,先來這一套的邏輯:
首先創建了一個_GetStockPriceForAsync_d__1狀態機_GetStockPriceFor_d__並初始化賦值,隨後調用了這個狀態機的__builder的Start函數並把該狀態機作為引用參數傳入。__builder.Start函數會調到該狀態機的MoveNext函數(下麵會介紹),這和手擼代碼狀態機Start函數調用類似。
MoveNext與Start函數的處理過程也類似:第一次進來__state == -1,__builder.AwaitUnsafeOnCompleted切換上下文執行InitializeLocalStoreIfNeededAsync非同步任務,並指定在完成後切換到當前上下文調用該狀態機的MoveNext函數,類似手擼代碼的Task.ContinueWith。第二次進入時,執行到__builder.SetResult(result),非同步任務基本完成。
上述描述也是忽略了一些細節,下麵是調用的時序圖,會更清楚些,有些不太清楚的點後面會詳細介紹。
TaskAwaiter
來看下官方定義:
public readonly struct TaskAwaiter : System.Runtime.CompilerServices.ICriticalNotifyCompletion 提供等待非同步任務完成的對象
結構:
可以看到,這個所謂“等待非同步任務完成的對象”,主要是保證實現ICriticalNotifyCompletion的介面OnCompleted等。
AsyncTaskMethodBuilder<TAwaiter,TStateMachine>(TAwaiter, TStateMachine)
官方定義:
個人認為可以視為非同步任務的“門面”,它負責啟動狀態機,傳遞一些中間狀態,併在最終SetResult時表示它和其子常式的非同步任務結束。其中有一個方法AwaitUnsafeOnCompleted,值得研究一下。
AsyncTaskMethodBuilder.AwaitUnsafeOnCompleted
這個方法在上述中一筆帶過,被描述為類似Task.ContinueWith,確實如此,但執行過程相當複雜,在這裡也只是簡單介紹下過程。
AwaitUnsafeOnCompleted首先會調用GetCompletionAction,GetCompletionAction創建了一個保存了上下文 context = ExecuteContext.FastCapture()的MoveNextRunner,並返回了指向的MoveNextRunner.Run函數的委托。
接著調用參數awaiter的UnsafeOnCompleted(completionAction)函數,這裡completionAction就是上述的那個委托,內部調用了成員Task.SetContinuationForAwait函數來初始化續體,SetContinuationForAwait又調用AddTaskContinuation把延續方法添加到Task中,當上述示例源碼中的InitializeMapIfNeededAsync函數執行完調用Runner.Run:
[SecuritySafeCritical]
internal void Run()
{
if (this.m_context != null)
{
try
{
// 我們並未給 s_invokeMoveNext 賦值,所以 callback == null
ContextCallback callback = s_invokeMoveNext;
if (callback == null)
{
// 將回調設置為下方的 InvokeMoveNext 方法
s_invokeMoveNext = callback = new
ContextCallback(AsyncMethodBuilderCore.MoveNextRunner.InvokeMoveNext);
}
ExecutionContext.Run(this.m_context, callback, this.m_stateMachine, true);
return;
}
finally
{
this.m_context.Dispose();
}
}
this.m_stateMachine.MoveNext();
}
[SecurityCritical]
private static void InvokeMoveNext(object stateMachine)
{
((IAsyncStateMachine) stateMachine).MoveNext();
}
((IAsyncStateMachine) stateMachine).MoveNext() 重新調用了狀態機的MoveNext()
參考鏈接
- 跳轉鏈接:微軟.NET的API文檔
- 跳轉鏈接:微軟開發者博客《Dissecting the async methods in C#》
- 跳轉鏈接:xiaoxiaotank《理解C#中的 async await》