# 如何部署ASP.NET Core 到Linux伺服器 我們開發的最終目的,是將開發後的東西發佈網路上,以便自己及其他人使用。 本篇博客介紹如果在 linux 上部署 ASP.NET Core應用,使用nginx+systemd 來管理我們的應用。 ## 準備 - Ubuntu 20.04 - N ...
.net core使用channel消息隊列
背景
最近做一個項目,連接了很多設備,需要保存設備的心跳數據,剛開始的做法是直接接收到設備的數據之後進行心跳數據的保存,但是隨著設備多了起來,然後設備的使用時長不斷的加大,對資料庫的壓力也比較大,所以想著優化一下。
方案調研
1.使用第三方中間件
常見的使用redis,或者mq,只需要不斷的向中間件發送數據即可,redis使用隊列,如果是mq直接發送消息即可,使用起來簡單方便,但是要引入這些中間件,目前的架構裡面沒有,需要自己去起服務,維護。
2.使用channel
System.Threading.Channels 是.NET Core 3.0 後推出的新的集合類型, 具有非同步API,高性能,線程安全等特點,它可以用來做消息隊列,進行數據的生產和消費, 公開的 Writer
和 Reader
api對應消息的生產者和消費者,也讓Channel更加的簡潔和易用,與Rabbit MQ 等其他隊列不同的是,Channel 是進程內的隊列
目前就介紹來看非常完美,不需要添加第三方中間件,直接添加現有的模塊即可。
代碼實現
選擇了使用channel來做優化。拿到設備數據之後直接把消息丟入到channel,然後後臺使用定時任務或者自己實現hostservice去不斷的消費數據。
生產者代碼
public async Task ProduceHeartBeat(string message)
{
await channel.Writer.WriteAsync(message);
}
不斷的向裡面寫入數據即可.
消費者代碼
/// <summary>
/// timespan時間內消費多少數據
/// </summary>
/// <param name="count"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
{
var result = new List<string>(count);
CancellationTokenSource cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
cts.CancelAfter(timeSpan);
int rcount = 0;
while ( !cancellationToken.IsCancellationRequested && rcount<count)
{
//await Task.Delay(2000);
if (channel.Reader.TryRead(out var number))
{
Console.WriteLine(number);
result.Add(number);
rcount++;
}
else
{
break;
}
}
return result;
}
裡面加入了一個cancellationToken,進行消費的時長限制。在此時長內消費多少條數據,超時直接結束。
這就是基本的代碼
後臺定時消費數據
public class HeartBeatService : BackgroundService
{
private readonly HeartBeatsChannel heartBeatsChannel;
public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
{
this.heartBeatsChannel = heartBeatsChannel;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
Task.Factory.StartNew(() =>
{
while (!stoppingToken.IsCancellationRequested)
{
//阻塞的隊列使得一直在同一個線程運行
Process(15,heartBeatsChannel).Wait();
}
}, TaskCreationOptions.LongRunning);
Console.WriteLine("主線程 現在運行的線程id為:" + Thread.CurrentThread.ManagedThreadId);
}
catch (Exception ex)
{
Console.WriteLine(ex.ToString());
}
}
/// <summary>
/// 消費數據
/// </summary>
/// <param name="count">一次消費數量</param>
/// <param name="heartBeatsChannel"></param>
/// <returns></returns>
private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
{
Console.WriteLine("子線程_現在運行的線程id為:" + Thread.CurrentThread.ManagedThreadId);
//每次消費三十個
if (heartBeatsChannel.IsHasContent)
{
//int count = 15;
//進行消費
await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
}
await Task.Delay(3000);
}
使用的是BackgroundServic,直接實現要處理的業務邏輯就好了。在這裡使用的是TaskCreationOptions.LongRunning,新開一個線程去處理心跳數據。
總結
以上就是主要的實現全過程,完整的代碼在github
https://github.com/lackguozi/LearnChannelWebApi
實際上完全可以不用後臺去定時消費數據,channel有很多api可以去處理,比如WaitToReadAsync(),但是這裡沒有使用,主要是不想持續的占資料庫資源???總結的話學習了channel的用法,底層似乎使用了deque??只稍微看了下源碼,但是看到了許多的lock,這個是必不可少的。還是巨硬輪子造的好 =_=