C# 8.0中,提供了一種新的IAsyncEnumerable<T>介面,在對集合進行迭代時,支持非同步操作。比如在讀取文本中的多行字元串時,如果讀取每行字元串的時候使用同步方法,那麼會導致線程堵塞。IAsyncEnumerable<T>可以解決這種情況,在迭代的時候支持使用非同步方法。也就是說,之前我 ...
C# 8.0中,提供了一種新的IAsyncEnumerable<T>
介面,在對集合進行迭代時,支持非同步操作。比如在讀取文本中的多行字元串時,如果讀取每行字元串的時候使用同步方法,那麼會導致線程堵塞。IAsyncEnumerable<T>
可以解決這種情況,在迭代的時候支持使用非同步方法。也就是說,之前我們使用foreach
來對IEnumerable
進行迭代,現在可以使用await foreach
來對IAsyncEnumerable<T>
進行迭代,每個項都是可等待的。這種新的介面稱為async-streams
,將會隨.NET Core 3
發佈。我們來看一下如何在LINQ
中實現非同步的迭代。
使用常規的IEnumerable<T>
首先我們創建一個新的Console
項目,基於.NET Core 3
:
namespace AsyncLinqDemo { class Program { static void Main(string[] args) { Console.WriteLine("Input the file path:"); var file = Console.ReadLine(); var lines = ReadAllLines(file); foreach (var line in lines) { Console.WriteLine(line); } } static IEnumerable<string> ReadAllLines(string file) { using (var fs = File.OpenRead(file)) { using (var sr = new StreamReader(fs)) { while (true) { string line = sr.ReadLine(); if(line == null) { break; } yield return line; } } } } } }
這是一個很簡單的Console程式,實現了一個簡單的返回類型為IEnumerable<string>
的ReadAllLines(string file)
方法,從文本文件中逐行讀取文本,並逐行輸出。如果文本內容較少的話,沒什麼問題。但如果我們使用過aync/await
,就會瞭解,在IO操作如讀取或寫入文件的時候,最好使用非同步方法以避免線程阻塞。讓我們來改進一下。
使用非同步的IAsyncEnumerable<T>
可以優化的是下麵這句:
string line = sr.ReadLine();
對於IO操作,最好使用非同步方式。這裡可使用相應的非同步方法:
string line = await sr.ReadLineAsync();
我們說“非同步是傳染的”,如果這裡使用非同步,那麼相應的該方法的返回值也要使用非同步,所以需要將返回值改為static async Task<IEnumerable<string>>
,但這樣會得到一個錯誤:
ErrorCS1624The body of 'Program.ReadAllLines(string)' cannot be an iterator block because 'Task<IEnumerable<string>>' is not an iterator interface typeAsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs23Active
因為Task<IEnumerable<string>>
並不是一個可以迭代的介面類型,所以我們無法在方法內部使用yield
關鍵字。解決問題的辦法是使用新的IAsyncEnumerable
介面:
static async IAsyncEnumerable<string> ReadAllLines(string file) { using (var fs = File.OpenRead(file)) { using (var sr = new StreamReader(fs)) { while (true) { string line = await sr.ReadLineAsync(); if(line == null) { break; } yield return line; } } } }
按F12
查看該介面的定義:
namespace System.Collections.Generic { public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(CancellationTokencancellationToken = default); } }
這是一個非同步的迭代器,並提供了CancellationToken
。再按F12
查看IAsyncEnumerator<T>
的定義,可發現裡面是這樣的:
namespace System.Collections.Generic { public interface IAsyncEnumerator<out T> : IAsyncDisposable { T Current { get; } ValueTask<bool> MoveNextAsync(); } }
這裡MoveNextAsync()
方法實際是返回了一個結果類型為bool
的Task
,每次迭代都是可等待的,從而實現了迭代器的非同步。
使用await foreach
消費IAsyncEnumerable<T>
當我們做了以上改動之後,ReadAllLines()
方法返回的是一個支持非同步的IAsyncEnumerable
,那麼在使用的時候,也不能簡單的使用foreach
了。修改Main
方法如下:
static async Task Main(string[] args) { Console.WriteLine("Input the file path:"); var file = Console.ReadLine(); var lines = ReadAllLines(file); await foreach (var line in lines) { Console.WriteLine(line); } }
首先在foreach
之前添加await
關鍵字,還要需要將Main
方法由void
改為async Task
。這樣整個程式都是非同步執行了,不會再導致堵塞了。這個例子只是一個簡單的demo,是否使用非同步並不會感覺到明顯的區別。如果在迭代內部需要比較重的操作,如從網路獲取大量數據或讀取大量磁碟文件,非同步的優勢還是會比較明顯的。
使用LINQ
消費IAsyncEnumerable<T>
使用LINQ
來操作集合是常用的功能。如果使用IEnumberable
,在Main
方法中可以做如下改動:
var lines = ReadAllLines(file); var res = from line in lines where line.StartsWith("ERROR: ") selectline.Substring("ERROR: ".Length); foreach (var line in res) { Console.WriteLine(line); }
或:
var res = lines.Where(x => x.StartsWith("ERROR: ")).Select(x => x.Substring("ERROR: ".Length));
如果使用了新的IAsyncEnumerable
,你會發現無法使用Where
等操作符了:
ErrorCS1936Could not find an implementation of the query pattern for source type 'IAsyncEnumerable<string>'. 'Where' not found.AsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs16Active
目前LINQ
還沒有提供對IAsyncEnumerable
的原生支持,不過微軟提供了一個Nuget包來實現此功能。在項目中打開Nuget Package Manger搜索安裝System.Linq.Async
,註意該包目前還是預覽版,所以要勾選include prerelease
才能看到。安裝該Nuget包後,Linq查詢語句中的錯誤就消失了。
在System.Linq.Async
這個包中,對每個同步的LINQ
方法都做了相應的擴展。所以基本上代碼無需什麼改動即可正常編譯。
對於LINQ
中的條件語句,也可以使用WhereAwait()
方法來支持await
:
public static IAsyncEnumerable<TSource> WhereAwait<TSource>(thisIAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>>predicate);
如需要在條件語句中進行IO或網路請求等非同步操作,可以這樣用:
var res = lines.WhereAwait(async x => await DoSomeHeavyOperationsAsync(x));
DoSomeHeavyOperationsAsync
方法的簽名如下:
private static ValueTask<bool> DoSomeHeavyOperationsAsync(string x) { //Do some works... }
小結
通過以上的示例,我們簡要瞭解瞭如何使用IAsyncEnumerable
介面以及如何在LINQ
中實現非同步查詢。在使用該介面時,我們需要創建一個自定義方法返回IAsyncEnumerable<T>
來代替IEnumberable<T>
,這個方法可稱為async-iterator
方法,需要註意以下幾點:
-
該方法應該被聲明為
async
。 -
返回
IAsyncEnumerable<T>
。 -
同時使用
await
及yield
。如await foreach
,yield return
或yield break
等。
例如:
async IAsyncEnumerable<int> GetValuesFromServer() { while (true) { IEnumerable<int> batch = await GetNextBatch(); if (batch == null) yield break; foreach (int item in batch) { yield return item; } } }
此外還有一些限制:
-
無法在
try
的finally
塊中使用任何形式的yield
語句。 -
無法在包含任何
catch
語句的try
語句中使用yield return
語句。
期待.NET Core 3的正式發佈!
瞭解紐西蘭IT行業真實碼農生活
請長按上方二維碼關註“程式員在紐西蘭”