使用MASA Stack+.Net 從零開始搭建IoT平臺 第五章 使用時序庫存儲上行數據

来源:https://www.cnblogs.com/MASA/archive/2023/06/26/17505493.html
-Advertisement-
Play Games

@[TOC](目錄) # 前言 我們可以將設備上行數據存儲到關係型資料庫中,我們需要兩張帶有時間戳的表(**最新數據表** 和 **歷史數據表**),**歷史數據表**存儲所有設備上報的數據,**最新數據表**需要存儲設備最新一條上報數據,這條最新數據相當於設備的當前狀態。然後展示的時候只展示最新一 ...


@

目錄


前言

我們可以將設備上行數據存儲到關係型資料庫中,我們需要兩張帶有時間戳的表(最新數據表歷史數據表),歷史數據表存儲所有設備上報的數據,最新數據表需要存儲設備最新一條上報數據,這條最新數據相當於設備的當前狀態。然後展示的時候只展示最新一條數據的狀態,報表查詢可以按照設備id和時間從歷史數據表查詢彙總。
這樣是可以的,但是我們的最新數據表需要被頻繁的更新,數據量少的時候沒問題。但數據量大,併發高的時候就會出現問題。
1、存儲成本:數據不會被壓縮,導致占用存儲資源。
2、維護成本:單表數據量太大時,需要人工分庫分表。
3、寫入性能:單機寫入吞吐量難以滿足大量上行數據的寫入需求,資料庫存在性能瓶頸。
4、查詢性能:數據量太大導致查詢性能受到影響。

分析

我們可以採用時序庫來解決上述問題,首先來瞭解一下什麼是時序數據。時序數據是按照時間維度進行索引的數據,它記錄了某個被測量實體在一定時間範圍內,每個時間點上的一組測試值。感測器上傳的室內PM2.5和甲醛數據、凈水器感測器當前的TDS值、電腦系統的監控數據等,都屬於時序數據,時序數據有如下特點:
1、數據量較大,寫入操作是持續且平穩的,而且寫多讀少。
2、只有寫入操作,幾乎沒有更新操作,比如去修改感測器的歷史數據,是沒有意義的。
3、沒有隨機刪除,即使刪除也是按照時間範圍進行刪除。刪除某一個時間點的數據沒有意義,但是刪除2年前的數據是有意義的。
4、數據實時性和時效性強,數據隨著時間的推移不斷追加,舊數據很快失去意義。
5、大部分以時間和實體為維度進行查詢,很少以測試值為維度查詢,比如用戶會查詢某個時間段的溫度數據,但是很少會去查詢溫度高於多少度的數據記錄。
顯然IoT的業務是符合使用時序庫的場景的。
序資料庫就是用來存儲時序數據的資料庫,時序資料庫相較於傳統的關係型數據和非關係型資料庫而言,專門優化了對時序數據的存儲,開源的時序資料庫有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB資料庫進行演示。
時序資料庫有如下幾個概念。
1.Metric:度量,相當於關係型資料庫中的表(table)。
2.Data Point:數據點,相當於關係型資料庫的中的行(row)。
3.Timestamp:時間戳,數據點生成時的時間戳。
4.Field:測量值,比如溫度和濕度、PM2.5等。
5.Tag:標簽,用於標識數據點,通常用來標識數據點的來源,比如溫度和濕度數據來自哪個房間,哪個設備,可以當作關係型資料庫表的主鍵。

如下圖,度量為 Wind,每一個數據點都具有一個 timestamp,兩個 field:direction 和 speed,兩個 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 號碼為 95D8-7913 的設備,屬性城市是上海。隨著時間的變化,風向和風速都發生了改變,風向從 23.4 變成 23.2;而風速從 3.4 變成了 3.3。

圖片來自網路

實施步驟

時序庫的安裝

安裝參考官方文檔,為了方便,我這裡採用docker安裝

docker run --name influxdb -p 8086:8086 influxdb:2.7.0

https://docs.influxdata.com/influxdb/v2.7/install/

我們打開 伺服器ip:8086 可以看到它自帶的管理界面,我們首先創建用戶名密碼,組織、以及Bucket的名稱。
這裡的bucket "IoTDemos" 相當於資料庫的名稱

我們記錄一下這個Token,一會連接influxdb需要,相當於賬號密碼

解決playload沒有時間戳問題

對於時序庫來講,時間戳是非常重要的,但是我們拿到的playload並沒有時間戳(MQTTNet包我沒有找到拿時間戳的方法)。
所以我們需要在mqtt上想辦法,讓設備上報數據的時候,mqtt自動添加時間戳到playload中。
1、我們在數據集成->規則中新建一條規則名稱為"Add_Ts"。SQL編寫如下

