.net core使用channel消息隊列

来源:https://www.cnblogs.com/guoxiaotian/archive/2023/06/26/17506536.html
-Advertisement-
Play Games

# 如何部署ASP.NET Core 到Linux伺服器 我們開發的最終目的,是將開發後的東西發佈網路上,以便自己及其他人使用。 本篇博客介紹如果在 linux 上部署 ASP.NET Core應用,使用nginx+systemd 來管理我們的應用。 ## 準備 - Ubuntu 20.04 - N ...


.net core使用channel消息隊列

背景

最近做一個項目,連接了很多設備,需要保存設備的心跳數據,剛開始的做法是直接接收到設備的數據之後進行心跳數據的保存,但是隨著設備多了起來,然後設備的使用時長不斷的加大,對資料庫的壓力也比較大,所以想著優化一下。

方案調研

1.使用第三方中間件

常見的使用redis,或者mq,只需要不斷的向中間件發送數據即可,redis使用隊列,如果是mq直接發送消息即可,使用起來簡單方便,但是要引入這些中間件,目前的架構裡面沒有,需要自己去起服務,維護。

2.使用channel

System.Threading.Channels 是.NET Core 3.0 後推出的新的集合類型, 具有非同步API,高性能,線程安全等特點,它可以用來做消息隊列,進行數據的生產和消費, 公開的 WriterReader api對應消息的生產者和消費者,也讓Channel更加的簡潔和易用,與Rabbit MQ 等其他隊列不同的是,Channel 是進程內的隊列

目前就介紹來看非常完美,不需要添加第三方中間件,直接添加現有的模塊即可。

代碼實現

選擇了使用channel來做優化。拿到設備數據之後直接把消息丟入到channel,然後後臺使用定時任務或者自己實現hostservice去不斷的消費數據。

生產者代碼

 public async Task ProduceHeartBeat(string message)
        {
            await channel.Writer.WriteAsync(message);
        }

不斷的向裡面寫入數據即可.

消費者代碼

        /// <summary>
        /// timespan時間內消費多少數據
        /// </summary>
        /// <param name="count"></param>
        /// <param name="timeSpan"></param>
        /// <returns></returns>
        public async Task<List<string>> ConsumeHeartBeatAsync(int count,TimeSpan timeSpan)
        {
            var result = new List<string>(count);
            CancellationTokenSource cts = new CancellationTokenSource();
            var cancellationToken = cts.Token;
            cts.CancelAfter(timeSpan);
            int rcount = 0;
            while ( !cancellationToken.IsCancellationRequested && rcount<count)
            {
                //await Task.Delay(2000);
                if (channel.Reader.TryRead(out var number))
                {
                    Console.WriteLine(number);
                    result.Add(number);
                    rcount++;
                }
                else
                {
                    break;
                }
                
            }  
            return result;
        }

裡面加入了一個cancellationToken,進行消費的時長限制。在此時長內消費多少條數據,超時直接結束。

這就是基本的代碼

後臺定時消費數據

