Channel 是乾什麼的 The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consume ...
Channel 是乾什麼的
The System.Threading.Channels namespace provides a set of synchronization data structures for passing data between producers and consumers asynchronously. The library targets .NET Standard and works on all .NET implementations.
Channels are an implementation of the producer/consumer conceptual programming model.
以上是微軟官方的解釋 channels。用中文說的話就是這個類提供了在生產者跟消費者之間非同步傳統數據的能力,簡單來說可以認為是一個記憶體消息隊列。
示例 1
下麵是一個簡單的示例,說明如何使用 Channel 類來創建一個生產者-消費者模型:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費者接收到: {item}");
}
});
await Task.WhenAll(producer, consumer);
}
在這個例子中,我們創建了一個無界的通道,然後創建了兩個任務,一個是生產者,一個是消費者。生產者每秒生成一個數字,然後寫入通道。消費者從通道中讀取數據並列印出來。當生產者完成寫入後,它會調用 channel.Writer.Complete() 來通知消費者沒有更多的數據可以讀取。
示例 2
你可以使用 Channel.CreateBounded
static async Task Main(string[] args)
{
var channel = Channel.CreateBounded<int>(5); // 創建一個容量為5的有界通道
var producer = Task.Run(async () =>
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生產者生成了: {i}");
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消費者接收到: {item}");
await Task.Delay(2000); // 模擬消費者需要一些時間來處理數據
}
});
await Task.WhenAll(producer, consumer);
}
在這個例子中,我們創建了一個容量為5的有界通道。生產者每秒生成一個數字,然後寫入通道。消費者從通道中讀取數據並列印出來,但消費者處理數據的速度比生產者慢,所以當通道滿時,生產者的 WriteAsync 操作將會阻塞,直到消費者讀取了一些數據,使得通道有空間可用。
示例 3
下麵是一個示例,展示瞭如何在多個生產者和消費者之間共用一個通道:
static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
// 創建兩個生產者
var producer1 = Produce(channel.Writer, id: 1);
var producer2 = Produce(channel.Writer, id: 2);
// 創建兩個消費者
var consumer1 = Consume(channel.Reader, id: 1);
var consumer2 = Consume(channel.Reader, id: 2);
// 等待所有生產者和消費者完成
await Task.WhenAll(producer1, producer2, consumer1, consumer2);
}
static async Task Produce(ChannelWriter<int> writer, int id)
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"生產者{id}生成了: {i}");
await Task.Delay(1000); // 模擬生產者需要一些時間來生成數據
}
writer.Complete();
}
static async Task Consume(ChannelReader<int> reader, int id)
{
await foreach (var item in reader.ReadAllAsync())
{
Console.WriteLine($"消費者{id}接收到: {item}");
await Task.Delay(2000); // 模擬消費者需要一些時間來處理數據
}
}
在這個例子中,我們創建了兩個生產者和兩個消費者,它們都共用同一個通道。這是一個非常重要使用模式。因為當我們使用消息隊列的時候往往會有多個生產者跟多個消費者。我們可以通過控制生產者生產的速度來控制推入隊列的數據量。我們還可以通過控制消費者的數量來控制消費數據的速度,從而來調節系統的流量,達到消峰填谷的作用。
總結
Channel 類是 .NET CORE 3.0 後新加入的類。為我們提供了便利的生產者/消費者模式實現方案。相當於是一個進程內的記憶體隊列,而且它沒有持久化,純記憶體操作,性能是非常非常高的。當我們面對真正的高併發的時候可以為我們的系統提供吞吐量。當然代價是記憶體跟放棄一些實時性。
關註我的公眾號一起玩轉技術
QQ群:1022985150 VX:kklldog 一起探討學習.NET技術
作者:Agile.Zhou(kklldog)
出處:http://www.cnblogs.com/kklldog/
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。