因子挖掘是量化交易的基礎。近年來,Python 是很多研究員進行資料分析和因子挖掘的主流工具。但是通過 Python 挖掘的有效因子在投産時,通常需要由 QUANT 團隊的研究員将代碼送出給 IT 團隊,IT 團隊用 C++ 代碼轉寫後部署到生産環境,以滿足實盤低延時的要求。這種做法雖然通過維護兩套系統解決了産研一體化的問題,但開發周期較長,成本也相對較高。
量化金融是一個高度市場化、多方機構高度博弈的領域,因子的有效時間會随着博弈程度的加劇而縮短。如何使用更高效的工具和流程,更快地找到新的有效因子并投産部署到實盤交易,是每一個交易團隊必須面對的問題。
本教程旨在指導使用者基于 DolphinDB 快速搭建一個友善、快捷、擴充性好和相容性強的流批一體因子計算平台原型,提供基于快照資料計算分鐘因子和進一步加工分鐘因子為複雜因子的功能。
因子業務開發人員無需了解 DolphinDB 流計算架構的底層架構,僅需根據業務因子計算邏輯編寫函數表達式,然後排程因子計算平台的計算接口,便可完成因子計算。
基于這一平台,開發人員無需再轉寫代碼,因子投研和生産隻需一套系統、一種腳本即可無縫切換,極大降低了開發運維成本,提高了因子投産的全流程效率。
1. 概述
在基于 Level-2 快照資料做實時分鐘因子加工的時候,比如實時做 K 線,常常會面臨以下幾個問題:
- 以機器時間還是事件時間作為視窗關閉的信号?
- 如果使用事件時間作為視窗關閉的信号,如何保證及時關閉不活躍股票的計算視窗?
- 如果使用事件時間作為視窗關閉的信号,如何保證及時關閉 11:30(午間休市)、14:57(連續競價結束)的計算視窗 ?
- 計算時視窗邊界的開閉是左閉右開還是左開右閉?
- 計算輸出結果的時間是計算視窗起始時間還是結束時間?
- 如果在某個計算視窗内,分組内沒有資料,如何用前值或者指定值填充?
在基于分鐘因子進一步加工有狀态的複雜因子的時候,比如實時計算 MACD、RSI 等,常常會面臨以下幾個問題:
- 計算因子是有狀态的:不僅與目前的多個名額有關,而且與多個名額的曆史狀态相關,如何開發有狀态的算子?
- 有狀态的算子如何實作增量計算,提高計算效率?
- 在一次響應計算過程中,如果計算 1000 個因子,這 1000 個因子依賴一個共同的中間變量,如何避免重複計算?
DolphinDB 内置的時間序列流計算引擎可以滿足 Level-2 快照資料實時分鐘因子計算,響應式狀态流計算引擎可以滿足分鐘因子進一步加工有狀态的複雜因子的計算。上述問題會在本教程中逐一解答。本教程的示例内容隻涉及分鐘頻的因子計算,但是 DolphinDB 的計算能力不局限于分鐘頻的因子,後續我們會陸續釋出快照頻率、1s 頻率甚至更高頻率的因子計算平台建構最佳實踐教程。
2. Level-2 快照資料流批一體因子計算平台
2.1 因子計算平台業務使用流程
按照本教程部署完基于 DolphinDB 搭建的因子計算平台後,基于曆史資料的因子開發階段的調試流程如下:
因子業務開發人員隻需要在 DolphinDB 提供的內建開發環境中編寫因子計算的函數表達式,然後調用因子計算平台的計算接口就可以完成調試。如果編寫因子符合 DolphinDB 的文法,就可以成功執行并傳回計算結果。如果編寫因子不符合 DolphinDB 的文法,就會報錯中斷。
在已經開發了一定數量的因子後,需要在生産環境部署實時計算業務,部署流程如下:
因子業務開發人員隻需通過用戶端調用封裝好的實時因子計算服務執行函數,便可以完成部署。執行完以後,DolphinDB server 會出現該流計算服務的入口,是一個表對象,可以通過 DolphinDB 提供的實時資料接入工具來接入資料。同時也會自動建立流計算服務的出口,也是一個表對象,存儲計算結果。
2.2 因子計算平台架構
本教程示例 Level-2 快照資料流批一體因子計算平台的架構如下圖所示:
主要包括以下功能子產品
- 實時資料低延時接入功能子產品
- DolphinDB API 實時資料寫入接口:C++ API, Java API 等
- DolphinDB 實時行情接入插件:amdQuote, Insight, NSQ 等
- DolphinDB 消息中間件訂閱插件:Kafka, zmq, MQTT 等
- 曆史資料回放功能子產品:因子開發階段的調試和因子回測都需要基于曆史資料,DolphinDB 提供了單表和多表的嚴格按照時序的控速回放功能,能夠便捷高效地把已經存儲在 DolphinDB 資料庫中的曆史資料回放成流。
- 内置流計算引擎功能子產品:DolphinDB 根據各種時序資料流式計算場景,開發了多個流計算引擎。本教程中,對快照資料做滾動視窗的聚合計算(計算生成不同分鐘頻的因子)使用了時間序列聚合流計算引擎,進一步加工成複雜因子用了響應式狀态流計算引擎。
- 內建開發環境功能子產品:因子業務開發人員可以把 DolphinDB GUI 和 DolphinDB Vscode 作為內建開發環境,進行因子表達式代碼的開發和調試。同時可以通過各種語言的 DolphinDB API 與 DolphinDB server 進行互動,進行任務的排程和作業的執行。
- 低延時消息總線釋出子產品:DolphinDB 提供了對接各種消息隊列中間件的插件,可以把實時計算結果推送到 Kafka, zmq, RabbitMQ, MQTT 等。
2.3 因子計算平台的計算能力
本教程示例 Level-2 快照資料流批一體因子計算平台擁有計算下述兩類因子的能力:
(1)第一類:基于快照資料計算分鐘因子
第一類因子是指直接對快照資料,做指定視窗大小的滾動視窗聚合計算,比如任意分鐘的 K 線等聚合名額計算。第一類因子使用了 DolphinDB 内置的時間序列引擎(createTimeSeriesEngine),具體教程可參考 https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/c/createTimeSeriesEngine.html
(2)第二類:進一步加工分鐘因子為複雜因子
第二類因子是指對第一類因子做進一步加工,做步長為1行、視窗為 n 行或者指定時間的滑動視窗計算,比如 EMA、RSI 等有狀态因子的計算。第二類因子使用了 DolphinDB 内置的流計算引擎(createReactiveStateEngine),具體教程可參考 https://www.dolphindb.cn/cn/help/FunctionsandCommands/FunctionReferences/c/createReactiveStateEngine.html
2.4 使用者二次開發自定義因子表達式
2.4.1 自定義分鐘因子表達式
第一類因子是指直接對快照資料,做指定視窗大小的滾動視窗聚合計算,用了 DolphinDB 内置的時間序列引擎。時間序列引擎對以下聚合計算算子進行了優化,實作了增量計算,顯著提升了性能:corr, covar, first, last, max, med, min, percentile, quantile, std, var, sum, sum2, sum3, sum4, wavg, wsum, count, firstNot, ifirstNot, lastNot, ilastNot, imax, imin, nunique, prod, sem, mode, searchK。是以,如果分鐘因子可以直接用 DolphinDB 内置聚合算子表達,就可以實作增量計算。如果分鐘因子複雜程度較高,無法直接用 DolphinDB 内置聚合算子直接表達,那麼就需要用 defg 函數聲明自定義聚合計算函數來表達。
下面我們以分鐘 K 線計算和指定視窗内的買賣壓力名額計算為例,說明增量計算的因子表達式編寫方式和非增量計算的因子表達式編寫方式。
增量計算因子表達式:
def High(){
return "max(LastPx)"
}
函數名High對應因子名稱,表示分鐘 K 線的最高價,業務上的計算邏輯是對計算視窗内發生的所有價格求最大值,可以用 DolphinDB 内置的聚合算子max直接表達,是以用字元串max(LastPx)直接表示,LastPx表示最新成交價格。因子計算平台會自動解析字元串max(LastPx)為元代碼的格式 <max(LastPx)>,并傳入時間序列引擎。 同理,分鐘 K 線的開盤價、收盤價和最低價可以這樣表示:
def Open(){
return "first(LastPx)"
}
def Close(){
return "last(LastPx)"
}
def Low(){
return "min(LastPx)"
}
非增量計算因子表達式:
defg Press(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9,BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9,OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9,OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9){
bidPrice = matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9)
bidQty = matrix(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9)
offerPrice = matrix(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9)
offerQty = matrix(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9)
wap = (bidPrice[0]*offerQty[0] + offerPrice[0]*bidQty[0])\(bidQty[0]+offerQty[0])
bidw=(1.0\(bidPrice-wap))
bidw=bidw\(bidw.rowSum())
offerw=(1.0\(offerPrice-wap))
offerw=offerw\(offerw.rowSum())
press = log((bidQty*bidw).rowSum())-log((offerQty*offerw).rowSum())
return avg(press)
}
函數名 Press 對應因子名,表示買賣壓力名額,BidPrice, BidOrderQty, OfferPrice, OfferOrderQty 表示買賣方向的十檔量價,其函數表達式如下:
該因子的表達式複雜度較高,無法直接用 DolphinDB 内置的聚合算子表示,需要用 defg 函數聲明自定義聚合計算函數來表達。因子計算平台會自動解析聚合函數 Press 為元代碼的格式 <Press()>,并傳入時間序列引擎。
2.4.2 自定義複雜因子表達式
第二類因子是指對第一類因子做進一步加工,做步長為1行、視窗為 n 行或者指定時間的滑動視窗計算,用了 DolphinDB 内置的響應式狀态引擎。狀态算子計算時需要用到曆史狀态。如果每一次計算都使用全量資料,性能不佳。狀态函數的優化,也就是增量方式的流式實作非常關鍵。下列狀态函數在 DolphinDB 的響應式狀态引擎中的實作均得到了優化:
- 累計視窗函數:cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg
- 滑動視窗函數:ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mslr
- 序列相關函數:deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCovar, ewmCorr
上述函數除了 mslr 傳回兩個值以外,其餘函數均隻有一個傳回值。在後續的版本中,DolphinDB 将允許使用者用插件來開發自己的狀态函數,注冊後即可在狀态引擎中使用。
下面我們以進一步加工分鐘收盤價為 MACD 為例:
@state
def MACD(Close, SHORT_ = 12, LONG_ = 26, M = 9) {
DIF = ewmMean(Close, span = SHORT_, adjust = false) - ewmMean(Close, span = LONG_, adjust = false)
DEA = ewmMean(DIF, span = M, adjust = false)
MACD = (DIF - DEA) * 2
return round(DIF, 3), round(DEA, 3), round(MACD, 3)
}
函數名 MACD 對應因子名,Close 是指分鐘收盤價,必須包括在時間序列聚合引擎的輸出結果中。
在定義複雜因子表達式的時候,如果定義的函數是有狀态的,即目前行傳回值基于之前行的資料,則需要在定義函數前用 @state 聲明。
3. 部署和因子開發
3.1 因子計算平台部署
第一步
把本教程功能子產品代碼導入本地內建開發環境(DolphinDB GUI),功能子產品源碼見附錄 SnapshotFactorCalculationPlatform。導入的目錄結構必須嚴格按照下圖所示結構:
第二步
右擊 SnapshotFactorCalculationPlatform 目錄,把本地子產品代碼同步到 DolphinDB server 端。
第三步
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼進行功能驗證
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonConfigLoad
/**
計算服務部署傳參
testConfig.dos 是示例 Json 配置檔案
parallel 指定計算的并行度
*/
jsonPath = "./modules/DolphinDBModules/SnapshotFactorCalculationPlatform/testConfig.dos"
parallel = 2
// 執行計算服務部署函數
loadJsonConfig(jsonPath, parallel)
執行成功後,可以在 DolphinDB GUI 的右下角變量欄看到流計算相應的入口和出口的表變量:
此時,隻需要把實時資料或者庫内曆史資料注入到入口 snapshotStream 中,就會在出口(結果表)中看到相應的輸出。
3.2 第一類分鐘因子開發和調試
第一步
在 DolphinDB GUI 的 SnapshotFactorCalculationPlatform 子產品的 Factor1 目錄定義第一類分鐘因子的表達式,按照 3.1 部署完因子計算平台後,内置了如下分鐘因子,以調試 Close 和 Press 為例:
定義完新的因子表達式後,手動把修改後的子產品檔案同步到 DolphinDB server,如下圖所示:
第二步
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,生成 Json 格式的配置檔案:
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonGetFileString
// 第一類分鐘因子配置參數
FactorLevel1 = `Close`Press`Close`Press
isInc = `true`false`true`false
barMinutesLevel1 = 1 1 5 5
useSystemTime = `false`false`false`false
// 指定存儲 Json 配置檔案的路徑
jsonPath = "./test.json"
JsonFileString = JsonGetFileString(FactorLevel1, isInc, barMinutesLevel1, useSystemTime)
saveTextFile(JsonFileString, jsonPath)
代碼說明:
- FactorLevel1:指定需要計算的分鐘因子名稱,必須在 step1 中定義,并同步到 DolphinDB server。
- isInc:與 FactorLevel1 的長度相同,表示計算的分鐘因子是否需要按照增量計算解析,“true” 表示計算因子按照增量計算函數解析,“false” 表示計算因子按照非增量計算函數解析。
- barMinutesLevel1:與 FactorLevel1 的長度相同,表示計算的分鐘因子的頻率,機關是“分”。
- useSystemTime:與 FactorLevel1 的長度相同,表示計算的分鐘因子的視窗關閉方式,“true” 表示用機器時間觸發視窗,“false” 表示用事件時間觸發視窗。同一個頻率的計算因子視窗關閉方式必須一緻。
執行完畢後,會在 DolphinDB server 部署目錄生成一個 Json 格式的配置檔案 test.json,内容如下:
[{"factor": "Close", "isInc": true, "barMinute": 1, "level": 1, "useSystemTime": false}, {"factor": "Press", "isInc": false, "barMinute": 1, "level": 1, "useSystemTime": false}, {"factor": "Close", "isInc": true, "barMinute": 5, "level": 1, "useSystemTime": false}, {"factor": "Press", "isInc": false, "barMinute": 5, "level": 1, "useSystemTime": false}]
第三步
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,部署計算服務:
// 初始化流計算環境
use DolphinDBModules::ops
clearAllStreamEnv()
go
// 執行計算服務部署函數
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonConfigLoad
jsonPath = "./test.json"
parallel = 1
loadJsonConfig(jsonPath, parallel)
注意,ops 功能子產品中的 clearAllStreamEnv() 函數會把目前節點的所有訂閱、引擎和共享表都會清除,是以在多人協作開發的環境中使用時需要注意。
第四步
把測試的 csv 資料檔案放到 DolphinDB server 端伺服器的指定位置,例如本教程放在 /hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv,測試的 csv 資料可在教程附錄下載下傳。然後在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,把 csv 資料按照流的方式回放進來:
use DolphinDBModules::SnapshotFactorCalculationPlatform::snapshotReplay
csvPath = "/hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv"
snapshotCsvReplayJob(csvPath, snapshotStream)
此時可以在 DolphinDB GUI 中執行函數 now(),起到重新整理用戶端的效果,可以看到右下角變量欄的結果表不斷地在更新,檢視結果表中的資料,以 1 分鐘計算結果表為例,具體内容如下:
3.3 第二類複雜因子開發和調試
第一步
在 DolphinDB GUI 的 SnapshotFactorCalculationPlatform 子產品的 Factor2 目錄定義第二類複雜因子的表達式,按照 3.1節 部署完因子計算平台後,内置了如下複雜因子,以調試 RSI 和 MACD 為例:
定義完新的因子表達式後,手動把修改後的子產品檔案同步到 DolphinDB server。
第二步
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,生成 Json 格式的配置檔案:
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonGetFileString
// 第一類分鐘因子配置參數
FactorLevel1 = `Close`Press`Close`Press
isInc = `true`false`true`false
barMinutesLevel1 = 1 1 5 5
useSystemTime = `false`false`false`false
// 第二類複雜因子配置參數
FactorLevel2 = `RSI`MACD`RSI`MACD
barMinutesLevel2 = [[1, 1], [1], [5], [5]]
colNameLevel2 = [`RSI, `DIF`DEA`MACD, `RSI, `DIF`DEA`MACD]
paramsName = [`N, `SHORT_`LONG_`M, `N,`SHORT_`LONG_`M]
paramsValue = [[[24], [30]], [[18, 30, 10]], [[24]], [[9, 25, 6]], [[12, 26, 9]]]
// 指定存儲 Json 配置檔案的路徑
jsonPath = "./test.json"
JsonFileString = JsonGetFileString(FactorLevel1, isInc, barMinutesLevel1, useSystemTime, FactorLevel2, barMinutesLevel2, colNameLevel2, paramsName, paramsValue)
saveTextFile(JsonFileString, jsonPath)
代碼說明:
- FactorLevel2:指定需要計算的複雜因子名稱,必須在 step1 中定義,并同步到 DolphinDB server。
- barMinutesLevel2:與 FactorLevel1 的長度相同,例子中第一個元素 [1, 1] 表示對 FactorLevel2[0](RSI )做兩個 1 分鐘頻率的計算,其視窗大小通過 paramsValue 配置。
- colNameLevel2:與 FactorLevel1 的長度相同,表示因子計算函數輸出的列名。
- paramsName:與 FactorLevel1 的長度相同,表示因子計算函數的參數名字。
- paramsValue:與 FactorLevel1 的長度相同,與 barMinutesLevel2 對應,如 [[24], [30]] 對應 barMinutesLevel2[0],即 [1, 1],表示對 RSI 做兩個 1 分鐘頻率的計算,其視窗大小分别是 24 和 30。
執行完畢後,會在 DolphinDB server 部署目錄生成一個 Json 格式的配置檔案 test.json,内容如下:
[{"factor": "Close", "isInc": true, "barMinute": 1, "level": 1, "useSystemTime": false}, {"factor": "High", "isInc": true, "barMinute": 1, "level": 1, "useSystemTime": false}, {"factor": "Low", "isInc": true, "barMinute": 1, "level": 1, "useSystemTime": false}, {"factor": "Close", "isInc": true, "barMinute": 5, "level": 1, "useSystemTime": false}, {"factor": "High", "isInc": true, "barMinute": 5, "level": 1, "useSystemTime": false}, {"factor": "Low", "isInc": true, "barMinute": 5, "level": 1, "useSystemTime": false}, {"factor": "RSI", "level": 2, "colName": `R_1, "barMinute": 1, "N": 24}, {"factor": "RSI", "level": 2, "colName": `R_2, "barMinute": 1, "N": 30}, {"factor": "MACD", "level": 2, "colName": `DIF_1`DEA_1`MACD_1, "barMinute": 1, "SHORT_": 18, "LONG_": 30, "M": 10}, {"factor": "RSI", "level": 2, "colName": `R_1, "barMinute": 5, "N": 24}, {"factor": "MACD", "level": 2, "colName": `DIF_1`DEA_1`MACD_1, "barMinute": 5, "SHORT_": 9, "LONG_": 25, "M": 6}]
第三步
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,部署計算服務:
// 初始化流計算環境
use DolphinDBModules::ops
clearAllStreamEnv()
go
// 執行計算服務部署函數
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonConfigLoad
jsonPath = "./test.json"
parallel = 1
loadJsonConfig(jsonPath, parallel)
注意,ops 功能子產品中的 clearAllStreamEnv() 函數會把目前節點的所有訂閱、引擎和共享表都會清除,是以在多人協作開發的環境中使用時需要注意。
第四步
把測試的 csv 資料檔案放到 DolphinDB server 端伺服器的指定位置,例如本教程放在 /hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv,測試的 csv 資料可在教程附錄下載下傳。然後在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,把 csv 資料按照流的方式回放進來:
use DolphinDBModules::SnapshotFactorCalculationPlatform::snapshotReplay
csvPath = "/hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv"
snapshotCsvReplayJob(csvPath, snapshotStream)
此時可以在 DolphinDB GUI 中執行函數 now(),起到重新整理用戶端的效果,可以看到右下角變量欄的結果表不斷地在更新,檢視結果表中的資料,以 1 分鐘複雜因子計算結果表為例,具體内容如下:
CostTime 表示單次響應的計算耗時,機關是微秒。
3.4 Python 用戶端排程任務和訂閱結果
本教程用 jupyter 環境調試,具體測試軟體版本如下:
- DolphinDB server 版本:2.00.9.2
- DolphinDB Python API 版本:1.30.21.1
- Python 的版本:3.7.6
第一步
導入依賴的 Python 包,并與 DolphinDB server 建立連接配接:
import dolphindb as ddb
import numpy as np
s = ddb.session(host="localhost", port=8892, userid='admin', password='123456',enablePickle=False)
第二步
部署因子計算服務:
jsonPath = "./modules/DolphinDBModules/SnapshotFactorCalculationPlatform/testConfig.dos"
parallel = 1
scripts = """
use DolphinDBModules::ops
clearAllStreamEnv()
go
use DolphinDBModules::SnapshotFactorCalculationPlatform::JsonConfig::JsonConfigLoad
loadJsonConfig("{0}", {1})
""".format(jsonPath, parallel)
s.run(scripts)
jsonPath 路徑是指 DolphinDB server 端的相對路徑,是調試用的預設測試配置檔案。
第三步
執行資料回放服務:
csvPath = "/hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv"
scripts = """
use DolphinDBModules::SnapshotFactorCalculationPlatform::snapshotReplay
snapshotCsvReplayJob("{0}", snapshotStream)
""".format(csvPath)
s.run(scripts)
csvPath 路徑是指 DolphinDB server 所在伺服器的絕對路徑,需要使用者自己下載下傳測試資料(見附錄),并放到相應目錄。例如,本教程測試環境資料檔案所在路徑是 /hdd/hdd9/tutorials/SnapshotFactorCalculationPlatform/test.csv。
第四步
查詢資料至 python 用戶端:
queryDate = "2021.12.01"
SecurityID = "600000"
scripts = """
select * from resultTable1Min where date(DateTime)={0}, SecurityID="{1}"
""".format(queryDate, SecurityID)
resultdf = s.run(scripts)
resultdf
resultdf 的内容如下:
第五步
Python 用戶端訂閱 DolphinDB server 端的結果表:
s.enableStreaming(0)
def handler(lst):
print(lst)
s.subscribe(host="localhost", port=8892, handler=handler, tableName="aggr1Min", actionName="sub1min", offset=0, msgAsTable=False, filter=np.array(['600010']))
此處 offset 設定為 0,表示從結果表的第一行資料開始訂閱,訂閱傳回結果如下:
如果想取消訂閱,可以執行下述代碼:
s.unsubscribe(host="localhost", port=8892,tableName="aggr1Min",actionName="sub1min")
調試完畢後,建議手動關閉 Python 用戶端會話:
s.close()
其它語言的用戶端,如 C++, Java 等,與 DolphinDB server 互動的方式與 Python 用戶端相似,具體參考官方的教程文檔即可。
3.5 計算結果實時推送至 Kafka
DolphinDB server 計算的結果,也可以實時推送到客戶本地的低延時消息總線。本教程以推送至 Kafka 為例。開始調試下述功能的前提條件是在 Kafka 中建立好 aggr1Min 的 topic,同時 DolphinDB server 已經加載 Kafka 插件。
在 DolphinDB GUI 的 scripts 目錄建立腳本檔案,執行下述代碼,把 1 分鐘因子計算結果表中的資料推送至 Kafka:
use DolphinDBModules::SnapshotFactorCalculationPlatform::resultToKafka
producer = initKafkaProducer("localhost:9092")
subscribeTable(tableName="aggr1Min", actionName="aggr1MinToKafka", offset=0, handler=aggr1MinToKafka{producer, "aggr1Min"}, msgAsTable=true)
其中,initKafkaProducer("localhost:9092")是指 Kafka 服務的部署 IP 和 Port。
Kafka 的消費者可以及時消費這些資料,如下圖所示:
從 DolphinDB server 的監控日志可以看到,計算結果推送至 Kafka 的平均耗時約 180 微秒:
4. 流計算相關問題解答
4.1 時間序列引擎計算分鐘因子
在基于 Level-2 快照資料做實時分鐘因子加工的時候,比如實時做 K 線,常常會面臨以下幾個問題:
- 以機器時間還是事件時間作為視窗關閉的信号?
本教程示例因子計算平台預設以事件時間作為視窗關閉的觸發信号。DolphinDB 内置的時間序列引擎的 useSystemTime 參數可以控制以機器時間還是事件時間作為視窗關閉的信号。
- 如果使用事件時間作為視窗關閉的信号,如何保證及時關閉不活躍股票的計算視窗?
本教程示例因子計算平台以股票為分組,不同組(即不同股票)之間沒有觸發視窗關閉的機制。DolphinDB 内置的時間序列引擎的 forceTriggerTime 參數設定後,可以通過活躍股票的資料強制觸發不活躍股票的計算視窗關閉。
- 如果使用事件時間作為視窗關閉的信号,如何保證及時關閉 11:30(午間休市)、14:57(連續競價結束)的計算視窗 ?
本教程示例因子計算平台沒有配置該功能。DolphinDB 内置的日級時間序列引擎的 forceTriggerSessionEndTime 參數可以滿足上述場景。
- 計算時視窗邊界的開閉是左閉右開還是左開右閉?
本教程示例因子計算平台的規則是左閉右開。DolphinDB 内置的時間序列引擎的 closed 參數可以控制視窗邊界規則。
- 計算輸出結果的時間是計算視窗起始時間還是結束時間?
本教程示例因子計算平台的規則是計算視窗起始時間。DolphinDB 内置的時間序列引擎的 useWindowStartTime 參數可以輸出規則。
- 如果在某個計算視窗内,分組内沒有資料,如何用前值或者指定值填充?
本教程示例因子計算平台沒有設定填充規則。DolphinDB 内置的時間序列引擎的 fill 參數可以指定填充規則,若(某個分組的)某個視窗無資料時,支援以下幾種填充的規則:none 表示不輸出結果;null 表示輸出結果為 NULL;ffill 表示輸出上一個有資料的視窗的結果;具體數值,需要和對應的 metrics 計算結果的類型保持一緻。
4.2 響應式狀态引擎計算複雜因子
在基于分鐘因子進一步加工有狀态的複雜因子的時候,比如實時計算 MACD、RSI 等,常常會面臨以下幾個問題:
- 計算因子是有狀态的:不僅與目前的多個名額有關,而且與多個名額的曆史狀态相關,如何開發有狀态的算子?
DolphinDB 内置了大量有狀态的算子,并進行了增量計算的優化,具體已經内置算子如下所示。
- 累計視窗函數:cumavg, cumsum, cumprod, cumcount, cummin, cummax, cumvar, cumvarp, cumstd, cumstdp, cumcorr, cumcovar, cumbeta, cumwsum, cumwavg。
- 滑動視窗函數:ema, mavg, msum, mcount, mprod, mvar, mvarp, mstd, mstdp, mskew, mkurtosis, mmin, mmax, mimin, mimax, mmed, mpercentile, mrank, mcorr, mcovar, mbeta, mwsum, mwavg, mslr。
- 序列相關函數:deltas, ratios, ffill, move, prev, iterate, ewmMean, ewmVar, ewmStd, ewmCovar, ewmCorr。
DolphinDB 也允許使用者用插件來開發自己的狀态函數,注冊後即可在狀态引擎中使用。
- 在一次響應計算過程中,如果計算 1000 個因子,這 1000 個因子依賴一個共同的中間變量,如何避免重複計算?
比如在上述因子計算平台的複雜因子計算處,有兩個因子,分别叫 factor1 和 factor2,表達式如下:
@state
def factor1(price) {
a = ema(price, 20)
b = ema(price, 40)
tmp = 1000 * (a-b)\(a+b)
return ema(tmp , 10) - ema(tmp , 20)
}
@state
def factor2(price) {
a = ema(price, 20)
b = ema(price, 40)
tmp = 1000 * (a-b)\(a+b)
return mavg(tmp, 10)
}
可以看到,兩個因子的計算都依賴了相同的中間變量tmp。如果要避免中間變量tmp的重複計算,可以先定義一個tmpFactor的函數,表達式如下:
@state
def tmpFactor(price) {
a = ema(price, 20)
b = ema(price, 40)
tmp = 1000 * (a-b)\(a+b)
return tmp
}
然後把 factor1 和 factor2 的表達式用如下方式表示:
@state
def factor1(price) {
tmp = tmpFactor(price)
return ema(tmp , 10) - ema(tmp , 20)
}
@state
def factor2(price) {
tmp = tmpFactor(price)
return mavg(tmp, 10)
}
DolphinDB 内置的響應式狀态引擎在解析複雜因子的計算表達式的時候,就會自動避免不同因子相同中間變量的重複計算。
5. 路線圖(Roadmap)
- 進一步完善 Level-2 快照資料流批一體因子計算平台的功能子產品:
- 開放更多配置參數,解決基于 Level-2 快照資料做實時分鐘因子加工的時候遇到的問題
- 補充 Factor1 和 Factor2 下的内置因子,豐富内置因子庫
- 開發 Level-2 快照頻率流批一體因子計算平台功能子產品
- 開發 Level2 多資料源融合流批一體因子計算平台功能子產品
附錄
功能子產品源碼: https://gitee.com/dolphindb/Tutorials_CN/tree/master/script/Level2_Snapshot_Factor_Calculation/DolphinDBModules/SnapshotFactorCalculationPlatform
按照教程,把module内容同步到server後,測試所需腳本: https://gitee.com/dolphindb/Tutorials_CN/blob/master/script/Level2_Snapshot_Factor_Calculation/test_scripts.zip
測試的 csv 資料:https://gitee.com/dolphindb/Tutorials_CN/tree/master/data/Level2_Snapshot_Factor_Calculation