Efficient Data Cleaning with DolphinDB + Python Airflow

Source: https://www.cnblogs.com/DolphinDB/archive/2023/04/14/17306907.html


As a high-performance time-series database, DolphinDB frequently needs to clean, transform, and load data in production environments, and Airflow offers a good way to manage these ETL jobs in a structured fashion. This tutorial presents a solution for production ETL needs: it introduces Python Airflow into a DolphinDB high-availability cluster and uses Airflow's features to better manage DolphinDB data ETL jobs. The overall architecture is as follows:

1. Airflow

1.1 Introduction to Airflow

Airflow is a programmable platform for scheduling and monitoring workflows. Based on directed acyclic graphs (DAGs), Airflow can define a set of dependent tasks and execute them in dependency order. It provides a rich set of command-line tools for system administration, and its web UI makes it just as easy to manage and schedule tasks and to monitor their run status in real time, simplifying operations and maintenance.
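
A minimal sketch of the DAG model (placeholder task names and bodies, not this project's code): two tasks whose execution order is declared through a dependency.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

def extract():
    # Placeholder task body
    print("extracting data")

with DAG(dag_id="minimal_example", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
    start = EmptyOperator(task_id="start")
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    # ">>" declares the dependency: start runs before extract_task
    start >> extract_task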

1.2 Core Features of Airflow

  • Incremental loading: when a table or dataset is small, it can be loaded in full. As data grows, however, extracting data incrementally at fixed intervals becomes the norm for ETL, and loading only one hour, one day, or one week of data is a very common requirement. Airflow makes it easy to load incremental data at a given interval.
  • Backfilling historical data: sometimes you have just finished a new workflow and need to process data going back to the date the new code went into production. In that case, simply set the start_date parameter of the DAG to the desired start date, and Airflow will backfill task runs from that date. DAG parameters can likewise be used to process weeks, months, or years of data (see the sketch after this list).
  • Partitioning extracted data: partitioning the destination of the data allows DAG runs to proceed in parallel, avoids locking the extracted data, and improves performance when the same data is read repeatedly. Data that is no longer relevant can be archived and removed from the database.
  • Enforcing idempotency: the result of a DAG run should always be idempotent, meaning that running a process multiple times with the same parameters (even on different dates) produces exactly the same result.
  • Conditional execution: Airflow offers options to control how tasks in a DAG run depending on whether previous instances succeeded.
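
As a rough sketch of the incremental-loading and backfill points above (placeholder DAG name, table, and query, not the tutorial's code), the callable below reads the logical data interval that Airflow passes to each run and loads only that slice:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def load_one_day(data_interval_start=None, data_interval_end=None, **_):
    # Airflow injects the logical data interval of this run, so only the
    # increment for that interval is extracted rather than the whole table.
    print(f"load rows where ts >= {data_interval_start} and ts < {data_interval_end}")

with DAG(
    dag_id="incremental_example",
    start_date=datetime(2023, 3, 1),    # backfill starts from this date
    schedule_interval="@daily",
    catchup=True,                       # create one run per day up to the present
) as dag:
    PythonOperator(task_id="load_one_day", python_callable=load_one_day)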

1.3 DolphinDBOperator

DolphinDBOperator is an Airflow operator that lets Airflow connect to DolphinDB and perform writes, queries, computations, and other operations. Its specific parameters are:

  • dolphindb_conn_id: specifies the DolphinDB connection, which can be configured under Connections
  • sql: the DolphinDB script to run
  • file_path: path to a DolphinDB .dos script file to run

DolphinDBOperator usage examples are shown below:

  • Run a script passed inline through the sql parameter:
//Create a shared table in DolphinDB
create_parameter_table = DolphinDBOperator(
        task_id='create_parameter_table',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            undef(`paramTable,SHARED)
            t = table(1:0, `param`value, [STRING, STRING])
            share t as paramTable
            '''
    )
  • Run a script from a .dos file specified through file_path:
    //CalAlpha001.dos is a DolphinDB script
    case1 = DolphinDBOperator(
        task_id='case1',
        dolphindb_conn_id='dolphindb_test',
        file_path=path + "/StreamCalculating/CalAlpha001.dos"
    )

1.4 Installing and Deploying Airflow

  • Hardware environment

  • Software environment:

Note:
1. This tutorial uses SQLite as the backend store. If Airflow fails to start because the SQLite version is too old, refer to the database setup documentation to upgrade SQLite or switch to a different default database.
2. It is recommended to set up the DolphinDB service before starting. For installation details, see the DolphinDB high-availability cluster deployment tutorial, or the Docker-Compose based DolphinDB multi-container cluster deployment guide.

  • Host environment
  1. First, before installing Airflow, make sure the host has the three dependencies installed: python3, the dolphindb Python API, and the dolphindb Airflow provider. Run the following commands to install them; the packages can be obtained from the attachments.
pip install --force-reinstall dolphindb
pip install --force-reinstall dolphindbapi-1.0.0-py3-none-any.whl
pip install --force-reinstall apache_Airflow_providers_dolphindb-1.0.0-py3-none-any.whl

The Airflow provider package used in this tutorial is currently available only as an offline installation; online installation instructions will be provided after the official release.

  2. After the dolphindb provider plugin for Airflow is installed, start Airflow:

For details on deploying and installing Airflow, see the official Airflow quick-start guide. The core commands for starting Airflow are:

#Initialize the metadata database
airflow db init

#Create a user
airflow users create  --username admin  --firstname Peter  --lastname Parker  --role Admin  --email [email protected]  --password admin

# Run the webserver as a daemon
airflow webserver --port 8080 -D

# Run the scheduler as a daemon
airflow scheduler -D
  3. Run the following command to verify that Airflow started successfully:

ps -aux|grep airflow

The expected output is shown below, confirming that Airflow started successfully:

  4. After startup, log in to the Airflow web UI in a browser:
  • Default address: http://IP:8080
  • Default account: created during db initialization; admin in this example
  • Default password: created during db initialization; admin in this example

  5. Enter the username and password created above to access the Airflow UI, as shown below:

  6. Fill in the DolphinDB connection information to connect to the DolphinDB database.

Once the connection succeeds, specify dolphindb_conn_id='dolphindb_test' in DolphinDBOperator to run DolphinDB scripts. With the preparation above complete, the rest of this tutorial uses the ETL process of stock snapshot data as an example to show how Airflow interacts with DolphinDB.

2. Scheduling Market Data ETL with Airflow

2.1 Overall ETL Architecture

Code directory structure of the functional modules:

  • add: incremental data ETL

    • addLoadSnapshot: imports each day's newly added raw snapshot data
    • addProcessSnapshot: converts incremental snapshot data into ArrayVector form and cleans it
    • addFactor: aggregates the increment into daily and one-minute K-line data and stores it
    • addETL.py: builds the incremental-data DAG
  • full: full-volume data ETL

    • loadSnapshot: creates the snapshot table and imports the data
    • processSnapshot: creates the table for cleaned snapshot data, converts the data into ArrayVector form, cleans it, and stores the result
    • Factor: creates the factor storage tables and aggregates the cleaned data into daily and one-minute K-line data for storage
    • fullETL.py: builds the full-volume data DAG

External data source -> ODS layer: import the raw data from the external source into DolphinDB

ODS layer -> DWD detail layer: clean the raw data, converting the multi-level quote fields into ArrayVector columns and removing duplicates

DWD detail layer -> DWB/DWS summary layer: compute on the cleaned snapshot data to produce K-line data

Note:
This tutorial uses DolphinDB's module feature and the DolphinDB client tools to manage the DolphinDB scripts as a project. For details, see the DolphinDB tutorials on modules and on the DolphinDB client software.

2.2 Data Description

This tutorial uses 5 days of level-2 snapshot data for all stocks in the market, from 2021.01.04 to 2021.01.08. The structure of the snapshot table in DolphinDB is shown below. The 8 fields BidPrice, BidOrderQty, BidNumOrders, BidOrders, OfferPrice, OfferOrderQty, OfferNumOrders, and OfferOrders each contain multi-level data and are stored in DolphinDB with the ArrayVector data type:

2.3 Core DolphinDB Cleaning Scripts

2.3.1 Creating the Distributed Databases and Tables

  • Create the table for raw snapshot data:

The core code for creating the database and table that store the raw snapshot data is as follows:

module loadSnapshot::createSnapshotTable

//Create the database and table for raw snapshot data
def createSnapshot(dbName, tbName){
	login("admin", "123456")
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 50])
	//Composite partitioning by day and stock code
	db = database(dbName,COMPO,[db1,db2],engine='TSDB')
	colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice0","BidPrice1","BidPrice2","BidPrice3","BidPrice4","BidPrice5","BidPrice6","BidPrice7","BidPrice8","BidPrice9","BidOrderQty0","BidOrderQty1","BidOrderQty2","BidOrderQty3","BidOrderQty4","BidOrderQty5","BidOrderQty6","BidOrderQty7","BidOrderQty8","BidOrderQty9","BidNumOrders0","BidNumOrders1","BidNumOrders2","BidNumOrders3","BidNumOrders4","BidNumOrders5","BidNumOrders6","BidNumOrders7","BidNumOrders8","BidNumOrders9","BidOrders0","BidOrders1","BidOrders2","BidOrders3","BidOrders4","BidOrders5","BidOrders6","BidOrders7","BidOrders8","BidOrders9","BidOrders10","BidOrders11","BidOrders12","BidOrders13","BidOrders14","BidOrders15","BidOrders16","BidOrders17","BidOrders18","BidOrders19","BidOrders20","BidOrders21","BidOrders22","BidOrders23","BidOrders24","BidOrders25","BidOrders26","BidOrders27","BidOrders28","BidOrders29","BidOrders30","BidOrders31","BidOrders32","BidOrders33","BidOrders34","BidOrders35","BidOrders36","BidOrders37","BidOrders38","BidOrders39","BidOrders40","BidOrders41","BidOrders42","BidOrders43","BidOrders44","BidOrders45","BidOrders46","BidOrders47","BidOrders48","BidOrders49","OfferPrice0","OfferPrice1","OfferPrice2","OfferPrice3","OfferPrice4","OfferPrice5","OfferPrice6","OfferPrice7","OfferPrice8","OfferPrice9","OfferOrderQty0","OfferOrderQty1","OfferOrderQty2","OfferOrderQty3","OfferOrderQty4","OfferOrderQty5","OfferOrderQty6","OfferOrderQty7","OfferOrderQty8","OfferOrderQty9","OfferNumOrders0","OfferNumOrders1","OfferNumOrders2","OfferNumOrders3","OfferNumOrders4","OfferNumOrders5","OfferNumOrders6","OfferNumOrders7","OfferNumOrders8","OfferNumOrders9","OfferOrders0","OfferOrders1","OfferOrders2","OfferOrders3","OfferOrders4","OfferOrders5","OfferOrders6","OfferOrders7","OfferOrders8","OfferOrders9","OfferOrders10","OfferOrders11","OfferOrders12","OfferOrders13","OfferOrders14","OfferOrders15","OfferOrders16","OfferOrders17","OfferOrders18","OfferOrders19","OfferOrders20","OfferOrders21","OfferOrders22","OfferOrders23","OfferOrders24","OfferOrders25","OfferOrders26","OfferOrders27","OfferOrders28","OfferOrders29","OfferOrders30","OfferOrders31","OfferOrders32","OfferOrders33","OfferOrders34","OfferOrders35","OfferOrders36","OfferOrders37","OfferOrders38","OfferOrders39","OfferOrders40","OfferOrders41","OfferOrders42","OfferOrders43","OfferOrders44","OfferOrders45","OfferOrders46","OfferOrders47","OfferOrders48","OfferOrders49","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
	colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
	schemaTable = table(1:0,colName, colType)
	
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}

For the snapshot data, this tutorial uses a composite partitioning scheme: the first level partitions by day, and the second level hash-partitions the stock code into 50 partitions. For guidance on choosing a partitioning scheme based on your data, see the DolphinDB partitioned database tutorial.

  • Create the table for cleaned snapshot data:

The core code for creating the database and table that store the cleaned snapshot data in Array form is as follows:

module processSnapshot::createSnapshot_array

//Create the storage table for cleaned snapshot data
def createProcessTable(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	db1 = database(, VALUE, 2020.01.01..2021.01.01)
	db2 = database(, HASH, [SYMBOL, 50])
	//Composite partitioning by day and stock code
	db = database(dbName,COMPO,[db1,db2],engine='TSDB')
	colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice","BidOrderQty","BidNumOrders","BidOrders","OfferPrice","OfferOrderQty","OfferNumOrders","OfferOrders","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
	colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE[]","INT[]","INT[]","INT[]","DOUBLE[]","INT[]","INT[]","INT[]","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
	schemaTable = table(1:0, colName, colType)
	db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
  • Create the K-line result tables:

The core code for creating the minute-level K-line result table is as follows:

module Factor::createFactorOneMinute

//Create the storage table for minute K-line factors
def createFactorOneMinute(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	//Partition by day
	db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
	colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
	colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
	tbSchema = table(1:0, colName, colType)
  	db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}

The core code for creating the daily K-line result table is as follows:

module Factor::createFactorDaily

//Create the storage table for daily K-lines
def createFactorDaily(dbName, tbName){
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	//Partition by year
	db = database(dbName, RANGE, datetimeAdd(2000.01M,(0..50)*12, "M"),engine = `TSDB)
	colName = `TradeDate`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
	colType =[DATE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
	tbSchema = table(1:0, colName, colType)
  	db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeDate,keepDuplicates=ALL)
}

2.3.2 Data Cleaning

The cleaning of the snapshot data consists of two main parts:

  • Store the multi-level prices and order quantities as Array columns
  • Remove duplicate records

The processing flow and core code are as follows:

module processSnapshot::processSnapshotData

//Combine the columns into arrays, remove duplicates, and count the duplicated rows
def mapProcess(mutable t, dbName, tbName){
	n1 = t.size()
	t = select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus, fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice, fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty, fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders, fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders, fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice, fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferQty, fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders, fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders, NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney from t where isDuplicated([SecurityID, DateTime], FIRST) = false
	n2 = t.size()
	loadTable(dbName, tbName).append!(t)
	return n1,n2
}

def process(processDate, dbName_orig, tbName_orig, dbName_process, tbName_process){
	dataString = temporalFormat(processDate, "yyyyMMdd")
	//Check whether data for the processing date already exists in the target database
	todayCount = exec count(*) from loadTable(dbName_process, tbName_process) where date(DateTime)=processDate
	//If the target database already contains data for that day, delete the existing data first
	if(todayCount != 0){
		writeLog("Start to delete the process snapshot data, the delete date is: " + dataString)
		dropPartition(database(dbName_process), processDate, tbName_process)
		writeLog("Successfully deleted the process snapshot data, the delete date is: " + dataString)
	}
	//Start processing the data
	writeLog("Start process Snapshot Data, the datetime is "+ dataString)
	ds = sqlDS(sql(select=sqlCol("*"), from=loadTable(dbName_orig,tbName_orig),where=<date(DateTime)=processDate>))
	n1,n2=mr(ds, mapProcess{, dbName_process, tbName_process}, +, , false)
	if(n1 != n2){
		writeLog("ERROR: Duplicated data exists in " + dataString + ", successfully dropped " + string(n1-n2) + " duplicate rows")
	}
	writeLog("Successfully process the snapshot data, the processDate is: " + dataString)
}

2.3.3 Aggregating the Cleaned Data into K-Line Bars

Core code for computing minute-level K-line bars and writing them to the database:

module Factor::calFactorOneMinute

//Compute minute K-lines and write them to the database
def calFactorOneMinute(dbName, tbName, mutable factorTable){
	pt = loadTable(dbName, tbName)
	//Process the data in groups of 10 days
	dayList = schema(pt).partitionSchema[0]
	if(dayList.size()>10) dayList = dayList.cut(10)
	for(days in dayList){
		//Compute the minute K-lines
		res =   select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate,minute(DateTime) as TradeTime, SecurityID
		writeLog("Start to append minute factor result , the days is: [" + concat(days, ",")+"]")
		//Write the minute K-lines to the database
		factorTable.append!(res)
		writeLog("Successfully appended the minute factor result to database, the days is: [" + concat(days, ",")+"]")
	}
}

Core code for computing daily K-line bars and writing them to the database:

module Factor::calFactorDaily1

//Compute daily K-lines and write them to the database
def calFactorDaily(dbName, tbName, mutable factorTable){
	pt = loadTable(dbName, tbName)
	//Process the data in groups of 10 days
	dayList = schema(pt).partitionSchema[0]
	if(dayList.size()>10) dayList = dayList.cut(10)
	for(days in dayList){
		//Compute the daily K-lines
		res =   select first(LastPX) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*totalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, SecurityID 
		writeLog("Start to append daily factor result , the days is: [" + concat(days, ",")+"]")
		//Write the daily K-lines to the database
		factorTable.append!(res)
		writeLog("Successfully appended the daily factor result to database, the days is: [" + concat(days, ",")+"]")
	}
}

2.4 Incremental Data Cleaning

The overall logic for importing incremental data is the same as for the full load; the main differences are:

  • The full data set is imported in batch, whereas incremental data is loaded on a daily schedule
  • The full load submits asynchronous background jobs via DolphinDB submitJob, whereas the incremental load writes synchronously through the append! interface (see the sketch after this list)
  • K-line computation processes the full data set in batch, whereas the incremental run processes only the current day's data

See the attachments for the exact code differences.
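
To make the second difference concrete, the following hedged sketch drives both styles through the DolphinDB Python API outside of Airflow. The server address, credentials, database names, and file directory are placeholders; the module functions are the ones defined in this project and must already be present on the server.

import dolphindb as ddb

s = ddb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")   # placeholder address and credentials

# Full load: one background job per day, submitted asynchronously with submitJob;
# the Airflow task later polls getRecentJobs() until all jobs finish.
s.run('''
    use loadSnapshot::loadSnapshotData
    for (loadDate in 2021.01.04..2021.01.08) {
        submitJob("loadSnapshot" + temporalFormat(loadDate, "yyyyMMdd"), "loadSnapshot",
                  loadSnapshot::loadSnapshotData::loadSnapshot{, "dfs://snapshot_orig", "snapshot", "/data/snapshot/"}, loadDate)
    }
''')

# Incremental load: import only today's file synchronously, so the task ends
# when the write inside the module function has completed.
s.run('''
    use addLoadSnapshot::loadSnapshotData
    addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), "dfs://snapshot_orig", "snapshot", "/data/snapshot/")
''')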

2.5 Generating DAGs and Running Tasks in Airflow

2.5.1 Creating a DAG Instance

An example of creating the full-load DAG instance:

with DAG(dag_id="ETLTest", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:

dag_id names the DAG and must be unique; start_date sets the start date of the task; schedule_interval specifies the interval between two runs, and None means the DAG does not run automatically and must be triggered manually. The incremental DAG is declared as follows:

args = {
    "owner" : "Airflow",
    "start_date" : days_ago(1),
    "catchup" : False,
    'retries' : 0
}
with DAG(dag_id="addETLTest", default_args = args, schedule_interval="0 12 * * *") as dag:

In the incremental DAG, catchup controls whether missed runs are backfilled, retries is the number of retries after a task fails, and schedule_interval = "0 12 * * *" runs the task at 12:00 (UTC) every day. For details on schedule settings, see Scheduling & Triggers.

2.5.2 Accessing Airflow Variables

Variables defined in Airflow cannot be read directly from a DolphinDB script. To make them available to later tasks, this tutorial writes the Airflow variables into a shared table, from which subsequent DolphinDB tasks read them. The code is as follows:

# Read the variable values from Airflow
variable = ['ETL_dbName_origin', "ETL_tbName_origin", "ETL_dbName_process",
            "ETL_tbName_process", 'ETL_dbName_factor','ETL_tbName_factor','ETL_dbName_factor_daily',
            'ETL_tbName_factor_daily',"ETL_filedir", "ETL_start_date","ETL_end_date"]
value = []
for var in variable:
    value.append(str(Variable.get(var)))
variables = ",".join(variable)
values = ",".join(value)

# Create a shared table via DolphinDBOperator and write the variable values into it
  create_parameter_table = DolphinDBOperator(
        task_id='create_parameter_table',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            undef(`paramTable,SHARED)
            t = table(1:0, `param`value, [STRING, STRING])
            share t as paramTable
            '''
    )
    given_param = DolphinDBOperator(
        task_id='given_param',
        dolphindb_conn_id='dolphindb_test',
        sql="params = split('" + variables + "',',');" + \
            "values = split('" + values + "',',');" + \
            "for(i in 0..(params.size()-1)){" + \
            "insert into paramTable values(params[i], values[i]);}"
    )

After this task runs, the parameter table and its values can be seen in the shared variables pane of the DolphinDB GUI, as shown below:

The value of each DAG variable can be modified through Airflow by clicking the location shown below:

After the DAG is generated, the variables it uses are displayed on the following web page and can be modified dynamically, as shown below:

The table below lists the variables used in this project and their meanings:

2.5.3 Running Tasks with DolphinDBOperator

  • Full-volume processing with DolphinDBOperator

DolphinDBOperator turns the data import, cleaning, and computation steps above into tasks of the DAG.

The core code of the full-volume processing is as follows:

   loadSnapshot = DolphinDBOperator(
        task_id='loadSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use modules to load the packaged table-creation and import functions
            use loadSnapshot::createSnapshotTable
            use  loadSnapshot::loadSnapshotData
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_origin]
            tbName = params[`ETL_tbName_origin]
            startDate = date(params[`ETL_start_date])
            endDate = date(params[`ETL_end_date])
            fileDir = params[`ETL_filedir]
            //Create the result database and table if they do not exist
            if(not existsDatabase(dbName)){
                loadSnapshot::createSnapshotTable::createSnapshot(dbName, tbName)
            }
            //Call the import function; write with multiple background jobs to improve throughput
            start = now()
            for (loadDate in startDate..endDate){
                submitJob("loadSnapshot"+year(loadDate)+monthOfYear(loadDate)+dayOfMonth(loadDate), "loadSnapshot", loadSnapshot::loadSnapshotData::loadSnapshot{, dbName, tbName, fileDir}, loadDate)
            }
            //Wait until all import jobs finish so that downstream steps see a complete data source
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="loadSnapshot" and endTime is null
            }
            while(cnt != 0)
            //Check whether any import job failed and raise the error if so
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )
    processSnapshot = DolphinDBOperator(
        task_id='processSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use modules to load the packaged table-creation and cleaning functions
            use processSnapshot::createSnapshot_array
            use processSnapshot::processSnapshotData
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName_orig = params[`ETL_dbName_origin]
            tbName_orig = params[`ETL_tbName_origin]
            dbName_process = params[`ETL_dbName_process]
            tbName_process = params[`ETL_tbName_process]
            startDate = date(params[`ETL_start_date])
            endDate = date(params[`ETL_end_date])
            //Create the result database and table if they do not exist
            if(not existsDatabase(dbName_process)){
                processSnapshot::createSnapshot_array::createProcessTable(dbName_process, tbName_process)
            }
            //Process with multiple background jobs to improve efficiency
            start = now()
            for (processDate in startDate..endDate){
                submitJob("processSnapshot"+year(processDate)+monthOfYear(processDate)+dayOfMonth(processDate), "processSnapshot", processSnapshot::processSnapshotData::process{, dbName_orig, tbName_orig, dbName_process, tbName_process}, processDate)
            }
            //Wait until all cleaning jobs finish so that downstream steps see a complete data source
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
            }
            while(cnt != 0)
            //Check whether any cleaning job failed and raise the error if so
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )
    calMinuteFactor = DolphinDBOperator(
        task_id='calMinuteFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use modules to load the packaged table-creation and computation functions
            use Factor::createFactorOneMinute
            use Factor::calFactorOneMinute
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor]
            tbName_factor = params[`ETL_tbName_factor]
            //Create the result database and table if they do not exist
            if(not existsDatabase(dbName_factor)){
                createFactorOneMinute(dbName_factor, tbName_factor)
            }
            factorTable = loadTable(dbName_factor, tbName_factor)
            //Call the computation function
            calFactorOneMinute(dbName, tbName,factorTable)
            '''
    )
    calDailyFactor = DolphinDBOperator(
        task_id='calDailyFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use modules to load the packaged table-creation and computation functions
            use Factor::createFactorDaily
            use Factor::calFactorDaily1	
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor_daily]
            tbName_factor = params[`ETL_tbName_factor_daily]
            //Create the result database and table if they do not exist
            if(not existsDatabase(dbName_factor)){
                createFactorDaily(dbName_factor, tbName_factor)
            }
            //Call the computation function
            factorTable = loadTable(dbName_factor, tbName_factor)
            Factor::calFactorDaily1::calFactorDaily(dbName, tbName,factorTable)
            '''
    )

Build the DAG according to the dependencies between the tasks, for example:

start_task >> create_parameter_table >> given_param >> loadSnapshot >> processSnapshot >> calMinuteFactor >> calDailyFactor
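
start_task is not shown in the excerpted code; presumably it is just a placeholder entry task, for example:

from airflow.operators.empty import EmptyOperator

# Placeholder first task that only anchors the start of the dependency chain
start_task = EmptyOperator(task_id="start_task")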

  • Incremental data import with DolphinDBOperator

The code that builds the incremental-data tasks is as follows:

addLoadSnapshot = DolphinDBOperator(
        task_id='addLoadSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use the module to load the packaged import function
            use  addLoadSnapshot::loadSnapshotData
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_origin]
            tbName = params[`ETL_tbName_origin]
            fileDir = params[`ETL_filedir]
            //Get the trading calendar
            MarketDays = getMarketCalendar("CFFEX")
            //Import the data only on trading days
            if(today() in MarketDays ){
                fileDir = params[`ETL_filedir]
                addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), dbName, tbName, fileDir)
            }
            '''
    )
    addProcessSnapshot = DolphinDBOperator(
        task_id='addProcessSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use the module to load the packaged cleaning function
            use addProcessSnapshot::processSnapshotData
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName_orig = params[`ETL_dbName_origin]
            tbName_orig = params[`ETL_tbName_origin]
            dbName_process = params[`ETL_dbName_process]
            tbName_process = params[`ETL_tbName_process]
            //Get the trading calendar
            MarketDays = getMarketCalendar("CFFEX")
            //Process the data only on trading days
            if(today() in MarketDays ){
                addProcessSnapshot::processSnapshotData::process(today(), dbName_orig, tbName_orig, dbName_process, tbName_process)
            }
            '''
    )
    addCalMinuteFactor= DolphinDBOperator(
        task_id='addCalMinuteFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use the module to load the packaged computation function
            use addFactor::calFactorOneMinute
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor]
            tbName_factor = params[`ETL_tbName_factor]
            factorTable = loadTable(dbName_factor, tbName_factor)
            //Get the trading calendar
            MarketDays = getMarketCalendar("CFFEX")
            //On trading days, call the computation function to build the minute K-lines
            if(today() in MarketDays ){
                	addFactor::calFactorOneMinute::calFactorOneMinute(dbName, tbName,today(), factorTable)
            }
            '''
    )
    addCalDailyFactor= DolphinDBOperator(
        task_id='addCalDailyFactor',
        dolphindb_conn_id='dolphindb_test',
        sql='''
            pnodeRun(clearAllCache)
            undef(all)
            go;
            //Use the module to load the packaged computation function
            use addFactor::calFactorDaily1	
            //Read the parameters from the shared parameter table
            params = dict(paramTable[`param], paramTable[`value])
            dbName = params[`ETL_dbName_process]
            tbName = params[`ETL_tbName_process]	
            dbName_factor = params[`ETL_dbName_factor_daily]
            tbName_factor = params[`ETL_tbName_factor_daily]
            factorTable = loadTable(dbName_factor, tbName_factor)
            //Get the trading calendar
            MarketDays = getMarketCalendar("CFFEX")
            //On trading days, call the computation function to build the daily K-lines
            if(today() in MarketDays ){
                addFactor::calFactorDaily1::calFactorDaily(dbName, tbName,today(), factorTable)
            }
            '''
    )

Build the DAG according to the dependencies between the tasks, for example:

start_task >> create_parameter_table >> given_param >> addLoadSnapshot >> addProcessSnapshot >> addCalMinuteFactor >> addCalDailyFactor

2.5.4 Generating the DAG

Deploy the project with the following steps:

  • Step 1: deploy the DolphinDB project

Import the addETL and fullETL projects of the DolphinDB project into the DolphinDB GUI (the DolphinDB client tool):

Then upload the module files of the addETL and fullETL projects to the DolphinDB server that Airflow is connected to:

  • Step 2: deploy the Python project
    Place the Python scripts of the Python project in the <Airflow_install_Dir/airflow/dags> directory. Note that newly added DAGs do not appear in the UI immediately; by default they show up after about 5 minutes. You can change dag_dir_list_interval in airflow.cfg to adjust the refresh interval.

  • Step 3: import the Airflow variables
    In the Airflow web UI, go to Admin --> Variables, upload the Variables.json file to import the variables into Airflow, and adjust their values for your environment.

  • Step 4: upload the raw data files
    Upload the data files to the server and set the ETL_filedir variable in Airflow to the actual directory of the files. When running the incremental ETL task, change the date in the data file names to the current date, e.g. 20230330snapshot.csv, so the task does not fail for lack of data.

The resulting DAGs are shown below:

Full data import:

Incremental data import:

After a run, a green task instance means the task succeeded; red means it failed; orange means an upstream task it depends on failed, so the task never started.

3. Frequently Asked Questions (FAQ)

3.1 How do I capture the output of the print function in DolphinDB scripts?

  • Output from print in a DolphinDB script goes to standard output and can be found in airflow-scheduler.out, as shown below:

3.2 How do I check the completion status of asynchronous jobs submitted with submitJob in a DolphinDB script?

Use the DolphinDB function getRecentJobs to retrieve background job information, then add checking logic to the DolphinDB DAG, for example:

DolphinDBOperator(
        task_id='processSnapshot',
        dolphindb_conn_id='dolphindb_test',
        sql='''
          //Check whether all background jobs have finished
            do{
                cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
            }
            while(cnt != 0)
            //Check whether the background jobs succeeded; raise the error if any failed
            cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
            if (cnt != 0){
                error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
                throw error[0]
            }
            '''
    )

3.3 Connections frequently time out and disconnect when running tasks in Airflow. What should I do?

This is usually caused by network latency. You can set parameters when the connection is created; as shown above, set the KeepAliveTime and reconnect parameters in the DolphinDB connection.
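
If you manage the session yourself with the DolphinDB Python API rather than through the Airflow connection form, the corresponding settings look roughly like the sketch below; the parameter names keepAliveTime and reconnect are assumptions to verify against the DolphinDB Python API documentation for your version.

import dolphindb as ddb

# Assumed parameter names: keepAliveTime lengthens the keep-alive interval,
# and reconnect asks the session to re-establish dropped connections.
s = ddb.session(keepAliveTime=120)
s.connect("127.0.0.1", 8848, "admin", "123456", reconnect=True)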

3.4 Why does a DAG with start_date set to today and a daily schedule not run on the same day?

  • In Airflow, the earliest run of a scheduled DAG happens at start_date + schedule_interval. For example, with start_date = 2023.03.16 and a daily schedule, the first run is triggered on 2023.03.17, so nothing runs on the start date itself (see the sketch below).
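
A small worked example of this rule (dates are illustrative): with the DAG below, the run labeled 2023-03-16 covers the data interval from 2023-03-16 to 2023-03-17 and is therefore triggered only on 2023-03-17.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="timing_example",
         start_date=datetime(2023, 3, 16),
         schedule_interval="@daily",
         catchup=False) as dag:
    # The first run executes on 2023-03-17, after the 2023-03-16 data interval ends
    EmptyOperator(task_id="noop")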

3.5 How do I find the cause when a DolphinDBOperator task fails?

  • When a task fails, DolphinDBOperator writes the detailed error message to the task log. Check the log to locate the offending code and fix it. The steps to view the log are as follows:

4. Summary

Starting from a common market-data ETL case, this tutorial has focused on how to combine the Airflow scheduling framework with the DolphinDB database to manage structured ETL jobs, saving time, improving efficiency, and lowering operations cost. Due to limited space, some other operations involving DolphinDB and the Airflow framework could not be covered here, and users will need to adapt the workflow to their own situations. Comments and corrections on any omissions or flaws in this tutorial are welcome.

Attachments