SELECT
  *,
  now_timestamp('millisecond') as payload.Ts
FROM
  "topic/#"

topic/# 代表消息發佈到"topic/#"主題的事件
now_timestamp函數返回當前時間的 Unix 時間戳,我們將時間戳寫入到payload的Ts屬性中,關於更多內置SQL函數,請參考官方文檔

https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html

2、我們打開下麵的調試,模擬設備上報一條數據,可以看到這條規則幫我們加入了時間戳。

3、然後我們還需要處理添加了時間戳的處理結果,我們在右側添加一個動作,選擇消息重發佈,將剛剛添加了時間戳的消息重發到一個新的Topic上,我們使用topic/dp,併在playload中添加${payload},這樣我們就修改了playload中的信息,添加了我們需要的時間戳,當然,我們Hub訂閱的消息也需要對應修改,添加/dp尾碼。

4、首先我們先修改MASA.IoT.Hub的配置文件,Topic添加"/dp"尾碼

  "MqttSetting": {
...
    "Topic": "$share/IotHub/topic/+/dp"
  },

5、CallbackAsync中,因為我們設備名稱是從Topic截取的,也要對應修改一下。

    private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
    {
        var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);

        Console.WriteLine(deviceDataPointStr);
        var pubSubOptions = new PubSubOptions
        {
            //修改一下獲取設備名稱的方式
            DeviceName = e.ApplicationMessage.Topic[6..^3],
            Msg = deviceDataPointStr,
            PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
            TrackId = Guid.NewGuid()
        };                            
...
    }

代碼編寫

解決完時間戳的問題,我們就可以編寫代碼向InfluxDB中寫入數據了,我們首先在Infrastructure文件夾下創建ITimeSeriesDbClient介面和TimeSeriesDbClient類,使用介面也方便我們日後更換其他的時序庫。
這裡使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs

namespace MASA.IoT.Core.Infrastructure
{
    public interface ITimeSeriesDbClient
    {
        bool WriteMeasurement<T>(T measurement);
    }
}

TimeSeriesDbClient.cs

using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;

namespace MASA.IoT.Core.Infrastructure
{
    public class TimeSeriesDbClient : ITimeSeriesDbClient
    {
        private readonly InfluxDBClient _client;
        private readonly string _bucket;
        private readonly string _org;
        private readonly AppSettings _appSettings;
        
        public TimeSeriesDbClient(IOptions<AppSettings> settings)
        {
            _appSettings = settings.Value;
            _org = _appSettings.InfluxDBSetting.Org;
            _bucket = _appSettings.InfluxDBSetting.Bucket;
            _client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
        }

        public bool WriteMeasurement<T>(T measurement)
        {
            try
            {
                using var writeApi = _client.GetWriteApi();
                writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                return false;
            }
        }
    }
}

這裡使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)來構造InfluxDBClient。
Token就是我們創建Bucket過程中保存的Token
Url是我們InfluxDB的訪問地址:http://127.0.0.1:8086
寫入的方法WriteMeasurement中我們通過_client.GetWriteApi創建一個寫入的api然後直接將我們要寫入的泛型實體寫入,第二個可選參數代表寫入精度,這裡我們使用WritePrecision.Ms
我們在DeviceHandler.cs中註入ITimeSeriesDbClient 並添加一個WriteMeasurementAsync方法,在方法中我們先根據設備名稱獲取產品,如果識別產品ID為10001(空凈產品),
那麼我們就寫入數據到Measurement:AirPurifierDataPoint
Measurement相當於資料庫的表。
MeasurementColumn特性都是InfluxDB.Client.Core提供的,可以用來標識TagTimestamp

using InfluxDB.Client.Core;
using Newtonsoft.Json;

namespace MASA.IoT.Core.Contract
{
    [Measurement("AirPurifierDataPoint")]
    public class AirPurifierDataPoint
    {
        /// <summary>
        /// 設備名稱
        /// </summary>
        [Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }

        /// <summary>
        /// 產品ID
        /// </summary>
        [Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }

