C#8.0——非同步流(AsyncStream)

来源:https://www.cnblogs.com/ms27946/archive/2019/08/10/Async-Stream-In-CSharp-8.html
-Advertisement-
Play Games

非同步流(AsyncStream) 原文地址: "https://github.com/dotnet/roslyn/blob/master/docs/features/async streams.md" 註意:以下內容最好能根據反編譯工具查看非同步流相關類生成的代碼效果最佳 非同步流是可枚舉類(Enume ...


非同步流(AsyncStream)

原文地址:https://github.com/dotnet/roslyn/blob/master/docs/features/async-streams.md

註意:以下內容最好能根據反編譯工具查看非同步流相關類生成的代碼效果最佳

非同步流是可枚舉類(Enumerable)的非同步變體,它會在遍歷下一個元素的時候(Next)會涉及非同步操作。只要繼承自 IAsyncEnumerable 就能實現。

首先我們來看下這些在 .netcore3.0 新增的非同步流 API

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }
    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        T Current { get; }

        ValueTask<bool> MoveNextAsync();
    }
}
namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

當有非同步流時,你可以使用非同步形式的 foreach 代碼段迭代可枚舉的類:await foreach(var item in asyncStream){...}await foreach 語句與 foreach 語句是一樣的,只是它使用了 IAsyncEnumerable 代替 Enumerable,在每次迭代都會去執行調用 await MoveNextAsync(),並且這些枚舉器的釋放都是非同步的。

同樣,當你有可非同步釋放的對象時,你就可以通過 using 語句使用並非同步的處理它:await using(var response = asyncDisposable){...}await using 跟 平常用的 using 是一樣的,就如非同步流對象一樣,只是用了 IAsyncDisposable 代替 IDisposableDisposeAsync() 代替方法 Dispose()

使用者也可以自己手動實現這些介面,或者使用編譯器的高級特性從使用著(程式員)定義的方法(調用一個非同步迭代器方法)自動生成狀態機。非同步迭代器方法特征如下:

  1. 是非同步的(標記 async)
  2. 返回 IAsyncEnumerable 或 IAsyncEnumerator 類型。
  3. 要使用 await 語法(await 表達式,如 await foreach 或是 await using 語句)以及 yield 語句(yield return,yield break)。

舉個例子:

async IAsyncEnumerable<int> GetValuesFromServer()
{
    while (true)
    {
        IEnumerable<int> batch = await GetNextBatch();
        if (batch == null) yield break;

        foreach (int item in batch)
        {
            yield return item;
        }
    }
}

就像在迭代方法里一樣,這裡有幾個限制,在 yield 聲明語句出現的附近:

  • 對於一個 yield 語句(或者是其他任意形式)出現在 try finally 語句中會導致編譯期錯誤。
  • 對於一個 yield 語句出現在 try 語句包裹的任何地方包括任何 catch 塊都會導致編譯器錯誤。

await using 語句的詳細設計

非同步 using 與 常規 using 是一樣低(lowered)的,只是用 DisposeAsync() 代替 Dispose() 方法。

要註意,基於模式查找 DisposeAsync 綁定到一個實例方法,它能夠在被沒有參數下被調用。

拓展方法是不可用在非同步流上的,DisposeAsync 的結果必須是可等待的。

await foreach 語句的詳細設計

await foreach 就如常規 foreach 一樣,只是:

  • GetEnumerator() 被替換程了 GetAsyncEnumerator()
  • MoveNext() 被替換成了 await MoveNextAsync()
  • Dispose() 被替換成了 DisposeAsync()

註意,基於模式查找的 GetAsyncEnumeratorMoveNextAsync 以及 DisposeAsync 綁定到一個沒有參數就能被調用的實例方法。拓展方法無效。MoveNextAsync 以及 DisposeAsync 同樣必須是可等待的。釋放清理 await foreach 不包含回掉函數來檢查這個介面的實現。

非同步 foreach 迭代是不允許集合類型為 dynamic 類型,因為沒有等價的非同步非泛型 IEnumerable 介面。

包裝器類型能傳遞非預設值(查看 api .WithCancellation(CancellationToken) 拓展方法),因此允許消費者控制非同步流的取消。非同步流的生產者也能夠使用取消令牌通過在自定義類寫 IAsyncEnumerator GetAsyncEnumerator(CancellatonToken) 非同步迭代器方法。

E e = ((C)(x)).GetAsyncEnumerator(default);
try
{
    while(await e.MoveNextAsync())
    {
        V v = (V)(T)e.Current; -OR-  (D1 d1, ...) = (V)(T)e.Current;
        //body
    }
}
finally
{
    await e.DisposeAsync();
}

非同步迭代器方法的詳細設計

