引言 在C#的併發編程中,Channel是一種非常強大的數據結構,用於在生產者和消費者之間進行通信。本文將首先通過一個實際的使用案例,介紹如何在C#中使用Channel,然後深入到Channel的源碼中,解析其內部的實現機制。 使用案例一:文件遍歷和過濾 在我們的使用案例中,我們需要遍歷一個文件夾及 ...
引言
在C#的併發編程中,Channel是一種非常強大的數據結構,用於在生產者和消費者之間進行通信。本文將首先通過一個實際的使用案例,介紹如何在C#中使用Channel,然後深入到Channel的源碼中,解析其內部的實現機制。
使用案例一:文件遍歷和過濾
在我們的使用案例中,我們需要遍歷一個文件夾及其所有子文件夾,並過濾出具有特定擴展名的文件。在此,我們使用了C#的Channel來實現這個任務。
首先,我們創建了一個名為EnumerateFilesRecursively的方法,這個方法接受一個文件夾路徑作為參數,並返回一個ChannelReader
ChannelReader<string> EnumerateFilesRecursively(string root, int capacity = 100, CancellationToken token = default)
{
var output = Channel.CreateBounded<string>(capacity);
async Task WalkDir(string path)
{
IEnumerable<string> files = null, directories = null;
try
{
files = Directory.EnumerateFiles(path);
directories = Directory.EnumerateDirectories(path);
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
if (files != null)
{
foreach (var file in files)
{
await output.Writer.WriteAsync(file, token);
}
}
if (directories != null)
await Task.WhenAll(directories.Select(WalkDir));
}
Task.Run(async () =>
{
await WalkDir(root);
output.Writer.Complete();
}, token);
return output.Reader;
}
然後,我們創建了一個名為FilterByExtension的方法,這個方法接受一個ChannelReader
ChannelReader<FileInfo> FilterByExtension(
ChannelReader<string> input, IReadOnlySet<string> exts, CancellationToken token = default)
{
var output = Channel.CreateUnbounded<FileInfo>();
Task.Run(async () =>
{
try
{
await foreach (var file in input.ReadAllAsync(token).ConfigureAwait(false))
{
var fileInfo = new FileInfo(file);
if (exts.Contains(fileInfo.Extension))
await output.Writer.WriteAsync(fileInfo, token).ConfigureAwait(false);
}
}
catch (Exception ex)
{
Console.WriteLine($"An error occurred: {ex.Message}");
}
finally
{
output.Writer.Complete();
}
}, token);
return output;
}
最後,在Main方法中,我們首先調用EnumerateFilesRecursively方法,遍歷指定的文件夾,並得到一個文件路徑的Channel。然後,調用FilterByExtension方法,過濾出具有特定擴展名的文件,並得到一個文件信息的Channel。最後,遍歷這個Channel,列印出每個文件的全路徑。
var fileSource = EnumerateFilesRecursively("D:\\Program Files\\.nuget\\packages");
var sourceCodeFiles =
FilterByExtension(fileSource, new HashSet<string> { ".json", ".map", ".dll" });
await foreach (var file in sourceCodeFiles.ReadAllAsync().ConfigureAwait(false))
{
Console.WriteLine($"{file.FullName}");
}
Console.ReadKey();
在這個例子中,可以看到無論是文件的遍歷還是過濾,都是並行進行的,並且這兩個任務之間通過Channel進行瞭解耦,使得代碼更加簡潔和清晰。此外,由於Channel的非同步特性,我們的程式在等待數據的時候不會阻塞,從而大大提高了程式的性能和響應性。
使用案例二:Excel讀取與翻譯內容
在我們的使用案例中,我們需要讀取Excel文件,同時將讀取的內容處理,調用對應的翻譯服務進行翻譯,並將翻譯結果列印到控制台並存儲到新的Excel文件中。為此,我們定義了一個名為ExcelTranslationProvider的類。
ExcelTranslationProvider類
ExcelTranslationProvider類是一個專門處理Excel文件翻譯的工具。它主要使用了.NET的Channel來處理非同步數據流,從而提高了翻譯的效率。以下是該類的代碼:
public class ExcelTranslationProvider : TranslationProvider
{
public static Translater Translater { get; set; } = Translater.Azure;
public static II18NTermTranslateService TranslateService => TranslateServiceProvider.GetTranslateService(Translater);
private static ExcelTranslationParameters translationParameters;
public static async Task Translate(TranslationParameters parameters)
{
if (parameters is not ExcelTranslationParameters excelParameters)
throw new ArgumentException("Invalid parameters for Excel translation.");
translationParameters = excelParameters;
var translateText = TranslateText(excelParameters.Path);
var i = 1;
List<TranslationDto> list = new List<TranslationDto>();
await foreach (var text in translateText.ReadAllAsync().ConfigureAwait(false))
{
System.Console.WriteLine($"{i++}、" + text.TranslatText);
list.Add(text);
}
await ExcelUtil.SaveAsAsync(excelParameters.SavePath, list);
}
private static ChannelReader<TranslationDto> TranslateText(string path)
{
var output = Channel.CreateUnbounded<TranslationDto>();
_ = TranslateAndWriteToChannelAsync(path, output.Writer);
return output.Reader;
}
private static async Task TranslateAndWriteToChannelAsync(string path, ChannelWriter<TranslationDto> writer)
{
var query = await ExcelUtil.QueryAsync<TranslationDto>(path, translationParameters.Sheet);
var tasks = query.Select(async item =>
{
try
{
var res = await TranslateService.TranslateSync(item.Name, "en-US");
item.TranslatText = res;
await writer.WriteAsync(item);
}
catch (Exception ex)
{
System.Console.WriteLine($"An error occurred: {ex.Message}");
}
});
await Task.WhenAll(tasks);
writer.Complete();
}
}
-
Translater和TranslateService:這兩個靜態屬性用於配置和獲取翻譯服務。Translater是一個枚舉類型,表示可用的翻譯服務提供者。預設的翻譯服務是Azure。TranslateService是一個只讀屬性,返回一個實現了II18NTermTranslateService介面的翻譯服務對象。這個對象是通過TranslateServiceProvider.GetTranslateService(Translater)方法獲取的。
-
translationParameters:用於保存翻譯參數,這些參數包括源文件的路徑、目標文件的路徑等。
-
Translate:這個方法首先檢查傳入的參數是否為ExcelTranslationParameters類型。然後,它調用TranslateText方法開始翻譯過程。翻譯的結果被保存在一個List
列表中,然後寫入到Excel文件。 -
TranslateText:它創建了一個無界Channel,並啟動了一個非同步任務來進行翻譯操作並將結果寫入到Channel中。無界Channel是一種可以存儲任意數量元素的Channel,它是通過Channel.CreateUnbounded
()方法創建的。創建Channel後,這個方法返回Channel的讀取端,同時啟動了一個非同步任務TranslateAndWriteToChannelAsync來進行翻譯並將結果寫入到Channel的中。 -
TranslateAndWriteToChannelAsync:它負責從Excel文件中讀取數據,進行翻譯,並將翻譯結果寫入到Channel中。這個方法首先從Excel文件中讀取數據,然後為每一條數據創建一個非同步翻譯任務。所有的翻譯任務是併發執行的,使用了Task.WhenAll(tasks)來等待所有的翻譯任務完成。完成所有的翻譯任務後,這個方法調用writer.Complete()方法來表示沒有更多的數據要寫入到Channel中。