@[TOC](目錄) # 前言 我們可以將設備上行數據存儲到關係型資料庫中,我們需要兩張帶有時間戳的表(**最新數據表** 和 **歷史數據表**),**歷史數據表**存儲所有設備上報的數據,**最新數據表**需要存儲設備最新一條上報數據,這條最新數據相當於設備的當前狀態。然後展示的時候只展示最新一 ...
@
目錄前言
我們可以將設備上行數據存儲到關係型資料庫中,我們需要兩張帶有時間戳的表(最新數據表 和 歷史數據表),歷史數據表存儲所有設備上報的數據,最新數據表需要存儲設備最新一條上報數據,這條最新數據相當於設備的當前狀態。然後展示的時候只展示最新一條數據的狀態,報表查詢可以按照設備id和時間從歷史數據表查詢彙總。
這樣是可以的,但是我們的最新數據表需要被頻繁的更新,數據量少的時候沒問題。但數據量大,併發高的時候就會出現問題。
1、存儲成本:數據不會被壓縮,導致占用存儲資源。
2、維護成本:單表數據量太大時,需要人工分庫分表。
3、寫入性能:單機寫入吞吐量難以滿足大量上行數據的寫入需求,資料庫存在性能瓶頸。
4、查詢性能:數據量太大導致查詢性能受到影響。
分析
我們可以採用時序庫來解決上述問題,首先來瞭解一下什麼是時序數據。時序數據是按照時間維度進行索引的數據,它記錄了某個被測量實體在一定時間範圍內,每個時間點上的一組測試值。感測器上傳的室內PM2.5和甲醛數據、凈水器感測器當前的TDS值、電腦系統的監控數據等,都屬於時序數據,時序數據有如下特點:
1、數據量較大,寫入操作是持續且平穩的,而且寫多讀少。
2、只有寫入操作,幾乎沒有更新操作,比如去修改感測器的歷史數據,是沒有意義的。
3、沒有隨機刪除,即使刪除也是按照時間範圍進行刪除。刪除某一個時間點的數據沒有意義,但是刪除2年前的數據是有意義的。
4、數據實時性和時效性強,數據隨著時間的推移不斷追加,舊數據很快失去意義。
5、大部分以時間和實體為維度進行查詢,很少以測試值為維度查詢,比如用戶會查詢某個時間段的溫度數據,但是很少會去查詢溫度高於多少度的數據記錄。
顯然IoT的業務是符合使用時序庫的場景的。
序資料庫就是用來存儲時序數據的資料庫,時序資料庫相較於傳統的關係型數據和非關係型資料庫而言,專門優化了對時序數據的存儲,開源的時序資料庫有InfluxDB OpenTSDB、TimeScaleDB 等。本文以InfluxDB資料庫進行演示。
時序資料庫有如下幾個概念。
1.Metric:度量,相當於關係型資料庫中的表(table)。
2.Data Point:數據點,相當於關係型資料庫的中的行(row)。
3.Timestamp:時間戳,數據點生成時的時間戳。
4.Field:測量值,比如溫度和濕度、PM2.5等。
5.Tag:標簽,用於標識數據點,通常用來標識數據點的來源,比如溫度和濕度數據來自哪個房間,哪個設備,可以當作關係型資料庫表的主鍵。
如下圖,度量為 Wind,每一個數據點都具有一個 timestamp,兩個 field:direction 和 speed,兩個 tag:sensor、city。它的第一行和第三行,存放的都是 sensor 號碼為 95D8-7913 的設備,屬性城市是上海。隨著時間的變化,風向和風速都發生了改變,風向從 23.4 變成 23.2;而風速從 3.4 變成了 3.3。
圖片來自網路
實施步驟
時序庫的安裝
安裝參考官方文檔,為了方便,我這裡採用docker安裝
docker run --name influxdb -p 8086:8086 influxdb:2.7.0
我們打開 伺服器ip:8086 可以看到它自帶的管理界面,我們首先創建用戶名密碼,組織、以及Bucket的名稱。
這裡的bucket "IoTDemos" 相當於資料庫的名稱
我們記錄一下這個Token,一會連接influxdb需要,相當於賬號密碼
解決playload沒有時間戳問題
對於時序庫來講,時間戳是非常重要的,但是我們拿到的playload並沒有時間戳(MQTTNet包我沒有找到拿時間戳的方法)。
所以我們需要在mqtt上想辦法,讓設備上報數據的時候,mqtt自動添加時間戳到playload中。
1、我們在數據集成->規則中新建一條規則名稱為"Add_Ts"。SQL編寫如下
SELECT
*,
now_timestamp('millisecond') as payload.Ts
FROM
"topic/#"
topic/# 代表消息發佈到"topic/#"主題的事件
now_timestamp函數返回當前時間的 Unix 時間戳,我們將時間戳寫入到payload的Ts屬性中,關於更多內置SQL函數,請參考官方文檔
https://www.emqx.io/docs/zh/v5.0/data-integration/rule-sql-builtin-functions.html
2、我們打開下麵的調試,模擬設備上報一條數據,可以看到這條規則幫我們加入了時間戳。
3、然後我們還需要處理添加了時間戳的處理結果,我們在右側添加一個動作,選擇消息重發佈,將剛剛添加了時間戳的消息重發到一個新的Topic上,我們使用topic/dp,併在playload中添加${payload},這樣我們就修改了playload中的信息,添加了我們需要的時間戳,當然,我們Hub訂閱的消息也需要對應修改,添加/dp尾碼。
4、首先我們先修改MASA.IoT.Hub的配置文件,Topic添加"/dp"尾碼
"MqttSetting": {
...
"Topic": "$share/IotHub/topic/+/dp"
},
5、CallbackAsync中,因為我們設備名稱是從Topic截取的,也要對應修改一下。
private async Task CallbackAsync(MqttApplicationMessageReceivedEventArgs e)
{
var deviceDataPointStr = System.Text.Encoding.Default.GetString(e.ApplicationMessage.PayloadSegment);
Console.WriteLine(deviceDataPointStr);
var pubSubOptions = new PubSubOptions
{
//修改一下獲取設備名稱的方式
DeviceName = e.ApplicationMessage.Topic[6..^3],
Msg = deviceDataPointStr,
PubTime = new DateTimeOffset(DateTime.Now).ToUnixTimeMilliseconds(),
TrackId = Guid.NewGuid()
};
...
}
代碼編寫
解決完時間戳的問題,我們就可以編寫代碼向InfluxDB中寫入數據了,我們首先在Infrastructure文件夾下創建ITimeSeriesDbClient介面和TimeSeriesDbClient類,使用介面也方便我們日後更換其他的時序庫。
這裡使用了InfluxDB.Client包。
ITimeSeriesDbClient.cs
namespace MASA.IoT.Core.Infrastructure
{
public interface ITimeSeriesDbClient
{
bool WriteMeasurement<T>(T measurement);
}
}
TimeSeriesDbClient.cs
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using MASA.IoT.WebApi;
using Microsoft.Extensions.Options;
namespace MASA.IoT.Core.Infrastructure
{
public class TimeSeriesDbClient : ITimeSeriesDbClient
{
private readonly InfluxDBClient _client;
private readonly string _bucket;
private readonly string _org;
private readonly AppSettings _appSettings;
public TimeSeriesDbClient(IOptions<AppSettings> settings)
{
_appSettings = settings.Value;
_org = _appSettings.InfluxDBSetting.Org;
_bucket = _appSettings.InfluxDBSetting.Bucket;
_client = new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token);
}
public bool WriteMeasurement<T>(T measurement)
{
try
{
using var writeApi = _client.GetWriteApi();
writeApi.WriteMeasurement<T>(measurement, WritePrecision.Ms, _bucket, _org);
return true;
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
return false;
}
}
}
}
這裡使用new InfluxDBClient(_appSettings.InfluxDBSetting.Url, _appSettings.InfluxDBSetting.Token)來構造InfluxDBClient。
Token就是我們創建Bucket過程中保存的Token。
Url是我們InfluxDB的訪問地址:http://127.0.0.1:8086
寫入的方法WriteMeasurement中我們通過_client.GetWriteApi創建一個寫入的api然後直接將我們要寫入的泛型實體寫入,第二個可選參數代表寫入精度,這裡我們使用WritePrecision.Ms
我們在DeviceHandler.cs中註入ITimeSeriesDbClient 並添加一個WriteMeasurementAsync方法,在方法中我們先根據設備名稱獲取產品,如果識別產品ID為10001(空凈產品),
那麼我們就寫入數據到Measurement:AirPurifierDataPoint
Measurement相當於資料庫的表。
Measurement和Column特性都是InfluxDB.Client.Core提供的,可以用來標識Tag、Timestamp等
using InfluxDB.Client.Core;
using Newtonsoft.Json;
namespace MASA.IoT.Core.Contract
{
[Measurement("AirPurifierDataPoint")]
public class AirPurifierDataPoint
{
/// <summary>
/// 設備名稱
/// </summary>
[Column("DeviceName", IsTag = true)] public string DeviceName { get; set; }
/// <summary>
/// 產品ID
/// </summary>
[Column("ProductId", IsTag = true)] public Guid ProductId { get; set; }
/// <summary>
/// Pm2.5
/// </summary>
[Column("PM_25")] public double? Pm_25 { get; set; }
/// <summary>
/// 溫度
/// </summary>
[Column("Temperature")] public double? Temperature { get; set; }
/// <summary>
/// 濕度
/// </summary>
[Column("Humidity")] public double? Humidity { get; set; }
/// <summary>
/// 時間戳
/// </summary>
[JsonProperty(propertyName: "Ts")]
[Column(IsTimestamp = true)] public long Timestamp { get; set; }
}
}
public class DeviceHandler : IDeviceHandler
{
private readonly MASAIoTContext _ioTDbContext;
private readonly IMqttHandler _mqttHandler;
private readonly ITimeSeriesDbClient _timeSeriesDbClient;
public DeviceHandler(MASAIoTContext ioTDbContext, IMqttHandler mqttHandler, ITimeSeriesDbClient timeSeriesDbClient)
{
_ioTDbContext = ioTDbContext;
_mqttHandler = mqttHandler;
_timeSeriesDbClient = timeSeriesDbClient;
}
/// <summary>
/// 寫入數據
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="pubSubOptions"></param>
/// <returns></returns>
public async Task<bool> WriteMeasurementAsync<T>(PubSubOptions pubSubOptions)
{
var device = await _ioTDbContext.IoTDeviceInfo.Include(o => o.ProductInfo).AsNoTracking()
.FirstOrDefaultAsync(o => o.DeviceName == pubSubOptions.DeviceName);
if (device != null && device.ProductInfo.ProductCode == "10001") //空氣凈化器產品
{
var airPurifierDataPoint = JsonConvert.DeserializeObject<AirPurifierDataPoint>(pubSubOptions.Msg);
airPurifierDataPoint.ProductId = device.ProductInfoId;
return _timeSeriesDbClient.WriteMeasurement<AirPurifierDataPoint>(airPurifierDataPoint);
}
return false;
}
除了WriteMeasurement方法之外,還提供了很多其他方法,如WritePoint,和批量寫入的方法,可自行測試。
測試
我們啟動項目,通過MQTTX向"topic/284202304230001"上報一條數據
{
"DeviceName":"284202304230001",
"Pm_25":100,
"Temperature":25,
"Humidity":50
}
我們在influxDB的管理工具中使用Data Explorer,使用如下的flux query查詢語句,即可查出5分鐘之內的數據,註意,這裡的時間是UTC時間
如果想顯示北京時區方便調試,可以在後面添加|> timeShift(duration: 8h)
from(bucket: "IoTDemos")
|> range(start:-5m)
關於flux查詢語法
總結
本節我們簡單介紹了開源時序資料庫influxDB的安裝。
我們藉助InfluxDB.Client庫完成設備從上報到時序庫數據存儲的全過程,下一節我們介紹從時序庫查詢數據。
完整代碼在這裡:https://github.com/sunday866/MASA.IoT-Training-Demos