public class HeartBeatService : BackgroundService
    {
        private readonly HeartBeatsChannel heartBeatsChannel;

        public HeartBeatService(HeartBeatsChannel heartBeatsChannel)
        {
            this.heartBeatsChannel = heartBeatsChannel;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {

                Task.Factory.StartNew(() =>
                {
                    while (!stoppingToken.IsCancellationRequested)
                    {
                        //阻塞的隊列使得一直在同一個線程運行
                        Process(15,heartBeatsChannel).Wait();
                    }

                }, TaskCreationOptions.LongRunning);

                Console.WriteLine("主線程 現在運行的線程id為:" + Thread.CurrentThread.ManagedThreadId);

                }
            catch (Exception ex)
            {
                Console.WriteLine(ex.ToString());
            }
        }


        /// <summary>
        /// 消費數據
        /// </summary>
        /// <param name="count">一次消費數量</param>
        /// <param name="heartBeatsChannel"></param>
        /// <returns></returns>
        private async Task Process(int count ,HeartBeatsChannel heartBeatsChannel)
        {
            Console.WriteLine("子線程_現在運行的線程id為:" + Thread.CurrentThread.ManagedThreadId);
            //每次消費三十個
            if (heartBeatsChannel.IsHasContent)
            {
                //int count = 15;
                //進行消費
                await heartBeatsChannel.ConsumeHeartBeatAsync(count, TimeSpan.FromSeconds(3));
            }           
            await Task.Delay(3000);
        }

使用的是BackgroundServic,直接實現要處理的業務邏輯就好了。在這裡使用的是TaskCreationOptions.LongRunning,新開一個線程去處理心跳數據。

總結

以上就是主要的實現全過程,完整的代碼在github

https://github.com/lackguozi/LearnChannelWebApi

實際上完全可以不用後臺去定時消費數據,channel有很多api可以去處理,比如WaitToReadAsync(),但是這裡沒有使用,主要是不想持續的占資料庫資源???總結的話學習了channel的用法,底層似乎使用了deque??只稍微看了下源碼,但是看到了許多的lock,這個是必不可少的。還是巨硬輪子造的好 =_=


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 當我們在開發項目時,有時需要用到外部依賴組件,例如當我們需要Json序列化的時候需要用到FastJson組件,我們可以通過下載對應jar包載入到項目中。但當一個大的項目同時需要依賴各種各樣的外部服務,就存在著配置繁瑣、依賴衝突等問題,因此可以通過maven來完成對應的依賴管理功能。 ...
  • ### 序 上一篇我們介紹了 k8s 的基本架構,我們在這篇文章將介紹 `Kubernetes` 關鍵組件和概念。 還是先來一張圖: ![1_2pdatNn7KzcQZpc8cOALOQ.webp][1] 根據上圖我們分別對`Deployment`、`ReplicaSet`、`Pod`詳細的介紹,其 ...
  • ### 一、項目結構 新建報表微服務模塊,這是我的項目結構圖。 ![](https://pic.smartasc.cn/blogPics/20230626141851.png) ### 二、執行初始化數據腳本 運行積木報表的初始化腳本,創建相關表結構,github速度太慢,推薦使用 [gitee地址 ...
  • 1、準備需要的jar包並複製到伺服器某個目錄下 ![](https://img2023.cnblogs.com/blog/1928230/202306/1928230-20230626151543572-42173473.png) 2、在此目錄下,創建Dockerfile的文本文件,並將以下內容添加 ...
  • # hovertool `HoverTool` 是 `Bokeh` 庫中的一個工具,它可以在滑鼠懸停在圖上時顯示數據。當滑鼠指針放在圖表的特定部分(比如散點圖的點或者線圖中的線的時候),該工具會顯示與該部分相關的附加信息。 一般配套使用的是`from bokeh.plotting import fi ...
  • 最近在讀《數據密集型應用系統設計》,其中談到了zookeeper對容錯共識演算法的應用。這讓我想到之前參考的zookeeper學習資料中,誤將容錯共識演算法寫成了2PC(兩階段提交協議),所以準備以此文對共識演算法和2PC做梳理和區分,也希望它能幫助像我一樣對這兩者有誤解的同學。 ...
  • 上一篇介紹的**通用計算**是關於多個`numpy`數組的計算, 本篇介紹的**聚合計算**一般是針對單個數據集的各種統計結果,同樣,使用**聚合函數**,也可以避免繁瑣的迴圈語句的編寫。 # 元素的和 數組中的元素求和也就是合計值。 ## 調用方式 **聚合計算**有兩種調用方式,一種是面向對象的 ...
  • 主要介紹了WPF中的兩大類資源:應用資源和XAML 資源以及在使用資源時通過靜態資源引用和使用動態資源引用的區別,需要根據具體場景調整。 本文是學習WPF所作筆記,內容難免由紕漏,歡迎留言討論! ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...