之前寫過一篇如使用阿裡雲上部署.NET 3.1自定義運行時的文章,吐槽一下,雖然現在已經2022年了,但是阿裡雲函數計算的支持依然停留在.NET Core 2.1,更新緩慢,由於程式解包大小的限制,也不能放太複雜的東西的上去,雖然現在.NET 6裁剪包能挺好地解決這個問題,但是心裡還是不爽。 需求 ...
之前寫過一篇如使用阿裡雲上部署.NET 3.1自定義運行時的文章,吐槽一下,雖然現在已經2022年了,但是阿裡雲函數計算的支持依然停留在.NET Core 2.1,更新緩慢,由於程式解包大小的限制,也不能放太複雜的東西的上去,雖然現在.NET 6裁剪包能挺好地解決這個問題,但是心裡還是不爽。
需求
言歸正傳,有這麼一個情景:發送數據想接入阿裡雲的IoT平臺,然後直接插入postgresQL資料庫中。正常來說,只要數據發送到了IoT平臺,然後定義轉發到RDS就可以了,不過阿裡雲有幾個限制:
- 數據流轉到RDS資料庫,只能支持
mysql
與sql server
- 數據流轉只支持
json
形式的數據流轉,如果發送的是透傳的數據,那麼發送不了(更新:現在新版的數據流轉已經支持了。)
思前想後,可能只能掏出阿裡雲的函數計算服務了,運用函數計算作為中轉,將透傳的數據流轉給函數計算,然後在函數計算中執行sql語句。
IoT平臺接收設置
阿裡雲的物聯網平臺,設置了基本的產品和設備之後,如果是物模型的話,那麼自行設置好對應的物模型。對於透傳就比較簡單了,支持MQTT的設備方只需要定義:
- 透傳的消息發送到/{productKey}/{deviceName}/user/update
- 訂閱阿裡雲的/{productKey}/{deviceName}/user/get
- 設置阿裡雲的Mqtt IoT實例終端節點:({YourProductKey}.iot-as-mqtt.{YourRegionId}.aliyuncs.com:1883
- 設置設備的ProductKey和ProductSecret
設置好之後,即可傳輸數據到阿裡雲IoT端,數據傳輸過來,看下日誌,如果能看到:
那說明就已經發送OK了,接收到的是普通的字元串(不是json),需要進行進一步解析。
IoT流轉設置
在雲產品流轉中,新建解析器,設置好數據源,數據目的選擇函數計算:
解析器腳本比較簡單:
var data = payload();
writeFc(1000, data);
註意,payload函數payload
(textEncoding)是內建的函數:
- 不傳入參數:預設按照UTF-8編碼轉換為字元串,即payload()等價於payload('utf-8')。
- 'json':將payload數據轉換成Map格式變數。如果payload不是JSON格式,則返回異常。
- 'binary':將payload數據轉換成二進位變數進行透傳。
這裡我使用文本透傳的形式,將數據轉成UTF8文本傳輸。writeFc指將轉換的內容傳遞給1000編號的目標函數。詳情見文檔。
當然還可以使用更為複雜的腳本,實現對腳本數據的初步處理,由於我這裡後面還有函數計算,我就直接將數據轉到下一個節點。
函數計算配置
按照官方文檔新建函數,請註意不需要新建觸發器!我們這裡的函數使用python語言,通過psycopg2
實現數據插入到postgres資料庫中。
由於函數計算中,預設並沒有該包,需要手動添加引用,官方建議使用Serverless Devs
工具安裝部署,這個玩意非常不好用,嗯,我不接受他的建議。推薦大家使用vscode,安裝阿裡雲serverless的插件,這樣其實更加方便。
按照插件的文檔,自己建立好服務與函數,預設會給一個函數入口:
# To enable the initializer feature (https://help.aliyun.com/document_detail/158208.html)
# please implement the initializer function as below:
# def initializer(context):
# logger = logging.getLogger()
# logger.info('initializing')
def handler(event, context):
logger = logging.getLogger()
logger.info(event)
return 'hello world'
我們首先在函數上右鍵,然後選擇Install Package
,選擇pip安裝psycopg2
,依賴就自動被安裝上了,這個非常方便。
請註意,通過IOT流轉過來的字元串,是b'data'
這樣的形式的形式,需要先decode一下,然後在處理,修改函數為:
# -*- coding: utf-8 -*-
import logging
import psycopg2
import uuid
import time
def insert_database(device_id,data):
timest = int(time.time()*1000)
guid = str(uuid.uuid1())
conn = psycopg2.connect(database="", user="", password="", host="", port="")
cur = conn.cursor()
sql = 'INSERT INTO "data"("Id","DeviceId","Timestamp", "DataArray") VALUES (\'{id}\', \'{deviceid}\', \'{timestamp}\', array{data})'
sql = sql.format(id= guid, deviceid= device_id, timestamp= timest, data= data)
cur.execute(sql)
conn.commit()
print(" Records inserted successfully")
conn.close()
def extract_string_array(data: bytes):
arr = data.decode().strip().split(' ')
# 寫自己的邏輯
return deviceid, resu
def handler(event, context):
logger = logging.getLogger()
logger.info(event)
device_id, result = extract_string_array(event)
insert_database(device_id, result)
return 'OK'
保存,然後在vscode中deploy即可。
提示:vscode中也可以進行本地的debug,還是比較方便的,不過這些功能依賴docker,所以還是提前裝好比較好。
弄完了之後,應該是能看見這樣的畫面:
至此,數據就正常流轉成功。
要點
- 不要設置觸發器,當時為了配置這個觸發器弄了非常長的時間
- 函數計算與資料庫的VPC應該相同,並且賦予許可權,否則無法訪問。
- 函數計算預設無法保持狀態,如果有這個需求,最好試試別的方案,或者看下函數計算的預留實例(常駐實例)
- 提前在本地安裝好Docker,要不會有各種各樣的問題出現。
- Postgresql插入數組格式的數據,需要註意格式,可以參考這篇文檔
- 如果長時間不用docker,導致docker無法啟動,可以參考這篇文章