非同步流(AsyncStream) 原文地址: "https://github.com/dotnet/roslyn/blob/master/docs/features/async streams.md" 註意:以下內容最好能根據反編譯工具查看非同步流相關類生成的代碼效果最佳 非同步流是可枚舉類(Enume ...
非同步流(AsyncStream)
原文地址:https://github.com/dotnet/roslyn/blob/master/docs/features/async-streams.md
註意:以下內容最好能根據反編譯工具查看非同步流相關類生成的代碼效果最佳
非同步流是可枚舉類(Enumerable)的非同步變體,它會在遍歷下一個元素的時候(Next)會涉及非同步操作。只要繼承自 IAsyncEnumerable
首先我們來看下這些在 .netcore3.0 新增的非同步流 API
namespace System.Collections.Generic
{
public interface IAsyncEnumerable<out T>
{
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
T Current { get; }
ValueTask<bool> MoveNextAsync();
}
}
namespace System
{
public interface IAsyncDisposable
{
ValueTask DisposeAsync();
}
}
當有非同步流時,你可以使用非同步形式的 foreach
代碼段迭代可枚舉的類:await foreach(var item in asyncStream){...}
。await foreach
語句與 foreach
語句是一樣的,只是它使用了 IAsyncEnumerable
代替 Enumerable
,在每次迭代都會去執行調用 await MoveNextAsync()
,並且這些枚舉器的釋放都是非同步的。
同樣,當你有可非同步釋放的對象時,你就可以通過 using
語句使用並非同步的處理它:await using(var response = asyncDisposable){...}
。await using
跟 平常用的 using
是一樣的,就如非同步流對象一樣,只是用了 IAsyncDisposable
代替 IDisposable
,DisposeAsync()
代替方法 Dispose()
。
使用者也可以自己手動實現這些介面,或者使用編譯器的高級特性從使用著(程式員)定義的方法(調用一個非同步迭代器方法)自動生成狀態機。非同步迭代器方法特征如下:
- 是非同步的(標記 async)
- 返回 IAsyncEnumerable
或 IAsyncEnumerator 類型。 - 要使用 await 語法(await 表達式,如 await foreach 或是 await using 語句)以及 yield 語句(yield return,yield break)。
舉個例子:
async IAsyncEnumerable<int> GetValuesFromServer()
{
while (true)
{
IEnumerable<int> batch = await GetNextBatch();
if (batch == null) yield break;
foreach (int item in batch)
{
yield return item;
}
}
}
就像在迭代方法里一樣,這裡有幾個限制,在 yield 聲明語句出現的附近:
- 對於一個 yield 語句(或者是其他任意形式)出現在 try finally 語句中會導致編譯期錯誤。
- 對於一個 yield 語句出現在 try 語句包裹的任何地方包括任何 catch 塊都會導致編譯器錯誤。
await using 語句的詳細設計
非同步 using 與 常規 using 是一樣低(lowered)的,只是用 DisposeAsync() 代替 Dispose() 方法。
要註意,基於模式查找 DisposeAsync 綁定到一個實例方法,它能夠在被沒有參數下被調用。
拓展方法是不可用在非同步流上的,DisposeAsync 的結果必須是可等待的。
await foreach 語句的詳細設計
await foreach 就如常規 foreach 一樣,只是:
- GetEnumerator() 被替換程了 GetAsyncEnumerator()
- MoveNext() 被替換成了 await MoveNextAsync()
- Dispose() 被替換成了 DisposeAsync()
註意,基於模式查找的 GetAsyncEnumerator
,MoveNextAsync
以及 DisposeAsync
綁定到一個沒有參數就能被調用的實例方法。拓展方法無效。MoveNextAsync
以及 DisposeAsync
同樣必須是可等待的。釋放清理 await foreach
不包含回掉函數來檢查這個介面的實現。
非同步 foreach 迭代是不允許集合類型為 dynamic 類型,因為沒有等價的非同步非泛型 IEnumerable 介面。
包裝器類型能傳遞非預設值(查看 api .WithCancellation(CancellationToken)
拓展方法),因此允許消費者控制非同步流的取消。非同步流的生產者也能夠使用取消令牌通過在自定義類寫 IAsyncEnumerator
E e = ((C)(x)).GetAsyncEnumerator(default);
try
{
while(await e.MoveNextAsync())
{
V v = (V)(T)e.Current; -OR- (D1 d1, ...) = (V)(T)e.Current;
//body
}
}
finally
{
await e.DisposeAsync();
}
非同步迭代器方法的詳細設計
非同步迭代器方法被初始化的狀態機的啟動(kick-off)方法替換。它不會在開始時就運行狀態機(不像常規的 async 方法的啟動方法)。這個啟動方法的方法被標記為 AsyncIteratorStateMachineAttribute
。
對於可枚舉的非同步迭代器方法的狀態機首先就要實現 IAsyncEnumerable<T>
以及 IAsyncEnumerator<T>
。對於可枚舉非同步迭代器,它只需要實現 IAsyncEnumerator<T>
。它與狀態機生成非同步方法是相似的。它包含生成器(builder)以及等待者(awaiter)欄位,用於在後臺運行狀態機(當 await 在非同步迭代器到達的時候)。它也能捕捉參數值(如果有)或者當前對象 this
(如果需要)。
狀態機運行過程可詳見 《async in c#》。
這裡也提供我嘗試理解翻譯的地址
但是對於新的非同步內容,這裡添加了新的狀態:
- 值到結束的狀態(promise of a value-or-end),
- 被 yielded 出的當前值類型 T,
- 一個 int 類型值,它能捕捉創建它的線程 id,
- 一個 bool 表示,標識 "dispose mode",
- 一個 CancellationTokenSource 的組合令牌(in enumerables)
狀態機主要的方法是 MoveNext()
。它能被通過 MoveNextAsync()
獲得運行,或者從 await
方法中作為一個後臺繼續等待啟動(continuation)。
value-or-end 狀態從 MoveNextAsync
返回。下麵提到的都能滿足:
- true(當伴隨著後臺運行狀態機執行後,值變的可用)
- false(如果 end 到達)
- 異常。這個狀態(promise)被
ManualResetValueTaskSourceCore<bool>
實現(它是可重用的,並且無需再分配的方式來生成和實現ValueTask<bool>
或者ValueTask
實例)。關於這些類的更多細節請訪問 https://blogs.msdn.microsoft.com/dotnet/2018/11/07/understanding-the-whys-whats-and-whens-of-valuetask/
與常規的非同步方法的狀態機相比,在非同步迭代器方法的 MoveNext()
增加如下邏輯:
- 支持處理
yield return
語句,它保存了當前值並實現 結果 true 的狀態(promise with result true) - 支持處理
yield break
語句,它設置釋放模式為 on,並跳轉到封閉的(enclosing)finally 或 exit 塊。 - 執行分配給 finally 塊(當正在釋放的時候)
- 退出方法,會釋放
CancellationTokenSource
(如果存在)並且實現結果為 false 的狀態 - 捕捉異時時,它會釋放
CancellationTokenSource
(如果存在)並且在狀態里設置異常信息
這反映在實現中,它將非同步方法的降低機制(lowering machinery)拓展如下:
- 處理 yield return 以及 yield break 語句(查看
VisitYieldReturnStatement
方法以及VisitYieldBreakStatement
方法到AsyncIteratorMethodToStateMachineRewriter
) - 處理 try 語句(查看方法
AsyncIteratorMethodToStateMachineRewriter
里的VisitTryStatement
和VisitExtractedFinallyBlock
方法) - 對自己的 promise 產生額外的狀態和邏輯(查看
AsyncIteratorRewriter
方法,它產生其他的成員變數:MoveNextAsync
,Currenct
,DisposeAsync
以及一些支持可重置的ValueTask
行為的成員,也就是GetResult
,SetStatus
,OnCompleted
)
ValueTask<bool> MoveNextAsync()
{
if(state == StateMachineStates.FinishedStateMachine)
{
return default(ValueTask<bool>);
}
ValueOrEndPromise.Reset();
var inst = this;
builder.Start(ref inst);
var version = valueOrEndPromise.Version;
if(valueOrEndPromise.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
{
return new ValueTask<bool>(valueOrEndPromise.GetResult(version));
}
return new ValueTask<bool>(this,version);//註意,這裡用到了狀態機的 IValueTaskSource 實現
}
T Current => current;
非同步迭代器方法的啟動方法和狀態機的初始化都遵循常規迭代器方法。尤其是 GetAsyncEnumerator()
方法就像 GetEnumerator()
只是它設置初始化狀態為 StateMachineStates.NotStartedStateMachine (-1):
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
{
{StateMachineType} result;
if(initialThreadId == /*托管線程id*/ && state == StateMachineStates.FinishedStateMachine)
{
state = InitialState; //-3
disposeMode = false;
result = this;
}
else
{
result = new {StateMachineType}(InitialState);
}
/*複製每個參數代理,或者在每個參數在被標記為[EnumeratorCancellation]的情況下和 GetAsyncEnumerator 的 token 參數結合使用*/
}
對於被標記為 [EnumeratorCancellation] 的參數,GetEnumerator 通過結合兩個可用的 token 初始化:
if(this.parameterProxy.Equals(default))
{
result.parameter = token;
}
else if(token.Equals(this.parameterProxy) || token.Equals(default))
{
result.paramter = this.parameterProxy;
}
else
{
result.combinedTokens = CancellationTokenSource.CreatedLinkedTokenSource(this.parameterProxy,token);
result.parameter = combinedTokens.Token;
}
對於線程 id 的檢查,可查看 https://github.com/dotnet/corefx/issues/3481
類似地,啟動方法與常規迭代器方法非常相似:
{
{StateMachineType} result = new {StateMachineType}(StateMachineStatus.FinishedStateMachine);//-2
/* 保存參數到參數代理中 */
return result;
}
Disposal
迭代器和非同步迭代器方法都需要處理,因為他們執行的步驟都被調用者控制,它可以選擇在得到所有的元素之前釋放迭代器。例如,foreach(...){if(...) break;}
。相比之下,非同步方法會持續自動運行直到結束。在調用者角度來說,它們不會在執行過程中掛起(暫停),所以它們不需要被釋放。
總之,非同步迭代器的釋放基於四個設計元素:
yield return
(當恢復至 dispose 模式時,跳轉到 finally )yield break
(進入 dispose 模式並且跳轉到封閉的 finally 塊)finally
(finally
後我們會跳轉到下一個)DisposeAsync
(進入 dispose 模式並恢復執行)
非同步迭代器方法的調用者應該在當一個方法完成或被 yield return
掛起時,只調用 DisposeAsync()
。DisposeAsync
會在狀態機("dispose mode")設置一個標識以及(如果這個方法沒有完成)從當前狀態恢復執行。狀態機能夠在被給定的狀態恢復執行(甚至那些分配到 try 中的)。在 dispose 模式下恢復執行時,它會直接跳轉到 finally
。
finally
塊只在 await 表達式下可能包含暫停以及恢復。由於在 yield return
上的限制(上面描述的),dispose 模式不會運行進入 yield return
。一旦 finally 塊完成,在 dispose 模式中執行跳轉到下一個 finally 塊,或是在到達了方法的最頂層結尾。
到達 yield break
(或是方法結束)會設置 dispose 模式標識以及跳轉到 finally 塊。當我們返回控制權給調用者時(通過到達方法尾部來完成 promise false)所有的處理都會被完成以及狀態機也處於完成狀態。所以 DisposeAsync()
不需要做其他事了。
從給定的 finally 塊來看處理,可以執行塊中的代碼:
- 正常執行(比如在 try 塊中的代碼之後)
- 在 try 塊中觸發異常(它將執行必要的 finally 塊以及在 Finished 狀態中終止方法)
- 調用
DisposeAsync()
(它在 dispose 模式恢復執行並且跳轉到 finally 塊) - yield break(進入 dispose 模式並跳轉到 finally 塊)
- dispose 模式,後面跟著嵌套 finally
a yield return is lowered as:
_current = expression;
_state = <next_state>;
goto <exprReturnTrueable>;// 它執行了 _valueOrEndPromise.SetResult(true);return;
//從 state = <next_state> 恢復,將執行到這個標簽
<next_state_label>:;
this.state = cachedState = NotStartedStateMachine;
if(disposeModel)/* 跳轉到 finally 或退出*/
a yield break is lowered as:
disposeModel = true;
/* 跳轉到 finally 或退出 */
ValueTask IAsyncDisposable.DisposeAsync()
{
if(state >= StateMachineStates.NotStartedStateMachine /* -1 */)
{
throw new NotSupportedException();
}
if(state == StateMachineStates.FinishedStateMachine /* -2 */)
{
return default;
}
disposeModel = true;
_valueOrEndPromise.Reset();
var inst = this;
_builder.Start(ref inst);
return new ValueTask(this,_valueOrEndPromise.Version);//註意,這裡用到了狀態機的 IValueTaskSource 實現
}
與通常提取 finally 的對比
當 finally 塊內不含 await 表達式,try/catch is lowered as:
try
{
...
finallyEntryLabel:
}
finally
{
...
}
if(disposeMode) /* 跳轉 finally 或退出 */
當 finally 塊內包含 await 表達式時,在非同步重寫之前被提取(通過 AsyncExeptionHandlerRewriter)。
在這種情況下,我們將知道:
try
{
...
goto finallyEntryLabel:
}
catch(Exception e)
{
... save exception ...
}
finallyEntryLabel:
{
... 從 finally 的原始代碼和異常的附加處理
}
這兩種情況下,我們將在 finally 邏輯塊後添加 if(disposeMode) /* 跳轉到 finally 或退出 */
狀態值和轉換
可枚舉從狀態 -2 開始。調用 GetAsyncEnumerator 來設置狀態為 -3,或返回一個新的枚舉器(狀態也為 -3)。
從這裡開始,MoveNext 將會做:
- 到達方法的最後(-2,我們已經做完並釋放)
- 到達
yield break
(狀態不變,dispose mode = true) - 到達
yield return
(-N,從 -4 遞減) - 到達
await
(N,從 0 遞增)
從暫停狀態 N 或 -N,MoveNext 將恢復執行(-1)。但是如果暫停 yield return(-N),你也能調用 DisposeAsync,它在 dispose mode 中恢復執行(-1)。
當在 dispose mode 下,MoveNext 會繼續暫停(N)和恢復(-1)直到到達方法結束(-2)。
從狀態 -1 或 N 調用 DisposeAsync 的結果未指明。編譯器在這種情況下會生成 throw new NotSupportedException
。
DisposeAsync await
+------------------------+ +------------------------> N
| | | |
v GetAsyncEnumerator | | resuming |
-2 --------------------> -3 --------> -1 <-------------------------+ Dispose mode = false
^ | | |
| done and disposed | | yield return |
+-----------------------------------+ +-----------------------> -N
| or exception thrown | |
| | |
| yield | |
| break | DisposeAsync |
| | +--------------------------+
| | |
| | |
| done and disposed v v suspension (await)
+----------------------------------- -1 ------------------------> N
^ | Dispose mode = true
| resuming |
+--------------------------+
註意:A
yield return
is lowered as 我實在是不知道 is lowerd as 到底是什麼意思,單個單詞都會,拼在一起怎麼讀都不順