        /// <summary>
        /// Pm2.5
        /// </summary>
        [Column("PM_25")] public double? Pm_25 { get; set; }
        /// <summary>
        /// 溫度
        /// </summary>
        [Column("Temperature")] public double? Temperature { get; set; }
        /// <summary>
        /// 濕度
        /// </summary>
        [Column("Humidity")] public double? Humidity { get; set; }
        /// <summary>
        /// 時間戳
        /// </summary>
        [JsonProperty(propertyName: "Ts")]
        [Column(IsTimestamp = true)] public long Timestamp { get; set; }
    }
}

    public class DeviceHandler : IDeviceHandler
    {
        private readonly MASAIoTContext _ioTDbContext;
        private readonly IMqttHandler _mqttHandler;
        private readonly ITimeSeriesDbClient _timeSeriesDbClient;

        public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
        {
            _ioTDbContext = ioTDbContext;
            _mqttHandler = mqttHandler;
            _timeSeriesDbClient = timeSeriesDbClient;
        }

        /// <summary>
        /// 寫入數據
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="pubSubOptions"></param>
        /// <returns></returns>
        public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
        {
            var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
                .FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);

            if (device != null && device.ProductInfo.ProductCode == "10001")  //空氣凈化器產品
            {
                var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);

                airPurifierDataPoint.ProductId = device.ProductInfoId;
  
                return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);

            }
            return false;
        }

除了WriteMeasurement方法之外,還提供了很多其他方法,如WritePoint,和批量寫入的方法,可自行測試。

測試

我們啟動項目,通過MQTTX向"topic/284202304230001"上報一條數據

{
  "DeviceName":"284202304230001",
  "Pm_25":100,
  "Temperature":25,
  "Humidity":50
}

我們在influxDB的管理工具中使用Data Explorer,使用如下的flux query查詢語句,即可查出5分鐘之內的數據,註意,這裡的時間是UTC時間

如果想顯示北京時區方便調試,可以在後面添加|> timeShift(duration: 8h)

from(bucket: "IoTDemos") 
|> range(start:-5m)


關於flux查詢語法

https://docs.influxdata.com/flux/v0.x/

總結

本節我們簡單介紹了開源時序資料庫influxDB的安裝。
我們藉助InfluxDB.Client庫完成設備從上報到時序庫數據存儲的全過程,下一節我們介紹從時序庫查詢數據。

完整代碼在這裡:https://github.com/sunday866/MASA.IoT-Training-Demos


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1、準備需要的jar包並複製到伺服器某個目錄下 ![](https://img2023.cnblogs.com/blog/1928230/202306/1928230-20230626151543572-42173473.png) 2、在此目錄下,創建Dockerfile的文本文件,並將以下內容添加 ...
  • # hovertool `HoverTool` 是 `Bokeh` 庫中的一個工具,它可以在滑鼠懸停在圖上時顯示數據。當滑鼠指針放在圖表的特定部分(比如散點圖的點或者線圖中的線的時候),該工具會顯示與該部分相關的附加信息。 一般配套使用的是`from bokeh.plotting import fi ...
  • 最近在讀《數據密集型應用系統設計》,其中談到了zookeeper對容錯共識演算法的應用。這讓我想到之前參考的zookeeper學習資料中,誤將容錯共識演算法寫成了2PC(兩階段提交協議),所以準備以此文對共識演算法和2PC做梳理和區分,也希望它能幫助像我一樣對這兩者有誤解的同學。 ...
  • 上一篇介紹的**通用計算**是關於多個`numpy`數組的計算, 本篇介紹的**聚合計算**一般是針對單個數據集的各種統計結果,同樣,使用**聚合函數**,也可以避免繁瑣的迴圈語句的編寫。 # 元素的和 數組中的元素求和也就是合計值。 ## 調用方式 **聚合計算**有兩種調用方式,一種是面向對象的 ...
  • 主要介紹了WPF中的兩大類資源:應用資源和XAML 資源以及在使用資源時通過靜態資源引用和使用動態資源引用的區別,需要根據具體場景調整。 本文是學習WPF所作筆記,內容難免由紕漏,歡迎留言討論! ...
  • # 如何部署ASP.NET Core 到Linux伺服器 我們開發的最終目的,是將開發後的東西發佈網路上,以便自己及其他人使用。 本篇博客介紹如果在 linux 上部署 ASP.NET Core應用,使用nginx+systemd 來管理我們的應用。 ## 準備 - Ubuntu 20.04 - N ...
  • # 關於EF Core更新速度隨時間越來越慢的解決辦法 ## 概要 本篇主要介紹使用 `context.ChangeTracker.Clear() `方法,在通過迴圈進行批量更新時,通過手動清除跟蹤實體以提高性能的示例。 ## 背景 最近在做一些數據分析時,遇到了一個問題,當我把計算結果更新到資料庫 ...
  • 最早開發Admin 管理插件,是為了微服務節點,有可視化的界面,後續隨著優化開發,豐富了其它功能。目前任意 .Net 或.Net Core的項目,只要在Nuget 中引用 Taurus,即可享有以下管理後臺功能:1、微服務節點管理。2、系統環境信息管理。3、指標統計管理。4、系統配置管理。5、系統日... ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...