非同步迭代器方法被初始化的狀態機的啟動(kick-off)方法替換。它不會在開始時就運行狀態機(不像常規的 async 方法的啟動方法)。這個啟動方法的方法被標記為 AsyncIteratorStateMachineAttribute

對於可枚舉的非同步迭代器方法的狀態機首先就要實現 IAsyncEnumerable<T> 以及 IAsyncEnumerator<T>。對於可枚舉非同步迭代器,它只需要實現 IAsyncEnumerator<T>。它與狀態機生成非同步方法是相似的。它包含生成器(builder)以及等待者(awaiter)欄位,用於在後臺運行狀態機(當 await 在非同步迭代器到達的時候)。它也能捕捉參數值(如果有)或者當前對象 this (如果需要)。

狀態機運行過程可詳見 《async in c#》。

這裡也提供我嘗試理解翻譯的地址

https://github.com/MarsonShine/Books/blob/master/AsyncInCSharp/docs/Async-Compiler-Transform-In-Deepth.md

但是對於新的非同步內容,這裡添加了新的狀態:

  • 值到結束的狀態(promise of a value-or-end),
  • 被 yielded 出的當前值類型 T,
  • 一個 int 類型值,它能捕捉創建它的線程 id,
  • 一個 bool 表示,標識 "dispose mode",
  • 一個 CancellationTokenSource 的組合令牌(in enumerables)

狀態機主要的方法是 MoveNext()。它能被通過 MoveNextAsync() 獲得運行,或者從 await 方法中作為一個後臺繼續等待啟動(continuation)。

value-or-end 狀態從 MoveNextAsync 返回。下麵提到的都能滿足:

與常規的非同步方法的狀態機相比,在非同步迭代器方法的 MoveNext() 增加如下邏輯:

  • 支持處理 yield return 語句,它保存了當前值並實現 結果 true 的狀態(promise with result true)
  • 支持處理 yield break 語句,它設置釋放模式為 on,並跳轉到封閉的(enclosing)finally 或 exit 塊。
  • 執行分配給 finally 塊(當正在釋放的時候)
  • 退出方法,會釋放 CancellationTokenSource (如果存在)並且實現結果為 false 的狀態
  • 捕捉異時時,它會釋放 CancellationTokenSource (如果存在)並且在狀態里設置異常信息

