DolphinDB, as a high-performance time-series database, frequently needs data cleansing, transformation, and loading in production environments. For structuring and managing these ETL jobs, Airflow offers a good approach. This tutorial provides a solution for production ETL needs by introducing Python Airflow into a DolphinDB high-availability cluster and using the features Airflow provides to better manage DolphinDB data ETL jobs. The overall architecture is as follows:
1. Airflow
1.1 Introduction to Airflow
Airflow is a platform to programmatically author, schedule, and monitor workflows. Based on directed acyclic graphs (DAGs), Airflow can define a set of dependent tasks and execute them in dependency order. Airflow provides a rich set of command-line tools for system control, and its web UI also makes it easy to manage and schedule tasks and to monitor task status in real time, simplifying operations and maintenance.
1.2 Core Airflow Features
- Incremental loading: when a table or dataset is small, it can be loaded in full. As data grows, however, extracting data incrementally at fixed intervals becomes the norm for ETL; loading only an hour, a day, or a week of data is a very common requirement. Airflow makes it easy to load incremental data at specific intervals.
- Backfilling historical data: sometimes you have just finished a new workflow and need to go back and process data from before the date the new code went into production. In that case, simply use the DAG's start_date parameter to specify the start date, and Airflow will backfill tasks back to that date. DAG parameters can also be used to process weeks, months, or years of data (see the minimal sketch after this list).
- Partitioned extraction: by partitioning the destination of the data, DAGs can run in parallel, avoiding locks on the extracted data and optimizing performance when the same data is read. Data that is no longer relevant can be archived and removed from the database.
- Enforced idempotency: the result of a DAG run should always be idempotent. This means that running a process multiple times with the same parameters (even on different dates) produces exactly the same result.
- Conditional execution: Airflow provides options to control how tasks in a DAG run based on the success of previous instances.
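To make the incremental-load and backfill behaviour above concrete, here is a minimal sketch (not taken from this tutorial's project; it assumes Airflow 2.x, where EmptyOperator is available, and all names are hypothetical):
# Minimal DAG sketch: start_date, schedule_interval and catchup drive incremental runs and backfills.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="incremental_load_demo",      # hypothetical DAG name
    start_date=datetime(2023, 3, 1),     # Airflow backfills runs from this date onwards
    schedule_interval="0 12 * * *",      # one run per day at 12:00 UTC
    catchup=True,                        # set to False to skip historical runs
) as dag:
    extract_one_day = EmptyOperator(task_id="extract_one_day")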
1.3 DolphinDBOperator
DolphinDBOperator is one of Airflow's operators. With DolphinDBOperator, Airflow can connect to DolphinDB to write, query, and compute data. DolphinDBOperator has the following specific parameters:
- dolphindb_conn_id: specifies the DolphinDB connection to use; it can be configured under Connections
- sql: the DolphinDB script to run
- file_path: the path of a DolphinDB .dos script file to run
Examples of using DolphinDBOperator are shown below, followed by a minimal end-to-end sketch:
- Run a script by specifying the task content with the sql parameter:
# create a shared table in DolphinDB
create_parameter_table = DolphinDBOperator(
task_id='create_parameter_table',
dolphindb_conn_id='dolphindb_test',
sql='''
undef(`paramTable,SHARED)
t = table(1:0, `param`value, [STRING, STRING])
share t as paramTable
'''
)
- Run a script from a .dos file specified with file_path:
# CalAlpha001.dos is a DolphinDB script
case1 = DolphinDBOperator(
task_id='case1',
dolphindb_conn_id='dolphindb_test',
file_path=path + "/StreamCalculating/CalAlpha001.dos"
)
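The examples above assume DolphinDBOperator has already been imported and is used inside a DAG. A minimal end-to-end sketch is shown below; the import path is assumed from the apache-airflow-providers-dolphindb package layout and may differ in your installed version:
# Sketch only: wiring a DolphinDBOperator task into a DAG.
from datetime import datetime

from airflow import DAG
from airflow.providers.dolphindb.operators.dolphindb import DolphinDBOperator  # assumed import path

with DAG(dag_id="dolphindb_demo", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
    hello = DolphinDBOperator(
        task_id="hello",
        dolphindb_conn_id="dolphindb_test",  # connection defined in the Airflow UI
        sql="print('hello from DolphinDB')",
    )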
1.4 Installing and Deploying Airflow
- Hardware environment:
- Software environment:
Note:
1. This tutorial uses SQLite as the backend database. If Airflow fails to start because the SQLite version is too low, refer to Set up a Database Backend to upgrade SQLite or switch the default database.
2. It is recommended to set up the DolphinDB service before starting. For installation, see the DolphinDB high-availability cluster deployment tutorial, or the Docker-Compose based DolphinDB multi-container cluster deployment tutorial.
- Host environment
- Before installing Airflow, make sure the host has the three dependency packages python3, dolphindb, and dolphindb-operator installed. Run the following commands to install them (the packages can be obtained from the attachments); a quick import check follows the commands.
pip install --force-reinstall dolphindb
pip install --force-reinstall dolphindbapi-1.0.0-py3-none-any.whl
pip install --force-reinstall apache_airflow_providers_dolphindb-1.0.0-py3-none-any.whl
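After installation, a quick sanity check such as the following (a sketch that only verifies the dolphindb client library imports and a session object can be created) helps catch environment problems early:
# Verify that the DolphinDB Python API was installed correctly.
import dolphindb

s = dolphindb.session()  # creates a session object without connecting to a server
print("dolphindb client imported OK")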
The Airflow installation packages used in this tutorial are provided for offline installation only; an online installation method will be provided after the official release.
- After installing the apache-airflow-providers-dolphindb plugin, start Airflow:
For details on deploying and installing Airflow, see the official Airflow quick start. The core commands to start Airflow are:
# initialize the metadata database
airflow db init
# create a user
airflow users create --username admin --firstname Peter --lastname Parker --role Admin --email [email protected] --password admin
# run the webserver as a daemon
airflow webserver --port 8080 -D
# run the scheduler as a daemon
airflow scheduler -D
- Run the following command to verify that Airflow started successfully:
ps -aux|grep airflow
The expected output is shown below; it confirms that Airflow started successfully:
- After a successful start, log in to the Airflow web UI in a browser:
- Default address: http://IP:8080
- Default account: created during db initialization; admin in this tutorial's example
- Default password: created during db initialization; admin in this tutorial's example
- Enter the username and password created above to open the Airflow UI, as shown below:
- Fill in the DolphinDB connection information and connect to the DolphinDB database.
After the connection succeeds, specify dolphindb_conn_id='dolphindb_test' in DolphinDBOperator to run DolphinDB scripts. With the preparation above complete, the rest of this tutorial uses the ETL process for stock snapshot data as an example to show how Airflow interacts with DolphinDB.
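Before configuring the connection in Airflow, it can also help to confirm that the DolphinDB server is reachable from the Airflow host. A minimal check with the DolphinDB Python API (host, port, and credentials below are placeholders for your own deployment) might look like:
# Connectivity check against the DolphinDB server that the Airflow connection will point to.
import dolphindb

s = dolphindb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")  # replace with your host/port/credentials
print(s.run("version()"))                        # prints the server version on success
s.close()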
2. Scheduling Market Data ETL with Airflow
2.1 Overall ETL Architecture
Detailed directory structure of the functional modules:
- add: incremental ETL
  - addLoadSnapshot: imports each day's newly added raw snapshot data
  - addProcessSnapshot: converts the incremental snapshot data to ArrayVector form and cleanses it
  - addFactor: computes the daily and 1-minute K-line data for the increment and stores it
  - addETL.py: builds the incremental-data DAG
- full: full ETL
  - loadSnapshot: creates the snapshot table and imports the data
  - processSnapshot: creates the table for the cleansed snapshot results, converts the data to ArrayVector form, cleanses it, and stores it
  - Factor: creates the factor storage tables and processes the cleansed data into daily and 1-minute K-line data for storage
  - fullETL.py: builds the full-data DAG
External data source -> ODS: import the raw data from the external source into DolphinDB
ODS -> DWD detail data: cleanse the raw data, combining the multi-level quote fields into ArrayVector columns and deduplicating
DWD detail data -> DWB/DWS summary data: compute K-line data from the cleansed snapshot data
Note: this tutorial uses DolphinDB's module feature and the DolphinDB client tools to manage the DolphinDB scripts as a project. For details, see the DolphinDB tutorials on modules and on the DolphinDB client tools.
2.2 Data Description
This tutorial uses level-2 snapshot data for all stocks in the market over the 5 trading days 2021.01.04 - 2021.01.08. Below is the structure of the snapshot table in DolphinDB. The 8 fields BidOrderQty, BidPrice, BidNumOrders, BidOrders, OfferPrice, OfferOrderQty, OfferNumOrders, and OfferOrders each contain multi-level data, which is stored in DolphinDB with the ArrayVector data type:
2.3 Core DolphinDB Cleansing Scripts
2.3.1 Creating the Distributed Database and Tables
- Create the storage table for the raw snapshot data:
The core code to create the database and table that store the raw snapshot data is as follows:
module loadSnapshot::createSnapshotTable
// create the database and table for storing the raw snapshot data
def createSnapshot(dbName, tbName){
login("admin", "123456")
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 50])
// composite partitioning by date and security ID
db = database(dbName,COMPO,[db1,db2],engine='TSDB')
colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice0","BidPrice1","BidPrice2","BidPrice3","BidPrice4","BidPrice5","BidPrice6","BidPrice7","BidPrice8","BidPrice9","BidOrderQty0","BidOrderQty1","BidOrderQty2","BidOrderQty3","BidOrderQty4","BidOrderQty5","BidOrderQty6","BidOrderQty7","BidOrderQty8","BidOrderQty9","BidNumOrders0","BidNumOrders1","BidNumOrders2","BidNumOrders3","BidNumOrders4","BidNumOrders5","BidNumOrders6","BidNumOrders7","BidNumOrders8","BidNumOrders9","BidOrders0","BidOrders1","BidOrders2","BidOrders3","BidOrders4","BidOrders5","BidOrders6","BidOrders7","BidOrders8","BidOrders9","BidOrders10","BidOrders11","BidOrders12","BidOrders13","BidOrders14","BidOrders15","BidOrders16","BidOrders17","BidOrders18","BidOrders19","BidOrders20","BidOrders21","BidOrders22","BidOrders23","BidOrders24","BidOrders25","BidOrders26","BidOrders27","BidOrders28","BidOrders29","BidOrders30","BidOrders31","BidOrders32","BidOrders33","BidOrders34","BidOrders35","BidOrders36","BidOrders37","BidOrders38","BidOrders39","BidOrders40","BidOrders41","BidOrders42","BidOrders43","BidOrders44","BidOrders45","BidOrders46","BidOrders47","BidOrders48","BidOrders49","OfferPrice0","OfferPrice1","OfferPrice2","OfferPrice3","OfferPrice4","OfferPrice5","OfferPrice6","OfferPrice7","OfferPrice8","OfferPrice9","OfferOrderQty0","OfferOrderQty1","OfferOrderQty2","OfferOrderQty3","OfferOrderQty4","OfferOrderQty5","OfferOrderQty6","OfferOrderQty7","OfferOrderQty8","OfferOrderQty9","OfferNumOrders0","OfferNumOrders1","OfferNumOrders2","OfferNumOrders3","OfferNumOrders4","OfferNumOrders5","OfferNumOrders6","OfferNumOrders7","OfferNumOrders8","OfferNumOrders9","OfferOrders0","OfferOrders1","OfferOrders2","OfferOrders3","OfferOrders4","OfferOrders5","OfferOrders6","OfferOrders7","OfferOrders8","OfferOrders9","OfferOrders10","OfferOrders11","OfferOrders12","OfferOrders13","OfferOrders14","OfferOrders15","OfferOrders16","OfferOrders17","OfferOrders18","OfferOrders19","OfferOrders20","OfferOrders21","OfferOrders22","OfferOrders23","OfferOrders24","OfferOrders25","OfferOrders26","OfferOrders27","OfferOrders28","OfferOrders29","OfferOrders30","OfferOrders31","OfferOrders32","OfferOrders33","OfferOrders34","OfferOrders35","OfferOrders36","OfferOrders37","OfferOrders38","OfferOrders39","OfferOrders40","OfferOrders41","OfferOrders42","OfferOrders43","OfferOrders44","OfferOrders45","OfferOrders46","OfferOrders47","OfferOrders48","OfferOrders49","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
schemaTable = table(1:0,colName, colType)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
For the snapshot data, this tutorial uses a composite partitioning scheme: the first level partitions by day, and the second level partitions by security ID into 50 HASH partitions. For how to choose a partitioning scheme for a given dataset, see the DolphinDB partitioned database tutorial.
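To double-check that the partition scheme was created as intended, the table schema can also be inspected from Python. The sketch below assumes a reachable server and uses placeholder database and table names (the real names come from the Airflow variables introduced later):
# Inspect the partitioning of the raw snapshot table (names are placeholders).
import dolphindb

s = dolphindb.session()
s.connect("127.0.0.1", 8848, "admin", "123456")  # placeholder connection details
info = s.run('schema(loadTable("dfs://snapshot_orig", "snapshot"))')
print(info["partitionColumnName"])  # expected partition columns: DateTime and SecurityID
s.close()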
- Create the storage table for the cleansed snapshot data:
The core code to create the database and table that store the cleansed snapshot data in ArrayVector form is as follows:
module processSnapshot::createSnapshot_array
// create the storage table for the cleansed snapshot data
def createProcessTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 50])
// composite partitioning by date and security ID
db = database(dbName,COMPO,[db1,db2],engine='TSDB')
colName = ["SecurityID","DateTime","PreClosePx","OpenPx","HighPx","LowPx","LastPx","TotalVolumeTrade","TotalValueTrade","InstrumentStatus","BidPrice","BidOrderQty","BidNumOrders","BidOrders","OfferPrice","OfferOrderQty","OfferNumOrders","OfferOrders","NumTrades","IOPV","TotalBidQty","TotalOfferQty","WeightedAvgBidPx","WeightedAvgOfferPx","TotalBidNumber","TotalOfferNumber","BidTradeMaxDuration","OfferTradeMaxDuration","NumBidOrders","NumOfferOrders","WithdrawBuyNumber","WithdrawBuyAmount","WithdrawBuyMoney","WithdrawSellNumber","WithdrawSellAmount","WithdrawSellMoney","ETFBuyNumber","ETFBuyAmount","ETFBuyMoney","ETFSellNumber","ETFSellAmount","ETFSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE[]","INT[]","INT[]","INT[]","DOUBLE[]","INT[]","INT[]","INT[]","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
schemaTable = table(1:0, colName, colType)
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`DateTime`SecurityID, compressMethods={DateTime:"delta"}, sortColumns=`SecurityID`DateTime, keepDuplicates=ALL)
}
- Create the K-line result tables:
Create the minute-level K-line result table; the core code is as follows:
module Factor::createFactorOneMinute
// create the storage table for 1-minute K-line factors
def createFactorOneMinute(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
// partition by day
db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}
Create the daily K-line result table; the core code is as follows:
module Factor::createFactorDaily
// create the storage table for daily K-lines
def createFactorDaily(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
// partition by year
db = database(dbName, RANGE, datetimeAdd(2000.01M,(0..50)*12, "M"),engine = `TSDB)
colName = `TradeDate`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
colType =[DATE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeDate,keepDuplicates=ALL)
}
2.3.2 Data Cleansing
The cleansing of the snapshot data in this tutorial consists of two main parts:
- storing the multi-level price and order-quantity fields as ArrayVector columns
- deduplicating the data
The processing flow and core code are as follows:
module processSnapshot::processSnapshotData
// combine columns into array vectors, deduplicate, and count the duplicate records
def mapProcess(mutable t, dbName, tbName){
n1 = t.size()
t = select SecurityID, DateTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, InstrumentStatus, fixedLengthArrayVector(BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9) as BidPrice, fixedLengthArrayVector(BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9) as BidOrderQty, fixedLengthArrayVector(BidNumOrders0, BidNumOrders1, BidNumOrders2, BidNumOrders3, BidNumOrders4, BidNumOrders5, BidNumOrders6, BidNumOrders7, BidNumOrders8, BidNumOrders9) as BidNumOrders, fixedLengthArrayVector(BidOrders0, BidOrders1, BidOrders2, BidOrders3, BidOrders4, BidOrders5, BidOrders6, BidOrders7, BidOrders8, BidOrders9, BidOrders10, BidOrders11, BidOrders12, BidOrders13, BidOrders14, BidOrders15, BidOrders16, BidOrders17, BidOrders18, BidOrders19, BidOrders20, BidOrders21, BidOrders22, BidOrders23, BidOrders24, BidOrders25, BidOrders26, BidOrders27, BidOrders28, BidOrders29, BidOrders30, BidOrders31, BidOrders32, BidOrders33, BidOrders34, BidOrders35, BidOrders36, BidOrders37, BidOrders38, BidOrders39, BidOrders40, BidOrders41, BidOrders42, BidOrders43, BidOrders44, BidOrders45, BidOrders46, BidOrders47, BidOrders48, BidOrders49) as BidOrders, fixedLengthArrayVector(OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9) as OfferPrice, fixedLengthArrayVector(OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9) as OfferQty, fixedLengthArrayVector(OfferNumOrders0, OfferNumOrders1, OfferNumOrders2, OfferNumOrders3, OfferNumOrders4, OfferNumOrders5, OfferNumOrders6, OfferNumOrders7, OfferNumOrders8, OfferNumOrders9) as OfferNumOrders, fixedLengthArrayVector(OfferOrders0, OfferOrders1, OfferOrders2, OfferOrders3, OfferOrders4, OfferOrders5, OfferOrders6, OfferOrders7, OfferOrders8, OfferOrders9, OfferOrders10, OfferOrders11, OfferOrders12, OfferOrders13, OfferOrders14, OfferOrders15, OfferOrders16, OfferOrders17, OfferOrders18, OfferOrders19, OfferOrders20, OfferOrders21, OfferOrders22, OfferOrders23, OfferOrders24, OfferOrders25, OfferOrders26, OfferOrders27, OfferOrders28, OfferOrders29, OfferOrders30, OfferOrders31, OfferOrders32, OfferOrders33, OfferOrders34, OfferOrders35, OfferOrders36, OfferOrders37, OfferOrders38, OfferOrders39, OfferOrders40, OfferOrders41, OfferOrders42, OfferOrders43, OfferOrders44, OfferOrders45, OfferOrders46, OfferOrders47, OfferOrders48, OfferOrders49) as OfferOrders, NumTrades, IOPV, TotalBidQty, TotalOfferQty, WeightedAvgBidPx, WeightedAvgOfferPx, TotalBidNumber, TotalOfferNumber, BidTradeMaxDuration, OfferTradeMaxDuration, NumBidOrders, NumOfferOrders, WithdrawBuyNumber, WithdrawBuyAmount, WithdrawBuyMoney, WithdrawSellNumber, WithdrawSellAmount, WithdrawSellMoney, ETFBuyNumber, ETFBuyAmount, ETFBuyMoney, ETFSellNumber, ETFSellAmount, ETFSellMoney from t where isDuplicated([SecurityID, DateTime], FIRST) = false
n2 = t.size()
loadTable(dbName, tbName).append!(t)
return n1,n2
}
def process(processDate, dbName_orig, tbName_orig, dbName_process, tbName_process){
dataString = temporalFormat(processDate, "yyyyMMdd")
// check whether data for the processing date already exists in the database
todayCount = exec count(*) from loadTable(dbName_process, tbName_process) where date(DateTime)=processDate
// if data for that day already exists in the database, delete it first
if(todayCount != 0){
writeLog("Start to delete the process snapshot data, the delete date is: " + dataString)
dropPartition(database(dbName_process), processDate, tbName_process)
writeLog("Successfully deleted the process snapshot data, the delete date is: " + dataString)
}
// start processing the data
writeLog("Start process Snapshot Data, the datetime is "+ dataString)
ds = sqlDS(sql(select=sqlCol("*"), from=loadTable(dbName_orig,tbName_orig),where=<date(DateTime)=processDate>))
n1,n2=mr(ds, mapProcess{, dbName_process, tbName_process}, +, , false)
if(n1 != n2){
writeLog("ERROR: Duplicate data exists in " + dataString + ", successfully dropped " + string(n1-n2) + " records")
}
writeLog("Successfully process the snapshot data, the processDate is: " + dataString)
}
2.3.3 Building K-Lines from the Cleansed Market Data
The core code for building minute-level K-lines and writing them to the database is as follows:
module Factor::calFactorOneMinute
// build 1-minute K-lines and write them to the database
def calFactorOneMinute(dbName, tbName, mutable factorTable){
pt = loadTable(dbName, tbName)
// process the data in groups of 10 days
dayList = schema(pt).partitionSchema[0]
if(dayList.size()>10) dayList = dayList.cut(10)
for(days in dayList){
// compute the 1-minute K-lines
res = select first(LastPx) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*TotalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, minute(DateTime) as TradeTime, SecurityID
writeLog("Start to append the minute factor result, the days are: [" + concat(days, ",")+"]")
// write the 1-minute K-lines to the database
factorTable.append!(res)
writeLog("Successfully appended the minute factor result to database, the days are: [" + concat(days, ",")+"]")
}
}
The core code for building daily K-lines and writing them to the database is as follows:
module Factor::calFactorDaily1
// build daily K-lines and write them to the database
def calFactorDaily(dbName, tbName, mutable factorTable){
pt = loadTable(dbName, tbName)
// process the data in groups of 10 days
dayList = schema(pt).partitionSchema[0]
if(dayList.size()>10) dayList = dayList.cut(10)
for(days in dayList){
// compute the daily K-lines
res = select first(LastPx) as Open, max(LastPx) as High, min(LastPx) as Low, last(LastPx) as Close, sum(TotalVolumeTrade) as Volume, sum(LastPx*TotalVolumeTrade) as Amount, wavg(LastPx, TotalVolumeTrade) as Vwap from pt where date(DateTime) in days group by date(DateTime) as TradeDate, SecurityID
writeLog("Start to append the daily factor result, the days are: [" + concat(days, ",")+"]")
// write the daily K-lines to the database
factorTable.append!(res)
writeLog("Successfully appended the daily factor result to database, the days are: [" + concat(days, ",")+"]")
}
}
2.4 Incremental Data Cleansing
The overall logic for importing incremental data is the same as for the full import; the main differences are:
- the full import loads data in batches, while the incremental job runs on a daily schedule
- the full import submits asynchronous tasks via DolphinDB's submitJob, while the incremental import writes data synchronously via the append! interface
- for the K-line computation, the full job processes all data in batches, while the incremental job only processes the current day's data
See the attachments for the detailed code differences.
2.5 Generating and Running DAG Tasks in Airflow
2.5.1 Creating a DAG Instance
An example of creating the full-load DAG instance is as follows:
with DAG(dag_id="ETLTest", start_date=datetime(2023, 3, 10), schedule_interval=None) as dag:
dag_id specifies the DAG name and must be unique; start_date sets the start date of the task; schedule_interval specifies the interval between two runs; None means the DAG is not run automatically and must be triggered manually. An example of the incremental DAG is as follows:
args = {
"owner" : "Airflow",
"start_date" : days_ago(1),
"catchup" : False,
'retries' : 0
}
with DAG(dag_id="addETLTest", default_args = args, schedule_interval="0 12 * * *") as dag:
In the incremental DAG, catchup controls whether missed runs are backfilled, retries is the number of retries after a task failure, and schedule_interval = "0 12 * * *" runs the task every day at 12:00 (UTC). For details on schedule settings, see Scheduling & Triggers.
2.5.2 Reading Airflow Variables
Variable values defined in Airflow cannot be read directly from DolphinDB scripts. So that later tasks can use them, this tutorial writes the Airflow variables into a shared table, from which subsequent DolphinDB tasks read them. The code is as follows:
# read the variable values
variable = ['ETL_dbName_origin', "ETL_tbName_origin", "ETL_dbName_process",
"ETL_tbName_process", 'ETL_dbName_factor','ETL_tbName_factor','ETL_dbName_factor_daily',
'ETL_tbName_factor_daily',"ETL_filedir", "ETL_start_date","ETL_end_date"]
value = []
for var in variable:
value.append(str(Variable.get(var)))
variables = ",".join(variable)
values = ",".join(value)
# create a shared table via DolphinDBOperator and write the variable values into it
create_parameter_table = DolphinDBOperator(
task_id='create_parameter_table',
dolphindb_conn_id='dolphindb_test',
sql='''
undef(`paramTable,SHARED)
t = table(1:0, `param`value, [STRING, STRING])
share t as paramTable
'''
)
given_param = DolphinDBOperator(
task_id='given_param',
dolphindb_conn_id='dolphindb_test',
sql="params = split('" + variables + "',',');" + \
"values = split('" + values + "',',');" + \
"for(i in 0..(params.size()-1)){" + \
"insert into paramTable values(params[i], values[i]);}"
)
After this task runs, the parameter table and its values can be seen in the shared variables panel of the DolphinDB GUI, as shown below:
The value of each DAG variable can be modified in Airflow by clicking the location shown below:
After the DAG is generated, the variables it uses are displayed on the following web page and can be modified dynamically, as shown below:
The table below lists the variables used in this project and their meanings:
2.5.3 Running Tasks with DolphinDBOperator
- Full data processing with DolphinDBOperator
Use DolphinDBOperator to define the data import, cleansing, and computation steps above as tasks in the DAG.
The core code of the full processing is as follows:
loadSnapshot = DolphinDBOperator(
task_id='loadSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged table-creation and data-loading functions
use loadSnapshot::createSnapshotTable
use loadSnapshot::loadSnapshotData
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_origin]
tbName = params[`ETL_tbName_origin]
startDate = date(params[`ETL_start_date])
endDate = date(params[`ETL_end_date])
fileDir = params[`ETL_filedir]
// create the target database and table if they do not exist
if(not existsDatabase(dbName)){
loadSnapshot::createSnapshotTable::createSnapshot(dbName, tbName)
}
// submit background jobs to write in parallel and improve import throughput
start = now()
for (loadDate in startDate..endDate){
submitJob("loadSnapshot"+year(loadDate)+monthOfYear(loadDate)+dayOfMonth(loadDate), "loadSnapshot", loadSnapshot::loadSnapshotData::loadSnapshot{, dbName, tbName, fileDir}, loadDate)
}
// wait until the import jobs finish, so that downstream steps see a complete data source
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="loadSnapshot" and endTime is null
}
while(cnt != 0)
// check whether any import job failed; if so, raise the error
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="loadSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
processSnapshot = DolphinDBOperator(
task_id='processSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged table-creation and cleansing functions
use processSnapshot::createSnapshot_array
use processSnapshot::processSnapshotData
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName_orig = params[`ETL_dbName_origin]
tbName_orig = params[`ETL_tbName_origin]
dbName_process = params[`ETL_dbName_process]
tbName_process = params[`ETL_tbName_process]
startDate = date(params[`ETL_start_date])
endDate = date(params[`ETL_end_date])
// create the target database and table if they do not exist
if(not existsDatabase(dbName_process)){
processSnapshot::createSnapshot_array::createProcessTable(dbName_process, tbName_process)
}
// process with multiple background jobs to improve efficiency
start = now()
for (processDate in startDate..endDate){
submitJob("processSnapshot"+year(processDate)+monthOfYear(processDate)+dayOfMonth(processDate), "processSnapshot", processSnapshot::processSnapshotData::process{, dbName_orig, tbName_orig, dbName_process, tbName_process}, processDate)
}
// wait until the cleansing jobs finish, so that downstream steps see a complete data source
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
}
while(cnt != 0)
// check whether any cleansing job failed; if so, raise the error
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
calMinuteFactor = DolphinDBOperator(
task_id='calMinuteFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged table-creation and computation functions
use Factor::createFactorOneMinute
use Factor::calFactorOneMinute
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor]
tbName_factor = params[`ETL_tbName_factor]
// create the target database and table if they do not exist
if(not existsDatabase(dbName_factor)){
createFactorOneMinute(dbName_factor, tbName_factor)
}
factorTable = loadTable(dbName_factor, tbName_factor)
// call the computation function
calFactorOneMinute(dbName, tbName,factorTable)
'''
)
calDailyFactor = DolphinDBOperator(
task_id='calDailyFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged table-creation and computation functions
use Factor::createFactorDaily
use Factor::calFactorDaily1
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor_daily]
tbName_factor = params[`ETL_tbName_factor_daily]
// create the target database and table if they do not exist
if(not existsDatabase(dbName_factor)){
createFactorDaily(dbName_factor, tbName_factor)
}
// call the computation function
factorTable = loadTable(dbName_factor, tbName_factor)
Factor::calFactorDaily1::calFactorDaily(dbName, tbName,factorTable)
'''
)
Build the DAG according to the dependencies between the tasks, for example:
start_task >> create_parameter_table >> given_param >> loadSnapshot >> processSnapshot >> calMinuteFactor >> calDailyFactor
- Incremental data import with DolphinDBOperator
The code that builds the incremental-data tasks is as follows:
addLoadSnapshot = DolphinDBOperator(
task_id='addLoadSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged import functions
use addLoadSnapshot::loadSnapshotData
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_origin]
tbName = params[`ETL_tbName_origin]
fileDir = params[`ETL_filedir]
// get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
// import data only on trading days
if(today() in MarketDays ){
fileDir = params[`ETL_filedir]
addLoadSnapshot::loadSnapshotData::loadSnapshot(today(), dbName, tbName, fileDir)
}
'''
)
addProcessSnapshot = DolphinDBOperator(
task_id='addProcessSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged cleansing functions
use addProcessSnapshot::processSnapshotData
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName_orig = params[`ETL_dbName_origin]
tbName_orig = params[`ETL_tbName_origin]
dbName_process = params[`ETL_dbName_process]
tbName_process = params[`ETL_tbName_process]
// get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
// process data only on trading days
if(today() in MarketDays ){
addProcessSnapshot::processSnapshotData::process(today(), dbName_orig, tbName_orig, dbName_process, tbName_process)
}
'''
)
addCalMinuteFactor= DolphinDBOperator(
task_id='addCalMinuteFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged computation functions
use addFactor::calFactorOneMinute
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor]
tbName_factor = params[`ETL_tbName_factor]
factorTable = loadTable(dbName_factor, tbName_factor)
// get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
// on trading days, call the computation function to build 1-minute K-lines
if(today() in MarketDays ){
addFactor::calFactorOneMinute::calFactorOneMinute(dbName, tbName,today(), factorTable)
}
'''
)
addCalDailyFactor= DolphinDBOperator(
task_id='addCalDailyFactor',
dolphindb_conn_id='dolphindb_test',
sql='''
pnodeRun(clearAllCache)
undef(all)
go;
// use module to load the packaged computation functions
use addFactor::calFactorDaily1
// read parameters from the shared parameter table
params = dict(paramTable[`param], paramTable[`value])
dbName = params[`ETL_dbName_process]
tbName = params[`ETL_tbName_process]
dbName_factor = params[`ETL_dbName_factor_daily]
tbName_factor = params[`ETL_tbName_factor_daily]
factorTable = loadTable(dbName_factor, tbName_factor)
// get the trading calendar
MarketDays = getMarketCalendar("CFFEX")
// on trading days, call the computation function to build daily K-lines
if(today() in MarketDays ){
addFactor::calFactorDaily1::calFactorDaily(dbName, tbName,today(), factorTable)
}
'''
)
Build the DAG according to the dependencies between the tasks, for example:
start_task >> create_parameter_table >> given_param >> addLoadSnapshot >> addProcessSnapshot >> addCalMinuteFactor >> addCalDailyFactor
2.5.4 Generating the DAGs
Deploy the project with the following steps:
- Step 1: deploy the DolphinDB project
Import the addETL and fullETL projects of the DolphinDB project into the DolphinDB GUI (the DolphinDB client tool):
Upload the module files of the addETL and fullETL projects to the DolphinDB server that the Airflow connection points to:
- Step 2: deploy the Python project
Place the Python scripts from the Python project in the <Airflow_install_Dir>/airflow/dags directory. Note that newly created DAGs do not appear in the UI immediately; by default they show up after a 5-minute refresh. The refresh interval can be adjusted with dag_dir_list_interval in airflow.cfg.
- Step 3: import the Airflow variables
In the Airflow web UI, go to Admin --> Variables, upload the Variables.json file to import the variables into Airflow, and adjust their values to your environment.
- Step 4: upload the raw data files
Upload the data files to the server and set the ETL_filedir variable in Airflow to the actual path of the data files. When running the incremental ETL task, rename the data file so that the date in its name is the current date (e.g. 20230330snapshot.csv); otherwise the task fails because no data is found.
The resulting DAGs are shown below:
Full data import:
Incremental data import:
After a run, a green task instance means the task succeeded, red means it failed, and orange means an upstream dependency failed, so the task did not start.
3. FAQ
3.1 How to capture the output printed by the print function in DolphinDB scripts
- The output of print in a DolphinDB script is written to standard output and can be found in airflow-scheduler.out, as shown in the figure below:
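For example, a task like the following (a hypothetical debugging task written in the same style as the operators above) writes its print output to airflow-scheduler.out:
# The print output of this script ends up in airflow-scheduler.out on the scheduler host.
debug_print = DolphinDBOperator(
    task_id="debug_print",               # hypothetical task
    dolphindb_conn_id="dolphindb_test",
    sql="print('snapshot ETL heartbeat: ' + string(now()))",
)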
3.2 How to check the completion status of asynchronous jobs submitted with submitJob in a DolphinDB script
Use DolphinDB's getRecentJobs function to retrieve the status of background jobs, then add the corresponding check logic to the DolphinDB DAG. Example:
DolphinDBOperator(
task_id='processSnapshot',
dolphindb_conn_id='dolphindb_test',
sql='''
// wait until all background jobs have finished
do{
cnt = exec count(*) from getRecentJobs() where jobDesc="processSnapshot" and endTime is null
}
while(cnt != 0)
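// note: 'start' below is assumed to be a timestamp recorded before the jobs were submitted (e.g. start = now()), as in the full code of section 2.5.3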
// check whether any background job failed; if so, raise the error
cnt = exec count(*) from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
if (cnt != 0){
error = exec errorMsg from pnodeRun(getRecentJobs) where jobDesc="processSnapshot" and errorMsg is not null and receivedTime > start
throw error[0]
}
'''
)
3.3 How to handle frequent connection timeouts and disconnects when running Airflow
When this happens, it is likely caused by network latency. Set the relevant parameters when creating the connection: as shown in the figure above, set the KeepAliveTime and reconnect parameters on the DolphinDB connection.
3.4 With start_date set to the current date and a daily schedule, why is there no run on that day
- In Airflow, the earliest run of a scheduled task happens at start_date + schedule_interval. For example, with start_date = 2023.03.16 and a daily schedule, the earliest scheduled run is on 2023.03.17, so no run is executed on the start date itself.
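The behaviour can be reproduced with a small sketch (hypothetical DAG, assuming Airflow 2.x): the run whose logical date is start_date only executes after that interval has ended, i.e. one schedule_interval later.
# With a daily schedule, the first run (logical date 2023-03-16) is only triggered on 2023-03-17.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="start_date_demo",            # hypothetical DAG name
    start_date=datetime(2023, 3, 16),
    schedule_interval="@daily",
    catchup=True,
) as dag:
    EmptyOperator(task_id="noop")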
3.5 How to locate the cause of a failed DolphinDBOperator task
- After a task fails, DolphinDBOperator writes the detailed error message to the log. Review the log to locate the failing code and fix it. To view the log:
4. Summary
Starting from a common market-data ETL case, this tutorial has focused on how to combine the Airflow scheduling framework with the DolphinDB database for structured management of ETL jobs, in order to save time, improve efficiency, and reduce operations cost. Due to limited space, some other features of DolphinDB and of the Airflow framework could not be covered in more depth; users should adapt the approach to their actual situation. Feedback on any errors or omissions in this tutorial is welcome.
Attachments
- Dependency packages: pydolphindb-1.0.0-py3-none-any.whl, apache_airflow_providers_dolphindb-1.0.0-py3-none-any.whl
- DolphinDB projects: addETL, fullETL
- Python projects: addETL.py, fullETL.py
- Data file: 20210104snapshot.csv
- Airflow variables: Variables.json