這反映在實現中,它將非同步方法的降低機制(lowering machinery)拓展如下:

  1. 處理 yield return 以及 yield break 語句(查看 VisitYieldReturnStatement 方法以及 VisitYieldBreakStatement 方法到 AsyncIteratorMethodToStateMachineRewriter
  2. 處理 try 語句(查看方法 AsyncIteratorMethodToStateMachineRewriter 里的 VisitTryStatementVisitExtractedFinallyBlock 方法)
  3. 對自己的 promise 產生額外的狀態和邏輯(查看 AsyncIteratorRewriter 方法,它產生其他的成員變數:MoveNextAsyncCurrenctDisposeAsync 以及一些支持可重置的 ValueTask 行為的成員,也就是 GetResultSetStatusOnCompleted
ValueTask<bool> MoveNextAsync()
{
    if(state == StateMachineStates.FinishedStateMachine)
    {
        return default(ValueTask<bool>);
    }
    ValueOrEndPromise.Reset();
    var inst = this;
    builder.Start(ref inst);
    var version = valueOrEndPromise.Version;
    if(valueOrEndPromise.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
    {
        return new ValueTask<bool>(valueOrEndPromise.GetResult(version));
    }
    return new ValueTask<bool>(this,version);//註意,這裡用到了狀態機的 IValueTaskSource 實現
}
T Current => current;

非同步迭代器方法的啟動方法和狀態機的初始化都遵循常規迭代器方法。尤其是 GetAsyncEnumerator() 方法就像 GetEnumerator() 只是它設置初始化狀態為 StateMachineStates.NotStartedStateMachine (-1):

IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
{
    {StateMachineType} result;
    if(initialThreadId == /*托管線程id*/ && state == StateMachineStates.FinishedStateMachine)
    {
        state = InitialState;   //-3
        disposeMode = false;
        result = this;
    }
    else
    {
        result = new {StateMachineType}(InitialState);
    }
    /*複製每個參數代理,或者在每個參數在被標記為[EnumeratorCancellation]的情況下和 GetAsyncEnumerator 的 token 參數結合使用*/
}

對於被標記為 [EnumeratorCancellation] 的參數,GetEnumerator 通過結合兩個可用的 token 初始化:

if(this.parameterProxy.Equals(default))
{
    result.parameter = token;
}
else if(token.Equals(this.parameterProxy) || token.Equals(default))
{
    result.paramter = this.parameterProxy;
}
else
{
    result.combinedTokens = CancellationTokenSource.CreatedLinkedTokenSource(this.parameterProxy,token);
    result.parameter = combinedTokens.Token;
}

對於線程 id 的檢查,可查看 https://github.com/dotnet/corefx/issues/3481

類似地,啟動方法與常規迭代器方法非常相似:

{
    {StateMachineType} result = new {StateMachineType}(StateMachineStatus.FinishedStateMachine);//-2
    /* 保存參數到參數代理中 */
    return result;
}

Disposal

迭代器和非同步迭代器方法都需要處理,因為他們執行的步驟都被調用者控制,它可以選擇在得到所有的元素之前釋放迭代器。例如,foreach(...){if(...) break;}。相比之下,非同步方法會持續自動運行直到結束。在調用者角度來說,它們不會在執行過程中掛起(暫停),所以它們不需要被釋放。

總之,非同步迭代器的釋放基於四個設計元素:

  • yield return(當恢復至 dispose 模式時,跳轉到 finally )
  • yield break(進入 dispose 模式並且跳轉到封閉的 finally 塊)
  • finallyfinally 後我們會跳轉到下一個)
  • DisposeAsync(進入 dispose 模式並恢復執行)

非同步迭代器方法的調用者應該在當一個方法完成或被 yield return 掛起時,只調用 DisposeAsync()DisposeAsync 會在狀態機("dispose mode")設置一個標識以及(如果這個方法沒有完成)從當前狀態恢復執行。狀態機能夠在被給定的狀態恢復執行(甚至那些分配到 try 中的)。在 dispose 模式下恢復執行時,它會直接跳轉到 finally

finally 塊只在 await 表達式下可能包含暫停以及恢復。由於在 yield return 上的限制(上面描述的),dispose 模式不會運行進入 yield return。一旦 finally 塊完成,在 dispose 模式中執行跳轉到下一個 finally 塊,或是在到達了方法的最頂層結尾。

到達 yield break (或是方法結束)會設置 dispose 模式標識以及跳轉到 finally 塊。當我們返回控制權給調用者時(通過到達方法尾部來完成 promise false)所有的處理都會被完成以及狀態機也處於完成狀態。所以 DisposeAsync() 不需要做其他事了。

從給定的 finally 塊來看處理,可以執行塊中的代碼:

  • 正常執行(比如在 try 塊中的代碼之後)
  • 在 try 塊中觸發異常(它將執行必要的 finally 塊以及在 Finished 狀態中終止方法)
  • 調用 DisposeAsync()(它在 dispose 模式恢復執行並且跳轉到 finally 塊)
  • yield break(進入 dispose 模式並跳轉到 finally 塊)
  • dispose 模式,後面跟著嵌套 finally

a yield return is lowered as:

_current = expression;
_state = <next_state>;
goto <exprReturnTrueable>;// 它執行了 _valueOrEndPromise.SetResult(true);return;
//從 state = <next_state> 恢復,將執行到這個標簽
<next_state_label>:;
this.state = cachedState = NotStartedStateMachine;
if(disposeModel)/* 跳轉到 finally 或退出*/

a yield break is lowered as:

disposeModel = true;
/* 跳轉到 finally 或退出 */
ValueTask IAsyncDisposable.DisposeAsync()
{
    if(state >= StateMachineStates.NotStartedStateMachine /* -1 */)
    {
        throw new NotSupportedException();
    }
    if(state == StateMachineStates.FinishedStateMachine /* -2 */)
    {
        return default;
    }
    disposeModel = true;
    _valueOrEndPromise.Reset();
    var inst = this;
    _builder.Start(ref inst);
    return new ValueTask(this,_valueOrEndPromise.Version);//註意,這裡用到了狀態機的 IValueTaskSource 實現
}

與通常提取 finally 的對比

當 finally 塊內不含 await 表達式,try/catch is lowered as:

try
{
    ...
    finallyEntryLabel:
}
finally
{
    ...
}
if(disposeMode) /* 跳轉 finally 或退出 */

當 finally 塊內包含 await 表達式時,在非同步重寫之前被提取(通過 AsyncExeptionHandlerRewriter)。

在這種情況下,我們將知道:

try
{
    ...
    goto finallyEntryLabel:
}
catch(Exception e)
{
    ... save exception ...
}
finallyEntryLabel:
{
    ... 從 finally 的原始代碼和異常的附加處理 
}

這兩種情況下,我們將在 finally 邏輯塊後添加 if(disposeMode) /* 跳轉到 finally 或退出 */

狀態值和轉換

可枚舉從狀態 -2 開始。調用 GetAsyncEnumerator 來設置狀態為 -3,或返回一個新的枚舉器(狀態也為 -3)。

從這裡開始,MoveNext 將會做:

  • 到達方法的最後(-2,我們已經做完並釋放)
  • 到達 yield break(狀態不變,dispose mode = true)
  • 到達 yield return(-N,從 -4 遞減)
  • 到達 await(N,從 0 遞增)

從暫停狀態 N 或 -N,MoveNext 將恢復執行(-1)。但是如果暫停 yield return(-N),你也能調用 DisposeAsync,它在 dispose mode 中恢復執行(-1)。

當在 dispose mode 下,MoveNext 會繼續暫停(N)和恢復(-1)直到到達方法結束(-2)。

從狀態 -1 或 N 調用 DisposeAsync 的結果未指明。編譯器在這種情況下會生成 throw new NotSupportedException

   DisposeAsync                              await
 +------------------------+             +------------------------> N
 |                        |             |                          |
 v   GetAsyncEnumerator   |             |        resuming          |
-2 --------------------> -3 --------> -1 <-------------------------+    Dispose mode = false
 ^                                   |  |                          |
 |         done and disposed         |  |      yield return        |
 +-----------------------------------+  +-----------------------> -N
 |        or exception thrown        |                             |
 |                                   |                             |
 |                             yield |                             |
 |                             break |           DisposeAsync      |
 |                                   |  +--------------------------+
 |                                   |  |
 |                                   |  |
 |         done and disposed         v  v    suspension (await)
 +----------------------------------- -1 ------------------------> N
                                        ^                          |    Dispose mode = true
                                        |         resuming         |
                                        +--------------------------+

註意:A yield return is lowered as 我實在是不知道 is lowerd as 到底是什麼意思,單個單詞都會,拼在一起怎麼讀都不順


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 題目鏈接 FZU - 2295 Human life 題目分析 題意:你在玩一個游戲,在其中你可以通過學習一些技能,但是學習某些技能之前,可能還要學習一些其他的技能,並且學習任何技能都有一定的花費; 而我們可以通過掌握某些工作以獲取報酬,為了掌握這一工作,我們必須學會特定的技能。 不過有些工作彼此之 ...
  • 時隔一年多,在掌握了Spring、SpringBoot、SpringCloud之後 我再次回頭,重新學習Spring框架 Bean的生命周期學習: 在傳統的XML配置中,可以這樣自定義初始化和銷毀方法: 註解方式的簡單使用: 註意:要有close方法,否則不會列印Car銷毀方法 列印如下: 這裡預設 ...
  • 快速排序:在一組數據中,可以將左邊的數字當作樞軸(右邊也可以),接下來要做的就是,先從右邊找到比樞軸小的數, 再從左邊找到比樞軸大的數,接著將這兩個數進行交換,重覆上述步驟找出所有符合條件的數進行交換, 最後將樞軸放到比樞軸大的數與比樞軸小的數之間。之所以要從右邊開始找,並且找到比樞軸小的數是因為交 ...
  • 原題 | Generating a PEG Parser 作者 | Guido van Rossum(Python之父) 譯者 | 豌豆花下貓(“Python貓”公眾號作者) 聲明 | 本翻譯是出於交流學習的目的,基於 "CC BY NC SA 4.0" 授權協議。為便於閱讀,內容略有改動。 首發地 ...
  • 更新一篇知識星球裡面的源碼分析文章,去年寫的,周末自己錄了個視頻,大家看下效果好嗎?如果好的話,後面補錄發在知識星球裡面的其他源碼解析文章。 前言 之前自己本地 clone 了 Flink 的源碼,編譯過,然後 share 到了 GitHub 上去了,自己也寫了一些源碼的中文註釋,並且 push 到 ...
  • debug #排除程式故障 print()函數常和#號註釋結合在一起用來debug 多行註釋有兩種快捷操作:1、在需要註釋的多行代碼塊前後加一組三引號''' 2、選中代碼後使用快捷鍵操作:Windows快捷鍵是ctrl+/,Mac為cmd+/,適用於本地編輯器) 一種異常處理的機制,可以在異常出現時 ...
  • 一、補碼簡介 1.電腦中的符號數有三種表示方式,即為:原碼、反碼、補碼。三種表示方法均有符號位和數值位,符號位都是0表示正數,符號位都是1表示負數。 2.電腦中的數字的存儲方式:在電腦系統中,數值一律用補碼來表示和存儲,原因在於:使用補碼可以將符號位和數值域統一處理,同時,加法和減法可以統一處 ...
  • 1.JMS即Java消息服務,是Java平臺中面向消息中間件的API,用於在兩個應用程式之間,或分散式系統中發送消息,進行非同步通信。 2.JMS分為兩種,點對點消息模型 和發佈/訂閱消息模型 3.由六個模塊組成 連接工廠ConnectFactory 消息隊列的連接Connect 用於連接的對話Ses ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...