
A Guide to Streaming Implementation of Quantitative Factors in DolphinDB

Author: DolphinDB

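DolphinDB is a high-performance distributed time-series database. Unlike traditional relational databases and common time-series databases, DolphinDB provides more than fast storage and retrieval of time-series data: it also includes a built-in vectorized, multi-paradigm programming language and a powerful computing engine. The computing engine can be used for backtesting and research in quantitative finance as well as for real-time computation in production, such as streaming computation of financial quantitative factors at various frequencies.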

1. Overview

1.1 DolphinDB Stream Computing Framework

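DolphinDB's built-in streaming framework supports publishing, subscribing to, and preprocessing streaming data, real-time in-memory computation, and rolling-window, sliding-window, and cumulative-window computation of complex metrics. It is an efficient and easy-to-use stream processing framework.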


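This tutorial focuses on the stage “stream table → subscriber (built-in stream engine) → computation results”, and shows how to implement financial quantitative factors with the built-in stream engines and how to optimize them.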

For details on DolphinDB's built-in stream engines, see: Stream Engines - DolphinDB 2.0 documentation

1.2 Data Structures

The code in this tutorial was tested with Shanghai Stock Exchange (SSE) Level-2 market data: daily K-lines, tick-by-tick trades, and snapshots.

The test data are stored in DolphinDB with the following table schemas:

(1) Daily K-line

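Field        Description
securityID   Security code
dateTime     Date and time
preClosePx   Previous close price
openPx       Open price
highPx       High price
lowPx        Low price
lastPx       Close price
volume       Trading volume
amount       Trading value
iopv         Indicative NAV estimate (IOPV)
fp_Volume    After-hours fixed-price trading volume
fp_Amount    After-hours fixed-price trading value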

(2) Tick-by-tick trades

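Field        Description
securityID   Security code
tradeTime    Trade date and time
tradePrice   Trade price
tradeQty     Trade quantity
tradeAmount  Trade value
buyNo        Buy order number
sellNo       Sell order number
tradeBSFlag  Buy/sell initiator flag
tradeIndex   Trade sequence number
channelNo    Trade channel number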

(3) Market snapshots

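DolphinDB provides array vector, a data type for storing variable-length two-dimensional arrays. When storing data, you can store multiple columns that share a data type and have similar meanings as a single column.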

The multi-level quotes in Level-2 snapshots can therefore be stored either ① as multiple columns, or ② as array vectors, one column per field.

(Figure: multi-level quotes stored as separate columns, 194 columns in total)

(Figure: multi-level quotes stored as array vectors, 42 columns in total)

Field (multi-column)                Field (array vector)   Description
securityID                          securityID             Security code
dateTime                            dateTime               Date and time
preClosePx                          preClosePx             Previous close price
openPx                              openPx                 Open price
highPx                              highPx                 High price
lowPx                               lowPx                  Low price
lastPx                              lastPx                 Last price
totalVolumeTrade                    totalVolumeTrade       Total traded volume
totalValueTrade                     totalValueTrade        Total traded value
instrumentStatus                    instrumentStatus       Trading status
bidPrice0 .. bidPrice9              bidPrice               Bid prices, 10 levels
bidOrderQty0 .. bidOrderQty9        bidOrderQty            Bid quantities, 10 levels
bidNumOrders0 .. bidNumOrders9      bidNumOrders           Number of orders at each of the 10 bid levels
bidOrders0 .. bidOrders49           bidOrders              First 50 orders at the best bid
offerPrice0 .. offerPrice9          offerPrice             Ask prices, 10 levels
offerOrderQty0 .. offerOrderQty9    offerOrderQty          Ask quantities, 10 levels
offerNumOrders0 .. offerNumOrders9  offerNumOrders         Number of orders at each of the 10 ask levels
offerOrders0 .. offerOrders49       offerOrders            First 50 orders at the best ask
numTrades                           numTrades              Number of trades
iopv                                iopv                   ETF indicative NAV estimate (IOPV)
totalBidQty                         totalBidQty            Total bid order quantity
totalOfferQty                       totalOfferQty          Total ask order quantity
weightedAvgBidPx                    weightedAvgBidPx       Weighted average bid price
weightedAvgOfferPx                  weightedAvgOfferPx     Weighted average ask price
totalBidNumber                      totalBidNumber         Total number of buy orders
totalOfferNumber                    totalOfferNumber       Total number of sell orders
bidTradeMaxDuration                 bidTradeMaxDuration    Maximum wait time for buy executions
offerTradeMaxDuration               offerTradeMaxDuration  Maximum wait time for sell executions
numBidOrders                        numBidOrders           Number of bid price levels
numOfferOrders                      numOfferOrders         Number of ask price levels
withdrawBuyNumber                   withdrawBuyNumber      Number of cancelled buy orders
withdrawBuyAmount                   withdrawBuyAmount      Cancelled buy quantity
withdrawBuyMoney                    withdrawBuyMoney       Cancelled buy value
withdrawSellNumber                  withdrawSellNumber     Number of cancelled sell orders
withdrawSellAmount                  withdrawSellAmount     Cancelled sell quantity
withdrawSellMoney                   withdrawSellMoney      Cancelled sell value
etfBuyNumber                        etfBuyNumber           Number of ETF creations
etfBuyAmount                        etfBuyAmount           ETF creation quantity
etfBuyMoney                         etfBuyMoney            ETF creation value
etfSellNumber                       etfSellNumber          Number of ETF redemptions
etfSellAmount                       etfSellAmount          ETF redemption quantity
etfSellMoney                        etfSellMoney           ETF redemption value
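The column counts stated above (194 and 42) follow from this table. The multi-column layout has 10 basic fields, then 2 × (3 × 10 level fields + 50 order fields), then 24 trailing fields. The array-vector layout collapses each multi-level field into a single column. The plain-Python sketch below is not DolphinDB code, and its helper names are illustrative. It rebuilds both column lists in the same order as the colName definitions used later in this tutorial and counts them.

```python
# Rebuild the snapshot column lists in plain Python and count them.
# Order mirrors the colName construction in the DolphinDB scripts (illustration only).

BASE = ["securityID", "dateTime", "preClosePx", "openPx", "highPx", "lowPx",
        "lastPx", "totalVolumeTrade", "totalValueTrade", "instrumentStatus"]
TAIL = ["numTrades", "iopv", "totalBidQty", "totalOfferQty", "weightedAvgBidPx",
        "weightedAvgOfferPx", "totalBidNumber", "totalOfferNumber",
        "bidTradeMaxDuration", "offerTradeMaxDuration", "numBidOrders",
        "numOfferOrders", "withdrawBuyNumber", "withdrawBuyAmount",
        "withdrawBuyMoney", "withdrawSellNumber", "withdrawSellAmount",
        "withdrawSellMoney", "etfBuyNumber", "etfBuyAmount", "etfBuyMoney",
        "etfSellNumber", "etfSellAmount", "etfSellMoney"]


def side_columns(side):
    """Multi-column layout for one side ("bid" or "offer")."""
    fields = [side + "Price", side + "OrderQty", side + "NumOrders"]
    cols = [f + str(i) for f in fields for i in range(10)]    # 10 levels per field
    cols += [side + "Orders" + str(i) for i in range(50)]      # first 50 orders at level 1
    return cols


def side_array_columns(side):
    """Array-vector layout: each multi-level field becomes one column."""
    return [side + "Price", side + "OrderQty", side + "NumOrders", side + "Orders"]


multi_cols = BASE + side_columns("bid") + side_columns("offer") + TAIL
array_cols = BASE + side_array_columns("bid") + side_array_columns("offer") + TAIL

print(len(multi_cols), len(array_cols))   # 194 42
```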

2. Streaming Implementation of Daily Factors

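DolphinDB's built-in stream engines include the time-series engine, the reactive state engine, the cross-sectional engine, and others. In practice, a complex factor may combine cross-sectional logic, historical state, and time windows, so several engines must be cascaded to implement it.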

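To make this conversion easier, DolphinDB provides a stream engine parser (streamEngineParser) that parses the factor logic and builds the engine pipeline automatically, so users do not have to write complex cascading code. Real-time computation of complex daily factors is therefore usually implemented with the stream engine parser.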

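DolphinDB has implemented the functions of two factor libraries, WorldQuant 101 Alpha and Guotai Junan 191 Alphas, in the modules wq101alpha.dos and gtja191Alpha.dos respectively. Both modules support unified batch and stream processing, so the daily factors they contain can be computed in streaming mode simply by passing them to streamEngineParser.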

2.1 Examples

This section uses factors from WorldQuant Alpha 101 and Guotai Junan 191 to show how daily factors are implemented in streaming mode.

2.1.1 WorldQuant Alpha 1

  • Factor logic (Alpha001):

(rank(Ts_ArgMax(SignedPower(((returns < 0) ? stddev(returns, 20) : close), 2.), 5)) - 0.5)

  • DolphinDB implementation:
def wqAlpha1(close){
    ts = mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20), close), 2.0), 5)
    return rowRank(X=ts, percent=true) - 0.5
}           
  • Real-time stream computation:
// Define the input and output table schemas
colName = `securityID`dateTime`preClosePx`openPx`highPx`lowPx`lastPx`volume`amount`iopv`fp_Volume`fp_Amount
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","DOUBLE","INT","DOUBLE"]
inputTable = table(1:0, colName, colType)
resultTable = table(10000:0, ["dateTime", "securityID", "factor"], [TIMESTAMP, SYMBOL, DOUBLE])

// Build the engine pipeline with streamEngineParser
try{ dropStreamEngine("alpha1Parser0")} catch(ex){ print(ex) }
try{ dropStreamEngine("alpha1Parser1")} catch(ex){ print(ex) }
metrics = <[securityid, wqAlpha1(preClosePx)]>
streamEngine = streamEngineParser(name="alpha1Parser", metrics=metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID", timeColumn=`dateTime, triggeringPattern='keyCount', triggeringInterval=3000)

// Inspect the engines
getStreamEngineStat()
/*
ReactiveStreamEngine->
name          user  status lastErrMsg numGroups numRows numMetrics memoryInUsed snapshotDir ...
------------- ----- ------ ---------- --------- ------- ---------- ------------ ----------- 
alpha1Parser0 admin OK                0         0       2          13392                    ...

CrossSectionalEngine->
name         user  status lastErrMsg numRows numMetrics metrics      triggering...triggering......
------------ ----- ------ ---------- ------- ---------- ------------ --------------- --------------- ---
alpha1Parser1 admin OK                0       2          securityid...keyCount     3000         ...
*/           

The code above creates an engine pipeline named “alpha1Parser”. The output of getStreamEngineStat shows that the pipeline consists of a reactive state engine named “alpha1Parser0” and a cross-sectional engine named “alpha1Parser1”.

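Here securityID is the grouping key and dateTime is the time column. Incoming messages have the same schema as the in-memory table inputTable, the metrics to compute are defined in metrics, and results are written to the in-memory table resultTable. The cross-sectional computation uses the keyCount triggering pattern: a computation is triggered only when more than 3000 records have accumulated for the current timestamp, or when data with a new timestamp arrives.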

After the engines are created, insert a few records and check the results.

// Insert data into the engine
insert into streamEngine values(`000001, 2023.01.01, 30.85, 30.90, 31.65, 30.55, 31.45, 100, 3085, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.01, 30.86, 30.55, 31.35, 29.85, 30.75, 120, 3703.2, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.02, 30.80, 30.95, 31.05, 30.05, 30.85, 200, 6160, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.02, 30.81, 30.99, 31.55, 30.15, 30.65, 180, 5545.8, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.03, 30.83, 31.00, 31.35, 30.35, 30.55, 230, 7090.9, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.03, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.04, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.04, 30.85, 30.95, 31.65, 30.55, 31.45, 270, 8329.5, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.05, 30.86, 30.55, 31.35, 29.85, 30.75, 360, 11109.6, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.05, 30.80, 30.95, 31.05, 30.05, 30.85, 200, 6160, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.06, 30.81, 30.99, 31.55, 30.15, 30.65, 180, 5545.8, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.06, 30.83, 31.00, 31.35, 30.35, 30.55, 230, 7090.9, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.07, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.07, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.08, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.08, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)

// Check the results
select factor from resultTable pivot by dateTime, securityID
/*
dateTime                000001 000002
----------------------- ------ ------
2023.01.01T00:00:00.000              
2023.01.02T00:00:00.000              
2023.01.03T00:00:00.000              
2023.01.04T00:00:00.000              
2023.01.05T00:00:00.000 0.5    0     
2023.01.06T00:00:00.000 0.5    0     
2023.01.07T00:00:00.000 0      0.5   
*/           
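The result table stops at 2023.01.07. With only two securities, the 3000-record threshold is never reached, so each date's cross-section is computed only when the next date arrives, and 2023.01.08 stays pending. The 0.5 and 0 values are a percent rank minus 0.5 across two stocks. The empty cells for the first dates come from NULL upstream values while the moving windows fill up. The plain-Python toy model below reproduces that stage. KeyCountStage and pct_rank are hypothetical names. The flush rule follows the keyCount description above, with "reaches the threshold" assumed. The rank convention, (1-based rank) / n, is inferred from the 0.5/0 outputs rather than taken from DolphinDB's documentation.

```python
# Toy model of the cross-sectional stage (plain Python, not DolphinDB API).
# Assumptions: rows arrive in time order; the pending batch is computed when a
# newer timestamp arrives or when it reaches `threshold` rows; percent rank is
# (1-based rank) / n, with ties sharing the lowest rank.

def pct_rank(values):
    """Percent rank of each value within one cross-section."""
    n = len(values)
    return [(sum(v < x for v in values) + 1) / n for x in values]


class KeyCountStage:
    def __init__(self, threshold, metric):
        self.threshold = threshold
        self.metric = metric          # maps a list of values to a list of results
        self.pending_time = None
        self.pending = {}             # key -> value for the pending timestamp
        self.output = []              # (time, key, result): time column comes first

    def _flush(self):
        if self.pending:
            keys = list(self.pending)
            results = self.metric([self.pending[k] for k in keys])
            self.output += [(self.pending_time, k, r) for k, r in zip(keys, results)]
            self.pending = {}

    def insert(self, time, key, value):
        if self.pending_time is not None and time > self.pending_time:
            self._flush()             # a newer timestamp closes the previous batch
        self.pending_time = time
        self.pending[key] = value
        if len(self.pending) >= self.threshold:
            self._flush()             # enough rows for this timestamp


def alpha_tail(xs):
    # Last step of wqAlpha1: percent rank minus 0.5
    return [r - 0.5 for r in pct_rank(xs)]


stage = KeyCountStage(threshold=3000, metric=alpha_tail)
for day in range(1, 9):               # 8 dates x 2 securities, toy values
    stage.insert(day, "000001", float(day))
    stage.insert(day, "000002", float(-day))

print(sorted({t for t, _, _ in stage.output}))   # [1, 2, 3, 4, 5, 6, 7]
```

Lowering the threshold to 2 in this model makes every date flush as soon as both securities have arrived, including the last one.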

2.1.2 Guotai Junan Alpha 001

  • Factor logic (Guotai Junan Alpha 001):

(-1 * CORR(RANK(DELTA(LOG(VOLUME),1)),RANK(((CLOSE-OPEN)/OPEN)),6))

  • DolphinDB implementation:
def gtjaAlpha1(open, close, vol){
    delta = deltas(log(vol))
    return -1 * (mcorr(rowRank(delta, percent=true), rowRank((close - open) \ open, percent=true), 6))
}
  • Real-time stream computation:
// Define the input and output table schemas
colName = `securityID`dateTime`preClosePx`openPx`highPx`lowPx`lastPx`volume`amount`iopv`fp_Volume`fp_Amount
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","DOUBLE","INT","DOUBLE"]
inputTable = table(1:0, colName, colType)
resultTable = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Build the engine pipeline with streamEngineParser
try{ dropStreamEngine("gtja1Parser0")} catch(ex){ print(ex) }
try{ dropStreamEngine("gtja1Parser1")} catch(ex){ print(ex) }
try{ dropStreamEngine("gtja1Parser2")} catch(ex){ print(ex) }
metrics = <[gtjaAlpha1(openPx, preClosePx, volume)]>
streamEngine = streamEngineParser(name="gtja1Parser", metrics=metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID", timeColumn=`dateTime, triggeringPattern='keyCount', triggeringInterval=3000)

// Inspect the engines
getStreamEngineStat()
/*
ReactiveStreamEngine->
name         user  status lastErrMsg numGroups numRows numMetrics memoryInUsed snapshotDir ...
------------ ----- ------ ---------- --------- ------- ---------- ------------ ----------- 
gtja1Parser0 admin OK                0         0       4          808                      ...
gtja1Parser2 admin OK                0         0       2          872                      ...

CrossSectionalEngine->
name         user  status lastErrMsg numRows numMetrics metrics      triggering...triggering......
------------ ----- ------ ---------- ------- ---------- ------------ --------------- --------------- ---
gtja1Parser1 admin OK                0       3          securityID...keyCount     3000         ...
*/
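The code above creates an engine pipeline named “gtja1Parser”. The output of getStreamEngineStat shows that the pipeline consists of a reactive state engine named “gtja1Parser0”, a cross-sectional engine named “gtja1Parser1”, and a reactive state engine named “gtja1Parser2”.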

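Here securityID is the grouping key and dateTime is the time column. Incoming messages have the same schema as the in-memory table inputTable, the metrics to compute are defined in metrics, and results are written to the in-memory table resultTable. The cross-sectional computation uses the keyCount triggering pattern: a computation is triggered only when more than 3000 records have accumulated for the current timestamp, or when data with a new timestamp arrives.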

After the engines are created, insert a few records and check the results.

// Insert data into the engine
insert into streamEngine values(`000001, 2023.01.01, 30.85, 30.90, 31.65, 30.55, 31.45, 100, 3085, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.01, 30.86, 30.55, 31.35, 29.85, 30.75, 120, 3703.2, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.02, 30.80, 30.95, 31.05, 30.05, 30.85, 200, 6160, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.02, 30.81, 30.99, 31.55, 30.15, 30.65, 180, 5545.8, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.03, 30.83, 31.00, 31.35, 30.35, 30.55, 230, 7090.9, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.03, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.04, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.04, 30.85, 30.95, 31.65, 30.55, 31.45, 270, 8329.5, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.05, 30.86, 30.55, 31.35, 29.85, 30.75, 360, 11109.6, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.05, 30.80, 30.95, 31.05, 30.05, 30.85, 200, 6160, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.06, 30.81, 30.99, 31.55, 30.15, 30.65, 180, 5545.8, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.06, 30.83, 31.00, 31.35, 30.35, 30.55, 230, 7090.9, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.07, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.07, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)
insert into streamEngine values(`000001, 2023.01.08, 30.89, 30.85, 31.10, 30.00, 30.45, 250, 7722.5, 0, 0, 0)
insert into streamEngine values(`000002, 2023.01.08, 30.90, 30.86, 31.10, 30.40, 30.75, 300, 9270, 0, 0, 0)

// Check the results
select factor from resultTable pivot by dateTime, securityID
/*
dateTime                000001 000002            
----------------------- ------ ------------------
2023.01.01T00:00:00.000                          
2023.01.02T00:00:00.000                          
2023.01.03T00:00:00.000                          
2023.01.04T00:00:00.000                          
2023.01.05T00:00:00.000                          
2023.01.06T00:00:00.000 -1     -1
2023.01.07T00:00:00.000 -1     -1          
*/           

2.2 Rewriting Rules

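DolphinDB provides a range of built-in stream engines for streaming factor computation. A key step is to rewrite the factor logic as functions the engines can parse and execute correctly, that is, as metrics the engines can recognize and decompose.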

  • streamEngineParser parses metrics according to the following rules:

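① Row-based functions (the row family) are dispatched to the cross-sectional engine, so cross-sectional logic must be written with row functions. If no suitable row function exists, you can implement row-wise logic yourself with the higher-order function byRow.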

② The rolling function is dispatched to the time-series engine, so time-window logic must be written with rolling.

③ All other computations are dispatched to the reactive state engine. See Chapter 3 for notes on rewriting factors for the reactive state engine.

Guotai Junan 191 Alpha 001 from Section 2.1.2 illustrates these rules:

def gtjaAlpha1(open, close, vol){
    delta = deltas(log(vol))
    return -1 * (mcorr(rowRank(delta, percent=true), rowRank((close - open) \ open, percent=true), 6))
}

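delta = deltas(log(vol)) is neither a row function nor a call to rolling, so it is dispatched to the reactive state engine. rowRank(delta, percent=true) and rowRank((close - open) \ open, percent=true) call rowRank, so they are dispatched to the cross-sectional engine. -1 * (mcorr(..., ..., 6)) is neither a row function nor a call to rolling, so it is dispatched to the reactive state engine.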

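Combined with the output of getStreamEngineStat(), this gives the computation flow of the whole pipeline. Input data first enters the reactive state engine “gtja1Parser0”, which computes delta = deltas(log(vol)) and passes the result to the cross-sectional engine “gtja1Parser1”. “gtja1Parser1” computes rowRank(delta, percent=true) and rowRank((close - open) \ open, percent=true) and passes the results to the reactive state engine “gtja1Parser2”. Finally, “gtja1Parser2” computes -1 * (mcorr(..., ..., 6)) and writes the result to resultTable.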
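The dispatch rules can be illustrated with a toy planner in plain Python. It groups consecutive calls by target engine and names the stages the way the engine names above suggest: the pipeline name plus an index. Several things here are assumptions made for illustration: the function names, the flat call lists, and the rule that arithmetic operators stay with the preceding engine. This is not how streamEngineParser is implemented, since the real parser works on the expression tree. The planner does reproduce the three-stage gtja1Parser pipeline and the two-stage alpha1Parser pipeline from Section 2.1.

```python
# Toy stage planner for the dispatch rules (plain Python, illustration only).
# Assumptions: calls are listed in evaluation order, and plain arithmetic or
# comparison operators stay in the engine of the preceding call.

OPERATORS = {"neg", "add", "sub", "mul", "div", "lt"}


def engine_for(func):
    if func.startswith("row") or func == "byRow":
        return "crossSectional"       # rule 1: row-family functions
    if func == "rolling":
        return "timeSeries"           # rule 2: rolling
    return "reactiveState"            # rule 3: everything else


def plan_stages(calls, name):
    """Group consecutive calls that run in the same engine into named stages."""
    stages = []
    for func in calls:
        engine = stages[-1][1] if (func in OPERATORS and stages) else engine_for(func)
        if not stages or stages[-1][1] != engine:
            stages.append((f"{name}{len(stages)}", engine))
    return stages


gtja1_calls = ["log", "deltas", "rowRank", "sub", "div", "rowRank", "mcorr", "mul"]
wq1_calls = ["ratios", "sub", "lt", "ratios", "sub", "mstd", "iif", "pow", "mimax", "rowRank", "sub"]
print(plan_stages(gtja1_calls, "gtja1Parser"))
print(plan_stages(wq1_calls, "alpha1Parser"))
```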

2.3 Notes

(1) The timeColumn parameter of the cross-sectional engine supports only the TIMESTAMP type.

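(2) Different engines order the columns of their output tables differently, so the output table schema must be defined according to the engine that performs the last step of the factor.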

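  • Time-series engine: the output columns are ordered as time column, grouping column, result columns.
  • Reactive state engine: the output columns are ordered as grouping column, time column, result columns. (A reactive state engine's output table does not require a time column. However, the input and output tables of the time-series and cross-sectional engines must contain one, so a reactive state engine inside a pipeline adds the time column to its output automatically.)
  • Cross-sectional engine: the output columns are ordered as time column, grouping column (contextByColumn), result columns.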

For WorldQuant Alpha 101 #1 in Section 2.1.1, the last step, rowRank(...) - 0.5, is dispatched to the cross-sectional engine. The output columns are therefore ordered as time column, grouping column, result column, i.e. ["dateTime", "securityID", "factor"].

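For Guotai Junan 191 #1 in Section 2.1.2, the last step, -1 * mcorr(...), is dispatched to the reactive state engine. The output columns should therefore be ordered as grouping column, time column, result column, i.e. ["securityID", "dateTime", "factor"].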
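A small plain-Python helper can catch a mismatched result-table definition before the engines are created. The helper names are hypothetical. It only encodes the three orderings listed in note (2) and does not query DolphinDB.

```python
# Expected output-table column order by the engine that runs the last step,
# following note (2). Plain Python, illustration only.

COLUMN_ORDER = {
    "timeSeries":     ("time", "key", "result"),
    "reactiveState":  ("key", "time", "result"),
    "crossSectional": ("time", "key", "result"),
}


def expected_columns(last_engine, key="securityID", time="dateTime", factor="factor"):
    names = {"key": key, "time": time, "result": factor}
    return [names[role] for role in COLUMN_ORDER[last_engine]]


def check_output_schema(declared, last_engine, **names):
    """Return None if `declared` is in the expected order, else a message."""
    expected = expected_columns(last_engine, **names)
    if list(declared) != expected:
        return f"expected {expected}, got {list(declared)}"
    return None


print(expected_columns("crossSectional"))   # ['dateTime', 'securityID', 'factor']
print(expected_columns("reactiveState"))    # ['securityID', 'dateTime', 'factor']
```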

3. Streaming Implementation of High-Frequency Factors

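Every record fed into the reactive state engine triggers one computation and produces one result. Real-time computation of high-frequency factors can therefore generally be implemented with the reactive state engine (createReactiveStateEngine).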

3.1 Examples

3.1.1 Price Change Rate

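The following example computes the price change rate from tick-by-tick trade data to show how high-frequency factors are implemented in streaming mode.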

  • Factor logic (price change rate):

tradePrice[i] / tradePrice[i-lag] - 1

  • DolphinDB implementation:
@state
def pricePercentChange(price, lag){
    return price \ move(price, lag) - 1
}           
  • Real-time stream computation:
// Define the input and output table schemas
inputTable = table(1:0, `securityID`tradeTime`tradePrice`tradeQty`tradeAmount`buyNo`sellNo`tradeBSFlag`tradeIndex`channelNo, [SYMBOL,DATETIME,DOUBLE,INT,DOUBLE,LONG,LONG,SYMBOL,INT,INT])
resultTable = table(10000:0, ["securityID", "tradeTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[tradeTime, pricePercentChange(tradePrice, 1)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")           

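The code above creates a reactive state engine named “reactiveDemo” with securityID as the grouping key. Incoming messages have the same schema as the in-memory table inputTable, the metrics to compute are defined in metrics, and results are written to the in-memory table resultTable.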

After the engine is created, insert a few records and check the results.

// Insert data
insert into rse values(`000155, 2020.01.01T09:30:00, 30.85, 100, 3085, 4951, 0, `B, 1, 1)
insert into rse values(`000155, 2020.01.01T09:30:01, 30.86, 100, 3086, 4951, 1, `B, 2, 1)
insert into rse values(`000155, 2020.01.01T09:30:02, 30.80, 200, 6160, 5501, 5600, `S, 3, 1)

// Check the results
select * from resultTable
/*
securityID tradeTime               factor            
---------- ----------------------- ------------------
000155     2020.01.01T09:30:00.000                   
000155     2020.01.01T09:30:01.000 0.0003
000155     2020.01.01T09:30:02.000 -0.001944
*/           
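The per-key state behind these numbers can be mimicked in plain Python. Each security keeps only its last lag + 1 prices, and the first trade has no earlier price, which is why its factor is empty. The arithmetic matches the result table: 30.86 / 30.85 - 1 ≈ 0.000324 and 30.80 / 30.86 - 1 ≈ -0.001944. The class below is a hypothetical illustration of the engine's observable behaviour, not of its internals.

```python
from collections import defaultdict, deque


class PricePercentChange:
    """Per-security state: keeps the last lag + 1 prices of each key."""

    def __init__(self, lag):
        self.lag = lag
        self.history = defaultdict(lambda: deque(maxlen=lag + 1))

    def on_row(self, key, price):
        h = self.history[key]
        h.append(price)
        if len(h) <= self.lag:
            return None               # move(price, lag) is still NULL
        return price / h[0] - 1       # h[0] is the price lag rows earlier


engine = PricePercentChange(lag=1)
rows = [("000155", 30.85), ("000155", 30.86), ("000155", 30.80)]
# First value is None (no earlier price); then about 0.000324 and -0.001944
print([engine.on_row(k, p) for k, p in rows])
```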

3.1.2 Weighted Average Price

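The following example computes the weighted average price from snapshot data to show how high-frequency factors are implemented in streaming mode.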

  • Factor logic (weighted average price):

wap = (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) / (offerOrderQty0+bidOrderQty0)

  • DolphinDB implementation:
def weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0){
    return (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
}           
  • Real-time stream computation:
// Define the input and output table schemas
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","DOUBLE","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE"]
resultTable = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])
inputTable = table(1:0, colName, colType)

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[dateTime, weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")           

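The code above creates a reactive state engine named “reactiveDemo” with securityID as the grouping key. Incoming messages have the same schema as the in-memory table inputTable, the metrics to compute are defined in metrics, and results are written to the in-memory table resultTable.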

After the engine is created, insert a few records and check the results.

// Insert data
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:00.000, "bidPrice0":19.98, "bidOrderQty0":100, "offerPrice0":19.99, "offerOrderQty0":120})
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:03.000, "bidPrice0":19.95, "bidOrderQty0":130, "offerPrice0":19.93, "offerOrderQty0":120})
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:06.000, "bidPrice0":19.97, "bidOrderQty0":120, "offerPrice0":19.98, "offerOrderQty0":130})

// Check the results
select * from resultTable
/*
securityID dateTime                factor            
---------- ----------------------- ------------------
000001     2023.01.01T09:30:00.000 19.9845 
000001     2023.01.01T09:30:03.000 19.9396
000001     2023.01.01T09:30:06.000 19.9748
*/           
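These results can be checked by hand. For the first snapshot, (19.98 × 120 + 19.99 × 100) / (120 + 100) = 4396.6 / 220 ≈ 19.9845, since each side's price is weighted by the opposite side's quantity. The plain-Python check below covers all three rows; the function name is illustrative.

```python
def weighted_averaged_price(bid_price0, bid_qty0, offer_price0, offer_qty0):
    # Each side's price is weighted by the opposite side's quantity.
    return (bid_price0 * offer_qty0 + offer_price0 * bid_qty0) / (offer_qty0 + bid_qty0)


# (bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0) from the three inserts
snapshots = [(19.98, 100, 19.99, 120), (19.95, 130, 19.93, 120), (19.97, 120, 19.98, 130)]
print([round(weighted_averaged_price(*s), 4) for s in snapshots])
# [19.9845, 19.9396, 19.9748]
```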

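These examples show that the reactive state engine makes it quick and easy to implement high-frequency factors in streaming mode. The key step is to rewrite the factor logic as functions the engine can parse and execute correctly, that is, as metrics the engine can recognize and decompose.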

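The following sections focus on converting “factor logic” into “DolphinDB implementation code” suited to the reactive state engine, and on points to watch for during the conversion.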

3.2 Stateless and Stateful Functions

3.2.1 Stateless Functions

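A stateless function computes its result solely from the arguments passed in at the current moment, without looking back at historical data. Stateless functions are suited to encapsulating logic that does not depend on history, such as the weighted average price in Section 3.1.2.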

Note:

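If keyColumn is specified in createReactiveStateEngine, the reactive state engine computes by group. With keyColumn="securityID", for example, the engine groups by security code. Records with the same security code are computed one at a time, but for records with different security codes a stateless function is computed in vectorized fashion. The arguments passed to a stateless function are therefore vectors.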

The weighted average price from Section 3.1.2 illustrates this.

Modify the factor function by adding print statements that show the argument bidPrice0:

def weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0){
    print(typestr(bidPrice0))   // Show the data type of bidPrice0
    print(bidPrice0)            // Show the value of bidPrice0
    return (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
}

metrics = <[dateTime, weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)]>           

Construct a batch of input data (10 records covering 7 distinct securities, 3 of which appear twice):

setRandomSeed(9)
n = 10
securityID = take(lpad(string(1..7), 6, "0"), n)
dateTime = 2023.01.01T09:30:00.000 + 1..n
bidPrice0 = round(rand(20.0, n), 2)
bidOrderQty0 = rand(200, n)
offerPrice0 = round(rand(20.0, n), 2)
offerOrderQty0 = rand(200, n)
testData = table(securityID, dateTime, bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)           

Create the reactive state engine and insert this batch:

// Define the input and output table schemas
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","DOUBLE","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE"]
inputTable = table(1:0, colName, colType)
resultTable = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Insert data
tableInsert(rse, testData.flip())           

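The printed output shows that bidPrice0 is passed in as a vector. It also shows that the 10 records in this batch are computed in two passes: once for the first 7 records, which have distinct security codes, and once for the last 3.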

Therefore, when rewriting a factor, check that the data types of the arguments match those supported by the functions you call.
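The two-pass split can be modelled as a greedy rule: within one batch, a new pass starts whenever a security code repeats. Each pass therefore holds at most one record per security, and the stateless function receives one vector element per security. This rule is an assumption consistent with the 7 + 3 split described above, not a documented description of the engine's scheduler. The sketch is plain Python, and the function name is hypothetical.

```python
def split_into_passes(keys):
    """Start a new pass whenever a key repeats inside the current pass."""
    passes, current, seen = [], [], set()
    for key in keys:
        if key in seen:
            passes.append(current)
            current, seen = [], set()
        current.append(key)
        seen.add(key)
    if current:
        passes.append(current)
    return passes


# Same keys as take(lpad(string(1..7), 6, "0"), 10): 000001..000007, then 000001..000003
securities = [str(i).zfill(6) for i in range(1, 8)]
batch = [securities[i % 7] for i in range(10)]
print([len(p) for p in split_into_passes(batch)])   # [7, 3]
```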

For example, add a check to the weighted average price calculation: if bidPrice0 > 0, compute wap; otherwise, return the value default.

def weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, default){
    if(bidPrice0 > 0){  return (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)}
    return default
}

metrics = <[dateTime, weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, 0.0)]>           

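Calling this user-defined weightedAveragedPrice directly in the metrics of a reactive state engine raises an error. In if(condition){}else{}, condition must be a scalar, but the argument bidPrice0 is a vector, so the engine reports “A scalar object is expected. But the actual object is a vector.”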

The general solution is to apply the function to each element of the vectors with each/loop.

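In the example above, weightedAveragedPrice stays unchanged. A wrapper function, factorWeightedAveragedPrice, calls each to apply weightedAveragedPrice to every element of the arguments, so that weightedAveragedPrice receives scalars. The metrics then call the outer function factorWeightedAveragedPrice.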

def factorWeightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0){
    return each(weightedAveragedPrice{default=0.0}, bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)
}

metrics = <[dateTime, factorWeightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)]>           

This approach works in general. For if-else specifically, DolphinDB provides the function iif as a replacement, and iif is the recommended choice.

def factorWeightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0){
    default = 0.0
    return iif(bidPrice0 > 0, (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0), default)
}

metrics = <[dateTime, factorWeightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0)]>           
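The difference between the each-based and iif-based versions can be sketched in plain Python, with lists standing in for DolphinDB vectors. wap_each applies a scalar function element by element, as each does. wap_iif computes the formula for every element and then selects element-wise, as iif does. Both give the same result, but the iif form keeps the computation vectorized, which is presumably why it is recommended. All names here are hypothetical stand-ins, not DolphinDB functions.

```python
def weighted_averaged_price(bid_price0, bid_qty0, offer_price0, offer_qty0, default=0.0):
    # Scalar version: `if` needs one boolean, so every argument must be a scalar.
    if bid_price0 > 0:
        return (bid_price0 * offer_qty0 + offer_price0 * bid_qty0) / (offer_qty0 + bid_qty0)
    return default


def wap_each(bid_price0, bid_qty0, offer_price0, offer_qty0):
    # Analogue of each(...): call the scalar function once per element.
    return [weighted_averaged_price(*args)
            for args in zip(bid_price0, bid_qty0, offer_price0, offer_qty0)]


def iif(cond, true_vals, false_vals):
    # Element-wise selection, analogous to DolphinDB's iif.
    return [t if c else f for c, t, f in zip(cond, true_vals, false_vals)]


def wap_iif(bid_price0, bid_qty0, offer_price0, offer_qty0, default=0.0):
    # Compute the formula for every element, then pick per element.
    wap = [(bp * oq + op * bq) / (oq + bq)
           for bp, bq, op, oq in zip(bid_price0, bid_qty0, offer_price0, offer_qty0)]
    return iif([bp > 0 for bp in bid_price0], wap, [default] * len(wap))


bid_price0 = [19.98, 0.0, 19.97]
bid_qty0 = [100, 130, 120]
offer_price0 = [19.99, 19.93, 19.98]
offer_qty0 = [120, 120, 130]
print(wap_each(bid_price0, bid_qty0, offer_price0, offer_qty0))
print(wap_iif(bid_price0, bid_qty0, offer_price0, offer_qty0))
```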

3.2.2 Stateful Functions

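A stateful function uses historical data as well as the current data. The price change rate in Section 3.1.1, for example, needs both the current price and the price from lag records earlier.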

Notes:

(1) Stateful functions must be declared with @state.

(2) Stateful functions support only assignment statements, return statements, and if-else statements.

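For if-else, the condition must be a scalar that does not depend on the upstream table data. Take the price change rate in Section 3.1.1: among the arguments of pricePercentChange(price, lag), price is a column of the upstream table, while lag is a separately specified window size. A check such as if(lag>30){} is therefore allowed in a stateful function, but if(price>0){} is not and must be rewritten with iif as iif(price>0, , ).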

// In the version below, price comes from the upstream table, so price > 0 changes in real time and is not a scalar; this kind of if-else is not supported for now
@state
def pricePercentChange(price, lag){
    if(price>0){
        return price \ move(price, lag) - 1
    }else{
        return 0
    }
}

// In the version below, lag is specified when the engine is defined, so lag>30 has a fixed result and is a scalar; this kind of if-else is supported
@state
def pricePercentChange(price, lag){
    if(lag>30){
        return price \ move(price, lag) - 1
    }else{
        return price \ move(price, 30) - 1
    }
}           

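(3) Stateful functions support row functions such as rowSum and rowMax and cumulative functions such as cumSum and cumMax, but not aggregate functions such as sum and max. To use an aggregate such as sum inside a stateful function, replace it with the corresponding row function, such as rowSum.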

For example, compute the maximum of the current price and the price from lag records earlier, i.e. max(tradePrice[i], tradePrice[i-lag]):

// Factor implementation
@state
def maxPrice(price, lag){
    return rowMax([price, move(price, lag)])
}

// Define the input and output table schemas
inputTable = table(1:0, `securityID`tradeTime`tradePrice`TradeQty`TradeAmount`BuyNo`SellNo`TradeBSFlag`TradeIndex`ChannelNo, [SYMBOL,DATETIME,DOUBLE,INT,DOUBLE,LONG,LONG,SYMBOL,INT,INT])
resultTable = table(10000:0, ["securityID", "tradeTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[tradeTime, maxPrice(tradePrice, 2)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Insert data
insert into rse values(`000155, 2020.01.01T09:30:00, 30.85, 100, 3085, 4951, 0, `B, 1, 1)
insert into rse values(`000155, 2020.01.01T09:30:01, 30.86, 100, 3086, 4951, 1, `B, 2, 1)
insert into rse values(`000155, 2020.01.01T09:30:02, 30.80, 200, 6160, 5501, 5600, `S, 3, 1)


// Check the results
select * from resultTable
/*
securityID tradeTime               factor            
---------- ----------------------- ------------------
000155     2020.01.01T09:30:00.000 30.85
000155     2020.01.01T09:30:01.000 30.86
000155     2020.01.01T09:30:02.000 30.85
*/           

(4) A user-defined function that calls a stateful function must itself be a stateful function.
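The maxPrice result in note (3) can be reproduced by hand. With lag = 2, move(price, 2) is NULL for the first two trades, so the factor equals the price itself. The third trade gives max(30.80, 30.85) = 30.85. The plain-Python sketch below assumes, as the output suggests, that rowMax skips NULLs. The function name is hypothetical.

```python
from collections import defaultdict, deque


def max_price_stream(rows, lag):
    """rows: (key, price) in arrival order; mimics rowMax([price, move(price, lag)])."""
    history = defaultdict(lambda: deque(maxlen=lag + 1))
    out = []
    for key, price in rows:
        h = history[key]
        h.append(price)
        lagged = h[0] if len(h) > lag else None                      # move(price, lag); None = NULL
        out.append(max(p for p in (price, lagged) if p is not None))  # NULLs skipped
    return out


trades = [("000155", 30.85), ("000155", 30.86), ("000155", 30.80)]
print(max_price_stream(trades, lag=2))   # [30.85, 30.86, 30.85]
```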

3.2.3 Splitting Stateful and Stateless Logic

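Any reactive state engine factor can be implemented as a combination of stateless and stateful functions, and separating the stateful parts from the stateless ones is the most important step.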

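Some stateless computations, such as rowSum, are also supported inside stateful functions. Stateless functions, however, are computed in vectorized fashion and do not need to keep the state of functions and variables, so they perform better. Rather than writing everything in one stateful function, move complex cross-sectional computations into stateless functions. Keep only calls to user-defined functions and operations on historical data in the stateful function, such as m-functions, tm-functions, fill-related functions, and iteration.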

3.2.3.1 Moving-Average Buy/Sell Pressure

Factor logic:

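Step 1: compute the buy/sell pressure indicator press. The formulas below are written to match the implementation that follows, with wap the weighted average price from Section 3.1.2 and levels j = 0, …, 9:

$$w^{bid}_j = \frac{wap}{bidPrice_j - wap}, \qquad w^{ask}_j = \frac{wap}{offerPrice_j - wap}$$

$$bidPress = \frac{\sum_{j=0}^{9} w^{bid}_j \cdot bidOrderQty_j}{\sum_{j=0}^{9} w^{bid}_j}, \qquad askPress = \frac{\sum_{j=0}^{9} w^{ask}_j \cdot offerOrderQty_j}{\sum_{j=0}^{9} w^{ask}_j}$$

$$press = \log\left(\frac{bidPress}{askPress}\right)$$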

Step 2: use mavg to compute the moving average of press over the last lag rows:

(press[i-lag+1]+…+press[i]) / lag
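The formula above describes a full window. The implementations call mavg(press, lag, 1), whose third argument (minPeriods) is 1. As a result, the first lag - 1 rows of each security are averaged over the rows available so far instead of being NULL. The plain-Python sketch of this windowing below uses a hypothetical function name. Treating None as a skipped NULL is an assumption about how DolphinDB handles NULLs in m-functions.

```python
def mavg(values, window, min_periods=1):
    """Average of the non-None values among the last `window` items."""
    out = []
    for i in range(len(values)):
        win = [v for v in values[max(0, i - window + 1): i + 1] if v is not None]
        out.append(sum(win) / len(win) if win and len(win) >= min_periods else None)
    return out


print(mavg([1.0, 2.0, 3.0, 4.0], window=3))                  # [1.0, 1.5, 2.0, 3.0]
print(mavg([1.0, 2.0, 3.0, 4.0], window=3, min_periods=3))   # [None, None, 2.0, 3.0]
```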

DolphinDB implementation:

  • Implemented in a single stateful function (not recommended)
@state
def averagePress1(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, lag){
	bidPrice = fixedLengthArrayVector(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9)
	bidOrderQty = fixedLengthArrayVector(bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9)
	offerPrice = fixedLengthArrayVector(offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9)
	offerOrderQty = fixedLengthArrayVector(offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9)
	wap = (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
	bidPress = rowWavg(bidOrderQty, wap \ (bidPrice - wap))
	askPress = rowWavg(offerOrderQty, wap \ (offerPrice - wap))
	press = log(bidPress \ askPress)
	return mavg(press, lag, 1)
}           
  • Stateless function + stateful function (recommended)
def calPress(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9){
	bidPrice = fixedLengthArrayVector(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9)
	bidOrderQty = fixedLengthArrayVector(bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9)
	offerPrice = fixedLengthArrayVector(offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9)
	offerOrderQty = fixedLengthArrayVector(offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9)
	wap = (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
	bidPress = rowWavg(bidOrderQty, wap \ (bidPrice - wap))
	askPress = rowWavg(offerOrderQty, wap \ (offerPrice - wap))
	press = log(bidPress \ askPress)
	return press
}

@state
def averagePress2(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, lag){
	press = calPress(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9)
	return mavg(press, lag, 1)
}           
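The same split can be sketched in plain Python. cal_press is the stateless per-snapshot computation and mirrors calPress above. Only the per-security moving average in AveragePress keeps state. For brevity, the sketch takes price and quantity lists of any depth (two levels in the example) instead of 40 separate arguments. Both names are hypothetical, and rowWavg is assumed to compute sum(X * Y) / sum(Y).

```python
import math
from collections import defaultdict, deque


def cal_press(bid_prices, bid_qtys, offer_prices, offer_qtys):
    """Stateless step for one snapshot; index 0 is the best level."""
    wap = ((bid_prices[0] * offer_qtys[0] + offer_prices[0] * bid_qtys[0])
           / (offer_qtys[0] + bid_qtys[0]))

    def row_wavg(x, w):               # weighted average of x with weights w
        return sum(a * b for a, b in zip(x, w)) / sum(w)

    bid_press = row_wavg(bid_qtys, [wap / (p - wap) for p in bid_prices])
    ask_press = row_wavg(offer_qtys, [wap / (p - wap) for p in offer_prices])
    return math.log(bid_press / ask_press)


class AveragePress:
    """Stateful step: per-security moving average of press (minPeriods = 1)."""

    def __init__(self, lag):
        self.windows = defaultdict(lambda: deque(maxlen=lag))

    def on_snapshot(self, key, bid_prices, bid_qtys, offer_prices, offer_qtys):
        w = self.windows[key]
        w.append(cal_press(bid_prices, bid_qtys, offer_prices, offer_qtys))
        return sum(w) / len(w)


balanced = ([9.99, 9.98], [100, 200], [10.01, 10.02], [100, 200])
bid_heavy = ([9.99, 9.98], [300, 300], [10.01, 10.02], [100, 100])
ap = AveragePress(lag=2)
# Roughly [0.0, log(3) / 2, log(3)]
print([ap.on_snapshot("000001", *book) for book in (balanced, bid_heavy, bid_heavy)])
```

Only the deque of recent press values is kept per security. This mirrors the advice above to keep the stateful function limited to operations on historical data.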

3.2.3.2 Performance Comparison

Server version: 2.00.9.2 2023.03.10 JIT

Test data: one day of SSE Level-2 snapshot data for 100 stocks, 372,208 rows × 194 columns (319 MB)

Test method: use the timer function to measure the total time from feeding data into the engine until all metrics have been computed.

Test results:

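Factor rewriting approach                 Elapsed time (ms)
Stateful and stateless not split          1492.224
Stateful and stateless split              577.651

Splitting reduces the elapsed time by about 61%, roughly a 2.6× speedup.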

Test script:

// Load the test data
csvPath = "/hdd/hdd0/jit200ssd/server/testdata/"
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","DOUBLE","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","DOUBLE"]
data = select * from loadText(csvPath + "snapshot_100stocks_multi.csv", schema=table(colName, colType)) order by dateTime

// Define the input and output table schemas
inputTable = table(1:0, colName, colType)
resultTable1 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])
resultTable2 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create reactive state engines with createReactiveStateEngine
// Engine 1: stateful and stateless functions not split
try{ dropStreamEngine("reactiveDemo1")} catch(ex){ print(ex) }
metrics1 = <[dateTime, averagePress1(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, 60)]>
rse1 = createReactiveStateEngine(name="reactiveDemo1", metrics =metrics1, dummyTable=inputTable, outputTable=resultTable1, keyColumn="securityID")
// Engine 2: stateful and stateless functions split
try{ dropStreamEngine("reactiveDemo2")} catch(ex){ print(ex) }
metrics2 = <[dateTime, averagePress2(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, 60)]>
rse2 = createReactiveStateEngine(name="reactiveDemo2", metrics =metrics2, dummyTable=inputTable, outputTable=resultTable2, keyColumn="securityID")

// Feed in the data
timer rse1.append!(data)
timer rse2.append!(data)

// Verify that both engines produce the same results
each(eqObj, resultTable1.values(), resultTable2.values()).all()           

3.2.3.3 Notes

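① Because of how the reactive state engine handles computations internally, not every data type can be passed as an argument or return value between stateful and stateless functions. Arguments passed between them can be scalars, vectors, or array vectors, but not tuples (ANY vectors) or similar types.

The example below illustrates this by printing the data type and value of each argument passed from the stateful function to the stateless function.

// Factor implementation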
def typeTestNonStateFunc(scalar, vector, arrayVector, anyVector){
	print("---------------------------------------")
	print(typestr(scalar))
	print(scalar)
	print(typestr(vector))
	print(vector)
	print(typestr(arrayVector))
	print(arrayVector)
	print(typestr(anyVector))
	print(anyVector)
	return fixedLengthArrayVector(rowSum(arrayVector), rowAvg(arrayVector))
}

@state
def typeTestStateFunc(price1, price2, price3, lag){
	scalar = lag
	vector = price1
	arrayVector = fixedLengthArrayVector(price1, price2, price3)
	anyVector = [price1, price2, price3]
	res = typeTestNonStateFunc(scalar, vector, arrayVector, anyVector)
	sumRes = res[0]
	avgRes = res[1]
	return sumRes, avgRes, res, anyVector
}           

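Here lag is a constant specified externally, and price1, price2, and price3 are three columns of the upstream table. fixedLengthArrayVector assembles several vectors into an array vector, while [price1, price2, price3] builds a tuple.

// Define the input and output table schemas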
inputTable = table(1:0, `securityID`tradeTime`price1`price2`price3, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE])
resultTable = table(10000:0, ["securityID", "tradeTime", "sum", "avg", "sum_avg", "anyVector"], [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE,DOUBLE[], DOUBLE[]])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[tradeTime, typeTestStateFunc(price1, price2, price3, 10) as `sum`avg`sum_avg`anyVector]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Insert data
insert into rse values(`000155, 2020.01.01T09:30:00, 30.81, 30.82, 30.83)
insert into rse values(`000155, 2020.01.01T09:30:01, 30.86, 30.87, 30.88)
insert into rse values(`000155, 2020.01.01T09:30:02, 30.80, 30.81, 30.82)

// View the results
select * from resultTable
/*
securityID tradeTime               sum    avg    sum_avg                 anyVector
---------- ----------------------- ------ ------ ----------------------- ---------
000155     2020.01.01T09:30:00.000 92.46  30.82  [92.46,30.82]           [00F]    
000155     2020.01.01T09:30:01.000 92.61  30.87  [92.61,30.87]           [00F]    
000155     2020.01.01T09:30:02.000 92.43  30.81  [92.43,30.81]           [00F]    
*/           

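② When a stateful function calls a custom function, receiving multiple return values in several variables (that is, a, b = foo(...)) is not supported. If a stateless function needs to return several values, pack them into an array vector with fixedLengthArrayVector and return that. In the stateful function, receive the result in a single variable and then split it with res[index]. (See the example in note ① above.)

③ Once data has been assembled into an array vector, the computation must switch to the corresponding row-family functions or the byRow higher-order function. (In the example in note ① above, the sum of the three prices uses rowSum and their mean uses rowAvg.)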
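Notes ② and ③ can be pictured with a small Python sketch. It is Python rather than DolphinDB script, so it only illustrates the data flow. The sketch assumes an array vector behaves like a list of per-row lists. row_sum, row_avg and column are hypothetical stand-ins for rowSum, rowAvg and res[index], not DolphinDB APIs. Using the three records from the example above, it packs two results per record into one fixed-length row and then splits them by position, as typeTestStateFunc does.

```python
# Hypothetical Python stand-ins for DolphinDB's rowSum, rowAvg and
# array-vector column indexing; this only illustrates the data flow.

def row_sum(rows):
    # like rowSum: one sum per row
    return [sum(r) for r in rows]


def row_avg(rows):
    # like rowAvg: one mean per row
    return [sum(r) / len(r) for r in rows]


def column(rows, index):
    # like res[index] on an array vector: the index-th element of every row
    return [r[index] for r in rows]


# Three records with three price columns, as in the example above
price1 = [30.81, 30.86, 30.80]
price2 = [30.82, 30.87, 30.81]
price3 = [30.83, 30.88, 30.82]

# fixedLengthArrayVector(price1, price2, price3): one row per record
array_vector = [list(r) for r in zip(price1, price2, price3)]

# The stateless function packs two results per record into one row,
# like fixedLengthArrayVector(rowSum(...), rowAvg(...))
res = [list(r) for r in zip(row_sum(array_vector), row_avg(array_vector))]

# The stateful function receives a single variable and splits it by position
sum_res = column(res, 0)
avg_res = column(res, 1)
print(sum_res)
print(avg_res)
```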

3.3 if-else

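  • Stateless functions

Stateless functions support if-else statements, but the condition must evaluate to a scalar; otherwise, use the iif function instead. (See the example in the notes of section "3.2.1 Stateless functions".)

  • Stateful functions

In a stateful function, if-else is supported only when the condition is a scalar unrelated to the upstream table data; otherwise, use iif instead, or wrap the if-else logic in a custom stateless function. (See note ② in section "3.2.2 Stateful functions".)

Note:

iif computes both trueResult and falseResult and then assembles the result according to condition, so both expressions must be able to run without error for every input. For example, in y=iif(size(x) > 0, sum(x), 0.0), if the input is x=[], the condition size(x) > 0 is false and the result should simply be 0.0. However, iif evaluates both expressions, so sum(x) is still computed. Because sum(x) does not accept an empty vector, this expression raises an error when x=[]. In that case use an if-else statement instead: if(size(x)>0){y=sum(x)}else{y=0.0}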
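The difference between iif and if-else amounts to eager versus lazy evaluation. The Python sketch below is only an analogy, not DolphinDB code. strict_sum is a hypothetical function that rejects an empty input, standing in for sum(x) as the note above describes. iif is a plain Python function, and its arguments are evaluated before the call.

```python
def strict_sum(x):
    # hypothetical stand-in for sum(x) as described above: empty input is an error
    if len(x) == 0:
        raise ValueError("sum of an empty vector")
    return sum(x)


def iif(condition, true_result, false_result):
    # by the time iif runs, both results have already been computed
    return true_result if condition else false_result


def with_iif(x):
    # strict_sum(x) is evaluated even when len(x) == 0
    return iif(len(x) > 0, strict_sum(x), 0.0)


def with_if_else(x):
    # only the branch that is taken gets evaluated
    if len(x) > 0:
        return strict_sum(x)
    return 0.0


print(with_if_else([]))  # prints 0.0
try:
    with_iif([])
except ValueError as err:
    print("iif version failed:", err)
```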

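3.4 Accessing historical data (window computation and iteration)

DolphinDB provides a rich set of built-in functions for computations involving historical data inside stateful functions. Examples include the moving-window family (m-functions), the time-based moving-window family (tm-functions), the cumulative-window family (cum-functions), and ffill.

In addition, movingWindowData and tmovingWindowData return a vector of a variable's historical values directly, which makes further custom computations easier.

Although a stateful function cannot call itself, DolphinDB provides conditionalIterate, stateIterate, genericStateIterate, genericTStateIterate, and similar functions. These implement iterative logic and other complex processing of a function's historical results.

The following custom complex factor serves as an example.

  • Factor logic:

Step 1: take a weighted average of the level-1 price and quantity data of the most recent lag snapshots, using the specified weights, to obtain new price and quantity series [uses moving]. In the code below, myWavg uses the weights 1, 2, …, n for the n records in the window, normalized by wavg so that they sum to 1.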

bidPrice = bidPrice0[i-lag+1]*weight[0] + … + bidPrice0[i]*weight[lag-1]

askPrice = offerPrice0[i-lag+1]*weight[0] + … + offerPrice0[i]*weight[lag-1]

bidVolume = bidOrderQty0[i-lag+1]*weight[0] + … + bidOrderQty0[i]*weight[lag-1]

askVolume = offerOrderQty0[i-lag+1]*weight[0] + … + offerOrderQty0[i]*weight[lag-1]

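Step 2: from the Step 1 results, compute the weighted average price wap = (bidPrice*askVolume + askPrice*bidVolume) / (bidVolume + askVolume), then its moving average maWAP [uses mavg]

Step 3: the factor value is the weighted average of the most recent lag-1 factor values and the current maWAP [uses genericStateIterate],

where the weights are the most recent lag values of bidVolume / askVolume [uses movingWindowData]:

w[i] = bidVolume[i] / askVolume[i]

factor[i] = (factor[i-lag+1]*w[i-lag+1] + … + factor[i-1]*w[i-1] + maWAP[i]*w[i]) / (w[i-lag+1] + … + w[i])

The first lag-1 records do not have enough history, so for them the factor value is maWAP itself (the initial argument of genericStateIterate).

  • DDB implementation: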
defg myWavg(x){
	weight = 1..size(x)
	return wavg(x, weight)
}

def iterateFunc(historyFactors, currentValue, weight){
	return wavg(historyFactors join currentValue, weight)
}

@state
def myFactor(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, lag){
	// step1: use moving
	bidPrice, askPrice, bidVolume, askVolume = moving(myWavg, bidPrice0, lag, 1), moving(myWavg, offerPrice0, lag, 1), moving(myWavg, bidOrderQty0, lag, 1), moving(myWavg, offerOrderQty0, lag, 1)

	// step2: use mavg
	wap = (bidPrice*askVolume + askPrice*bidVolume) \ (bidVolume + askVolume)
	maWap = mavg(wap, lag, 1)
	
	// step3: use movingWindowData
	w = movingWindowData(bidVolume \ askVolume, lag)
	// then use genericStateIterate
	factorValue = genericStateIterate(X=[maWap, w], initial=maWap, window=lag-1, func=iterateFunc{ , , })
	return factorValue
}           
  • Real-time streaming computation
// Define the input and output table schemas
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
inputTable = table(1:0, colName, colType)
resultTable = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[dateTime, myFactor(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, 3)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Insert data
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:00.000, "bidPrice0":19.98, "bidOrderQty0":100, "offerPrice0":19.99, "offerOrderQty0":120})
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:03.000, "bidPrice0":19.95, "bidOrderQty0":130, "offerPrice0":19.93, "offerOrderQty0":120})
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:06.000, "bidPrice0":19.97, "bidOrderQty0":120, "offerPrice0":19.98, "offerOrderQty0":130})
tableInsert(rse, {"securityID":"000001", "dateTime":2023.01.01T09:30:09.000, "bidPrice0":20.00, "bidOrderQty0":130, "offerPrice0":19.97, "offerOrderQty0":140})


// View the results
select * from resultTable
/*
 securityID dateTime                factor            
---------- ----------------------- ------------------
000001     2023.01.01T09:30:00.000 19.9845 
000001     2023.01.01T09:30:03.000 19.9698
000001     2023.01.01T09:30:06.000 19.9736
000001     2023.01.01T09:30:09.000 19.9694
*/           
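The Python reference sketch below makes the three steps easier to check for a single security. It encodes assumed semantics, inferred from the code and the output above rather than taken from the DolphinDB reference:

- moving(myWavg, x, lag, 1) and mavg(x, lag, 1) use the last min(i+1, lag) values.
- movingWindowData returns the last lag values.
- genericStateIterate takes its first window outputs from initial, and afterwards calls func with the previous window outputs.

Under these assumptions the sketch should reproduce the four factor values above to four decimal places.

```python
def my_wavg(x):
    # myWavg: weighted average with weights 1, 2, ..., len(x)
    weights = range(1, len(x) + 1)
    return sum(v * w for v, w in zip(x, weights)) / sum(weights)


def moving(func, x, lag):
    # assumed moving(func, x, lag, 1): apply func to the last min(i + 1, lag) values
    return [func(x[max(0, i - lag + 1): i + 1]) for i in range(len(x))]


def mavg(x, lag):
    # assumed mavg(x, lag, 1): plain mean over the same windows
    return moving(lambda window: sum(window) / len(window), x, lag)


def my_factor(bid_price0, bid_qty0, offer_price0, offer_qty0, lag):
    # assumes lag >= 2, so that window = lag - 1 >= 1
    # step1: weighted level-1 prices and volumes
    bid_price = moving(my_wavg, bid_price0, lag)
    ask_price = moving(my_wavg, offer_price0, lag)
    bid_vol = moving(my_wavg, bid_qty0, lag)
    ask_vol = moving(my_wavg, offer_qty0, lag)
    # step2: wap and its moving average
    wap = [(bp * av + ap * bv) / (bv + av)
           for bp, ap, bv, av in zip(bid_price, ask_price, bid_vol, ask_vol)]
    ma_wap = mavg(wap, lag)
    # step3: weights are the last lag values of bidVolume / askVolume
    ratio = [bv / av for bv, av in zip(bid_vol, ask_vol)]
    window = lag - 1
    factors = []
    for i, current in enumerate(ma_wap):
        if i < window:
            factors.append(current)  # first `window` outputs come from initial=maWap
        else:
            values = factors[-window:] + [current]  # past factors + current maWAP
            weights = ratio[i - lag + 1: i + 1]
            factors.append(sum(v * w for v, w in zip(values, weights)) / sum(weights))
    return factors


# The four records inserted above (security 000001, lag = 3)
result = my_factor([19.98, 19.95, 19.97, 20.00], [100, 130, 120, 130],
                   [19.99, 19.93, 19.98, 19.97], [120, 120, 130, 140], 3)
print([round(v, 4) for v in result])
```

Writing the loop out shows that step 3 feeds the factor's own past outputs back in. Inside a stateful function, that feedback is what genericStateIterate provides.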
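  • Notes

① The func argument of the higher-order function moving(func, …) is an aggregate function, so func must be defined with defg.

② Stateful functions cannot call themselves, so logic that needs historical factor values is hard to express (for example, filling a null current value with the previous factor value). For this purpose DolphinDB provides conditionalIterate, genericStateIterate, and similar functions. However, these functions do not record the value the factor function finally returns. They record the result as of the line of code where they are called. To obtain the factor function's historical results correctly, the conditionalIterate or genericStateIterate call should therefore be the last step of the stateful function.

The following example illustrates this.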

@state
def iterateTestFunc(tradePrice){
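	// Compute the percentage change of the trade price
	change = tradePrice \ prev(tradePrice) - 1
	// If the result is null, fill it with the previous non-null factor value
	factor = conditionalIterate(change != NULL, change, cumlastNot)
	// Return factor+1 as the final factor value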
	return factor + 1
}           

Create a reactive state engine and insert a few records to observe the results.

// Define the input and output table schemas
inputTable = table(1:0, `securityID`tradeTime`tradePrice`tradeQty`tradeAmount`buyNo`sellNo`tradeBSFlag`tradeIndex`channelNo, [SYMBOL,DATETIME,DOUBLE,INT,DOUBLE,LONG,LONG,SYMBOL,INT,INT])
resultTable = table(10000:0, ["securityID", "tradeTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[tradeTime, iterateTestFunc(tradePrice)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Insert data
insert into rse values(`000155, 2020.01.01T09:30:00, 30.85, 100, 3085, 4951, 0, `B, 1, 1)
insert into rse values(`000155, 2020.01.01T09:30:01, 30.86, 100, 3086, 4951, 1, `B, 2, 1)
insert into rse values(`000155, 2020.01.01T09:30:02, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL)
insert into rse values(`000155, 2020.01.01T09:30:03, 30.80, 200, 6160, 5501, 5600, `S, 3, 1)

// View the results
select * from resultTable
/*
securityID tradeTime               factor          
---------- ----------------------- ----------------
000155     2020.01.01T09:30:00.000                 
000155     2020.01.01T09:30:01.000 1.0003
000155     2020.01.01T09:30:02.000 1.0003
000155     2020.01.01T09:30:03.000 1.0003
*/           
tradeTime            tradePrice  change  cumlastNot  factor=conditionalIterate(…)  iterateTestFunc(tradePrice)
2020.01.01T09:30:00  30.85       NULL    NULL        NULL                          NULL
2020.01.01T09:30:01  30.86       0.0003  NULL        0.0003                        1.0003
2020.01.01T09:30:02  NULL        NULL    0.0003      0.0003                        1.0003
2020.01.01T09:30:03  30.80       NULL    0.0003      0.0003                        1.0003

As the trace shows, in the line factor = conditionalIterate(change != NULL, change, cumlastNot), cumlastNot looks up the previous non-null value of factor, not of the factor function iterateTestFunc(tradePrice). The factor+1 in the final return statement is not recorded by conditionalIterate.
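The same trace can be reproduced with a Python sketch. It assumes that conditionalIterate keeps its own history of factor values and never sees the +1 applied in the return statement. cumlast_not and iterate_test_func are illustrative names, and None stands in for NULL.

```python
def cumlast_not(history):
    # like cumlastNot applied to the iteration history: last non-null value
    for value in reversed(history):
        if value is not None:
            return value
    return None


def iterate_test_func(prices):
    factor_history = []  # what conditionalIterate keeps: values of `factor`
    outputs = []         # what the state function returns: factor + 1
    prev = None
    for price in prices:
        # change = tradePrice \ prev(tradePrice) - 1, null if either side is null
        change = None if price is None or prev is None else price / prev - 1
        prev = price
        # conditionalIterate(change != NULL, change, cumlastNot)
        factor = change if change is not None else cumlast_not(factor_history)
        factor_history.append(factor)
        outputs.append(None if factor is None else factor + 1)
    return outputs


print(iterate_test_func([30.85, 30.86, None, 30.80]))
```

Because the fill value comes from factor_history rather than outputs, the +1 never accumulates. The last three outputs are therefore identical.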

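If the historical factor values go through further operations, consider stateIterate, genericStateIterate, and similar functions. For example, the function above can be rewritten as follows. (genericStateIterate currently requires window >= 2; support for window = 1 has been developed and will be added in a later release.)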

// window must currently be >= 2, so looking back even one record requires window=2
def processFunc(historyFactor, change){
	lastFactor = last(historyFactor)
	factor = iif(change != NULL, change, lastFactor)
	return factor+1
}
@state
def iterateTestFunc(tradePrice){
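	// Compute the percentage change of the trade price
	change = tradePrice \ prev(tradePrice) - 1
	// If the result is null, fill it with the previous factor value; return factor+1 as the final factor value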
	factor = genericStateIterate(X=[change], initial=change, window=2, func=processFunc)
	return factor
}

// Once window=1 is supported, the code above can be replaced with the following
/*
def processFunc(lastFactor, change){
	factor = iif(change != NULL, change, lastFactor)
	return factor+1
}
@state
def iterateTestFunc(tradePrice){
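	// Compute the percentage change of the trade price
	change = tradePrice \ prev(tradePrice) - 1
	// If the result is null, fill it with the previous factor value; return factor+1 as the final factor value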
	factor = genericStateIterate(X=[change], initial=change, window=1, func=processFunc)
	return factor
}
*/           
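The trace below follows the intended window=1 version, in which lastFactor is the previous final factor value. With the current window=2 workaround, the first two outputs are taken directly from initial, so its values differ from this trace.

tradeTime            tradePrice  change  lastFactor  factor=iif(..)  iterateTestFunc(tradePrice)
2020.01.01T09:30:00  30.85       NULL    NULL        NULL            NULL
2020.01.01T09:30:01  30.86       0.0003  NULL        0.0003          1.0003
2020.01.01T09:30:02  NULL        NULL    1.0003      1.0003          2.0003
2020.01.01T09:30:03  30.80       NULL    2.0003      2.0003          3.0003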
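The window=1 behaviour traced above can be sketched in Python as a simple feedback loop. Each output is computed from the previous final output, so the +1 accumulates. This illustrates the intended semantics, assuming the first output comes from initial. It is not a model of genericStateIterate's implementation. process_func and iterate_test_func mirror the DolphinDB names only for readability, and None stands in for NULL.

```python
def process_func(last_factor, change):
    # like processFunc: fill a null change with the previous output, then add 1
    factor = change if change is not None else last_factor
    return None if factor is None else factor + 1


def iterate_test_func(prices):
    outputs = []
    prev = None
    for i, price in enumerate(prices):
        # change = tradePrice \ prev(tradePrice) - 1, null if either side is null
        change = None if price is None or prev is None else price / prev - 1
        prev = price
        if i == 0:
            # assumed: with window=1 the first output comes from initial=change
            outputs.append(change)
        else:
            # the previous FINAL output (including its +1) is fed back in
            outputs.append(process_func(outputs[-1], change))
    return outputs


print(iterate_test_func([30.85, 30.86, None, 30.80]))
```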

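3.5 Loops

Following the principle of splitting stateful and stateless functions, it is recommended to wrap loop logic in a custom stateless function.

  • Stateless functions

Stateless functions support loop statements such as for/while, and loop logic can also be implemented with functions such as each/loop.

For lower latency and better performance, avoid for/while loops in factor code that is not optimized with JIT. Implement the factor logic with vectorized computation wherever possible, or use functions such as each/loop for loops.

  • Stateful functions

Stateful functions do not support loop statements such as for/while, but they support implementing loop logic with functions such as each/loop.

The following factor, originally written in Python, shows how to implement loop logic in the reactive state engine.

Factor logic (Python code taken from the article "What High-Frequency Factors Really Are: The Right, the Wrong, and the Made-Up"):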

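import numpy as np
import pandas as pd

# l, n: previous and current order-book rows; each level occupies 4 fields,
# with the bid price at offset 0 and the bid volume at offset 1
def _bid_withdraws_volume(l, n, levels=10):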
    withdraws = 0
    for price_index in range(0,4*levels, 4):
        now_p = n[price_index]
        for price_last_index in range(0,4*levels,4):
            if l[price_last_index] == now_p:
                withdraws -= min(n[price_index+1] - l[price_last_index + 1], 0)     
    return withdraws

def bid_withdraws(depth, trade):
    ob_values = depth.values
    flows = np.zeros(len(ob_values))
    for i in range(1, len(ob_values)):
        flows[i] = _bid_withdraws_volume(ob_values[i-1], ob_values[i])
    return pd.Series(flows)           

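The factor code contains two nested loops. The inner loop can be turned into a vectorized computation, and the outer loop can be implemented with the each function.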
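Before porting, you can confirm that replacing the inner loop with a filter-and-sum expression does not change the result. The Python sketch below compares the two forms on random books. It makes two simplifying assumptions. First, each snapshot is given as separate price and volume lists, whereas the original packs 4 fields per level into one row. Second, prices are integers in cents, so price equality is exact. withdraws_loop, withdraws_vectorized and agree are hypothetical helper names.

```python
import random


def withdraws_loop(last_prices, last_vols, now_prices, now_vols):
    # nested loops, as in _bid_withdraws_volume
    withdraws = 0
    for p, v in zip(now_prices, now_vols):
        for lp, lv in zip(last_prices, last_vols):
            if lp == p:
                withdraws -= min(v - lv, 0)
    return withdraws


def withdraws_vectorized(last_prices, last_vols, now_prices, now_vols):
    def inner(now_price, now_vol):
        # like withdrawsVolumeTmp: lastVolumes[lastPrices == nowPrice] - nowVolume,
        # keeping only the positive parts
        diffs = [lv - now_vol for lp, lv in zip(last_prices, last_vols) if lp == now_price]
        return sum(d for d in diffs if d > 0)
    # like withdrawsVolume: apply inner to every current level (each), then sum
    return sum(inner(p, v) for p, v in zip(now_prices, now_vols))


def agree(trials, levels=3, seed=0):
    # random books with integer prices in cents, so equality tests are exact
    rng = random.Random(seed)
    for _ in range(trials):
        last_p = [rng.choice([1950, 1951, 1952]) + k for k in range(levels)]
        now_p = [rng.choice([1950, 1951, 1952]) + k for k in range(levels)]
        last_v = [rng.randrange(200) for _ in range(levels)]
        now_v = [rng.randrange(200) for _ in range(levels)]
        if withdraws_loop(last_p, last_v, now_p, now_v) != withdraws_vectorized(last_p, last_v, now_p, now_v):
            return False
    return True


print(agree(1000))
```

The two forms agree because -min(v - lv, 0) equals max(lv - v, 0), which is what keeping only the positive differences computes.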

DDB implementation:

// Corresponds to the inner loop
def withdrawsVolumeTmp(lastPrices, lastVolumes, nowPrice, nowVolume){ 
	withdraws = lastVolumes[lastPrices == nowPrice] - nowVolume
	return sum(withdraws * (withdraws > 0))
}

// Corresponds to the outer loop
defg withdrawsVolume(prices, Volumes){ 
	lastPrices, nowPrices = prices[0], prices[1]
	lastVolumes, nowVolumes = Volumes[0], Volumes[1]

	withdraws = each(withdrawsVolumeTmp{lastPrices, lastVolumes}, nowPrices, nowVolumes)
	return sum(withdraws)
}


@state
def bidWithdrawsVolume(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9,bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, levels=10){
	bidPrice = fixedLengthArrayVector(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9)
	bidOrderQty = fixedLengthArrayVector(bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9)
	return moving(withdrawsVolume, [bidPrice[0:levels], bidOrderQty[0:levels]], 2)
}           

Real-time streaming computation:

// Define the input and output table schemas
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
inputTable = table(1:0, colName, colType)
resultTable = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create a reactive state engine with createReactiveStateEngine
try{ dropStreamEngine("reactiveDemo")} catch(ex){ print(ex) }
metrics = <[dateTime, bidWithdrawsVolume(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9,bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, levels=3)]>
rse = createReactiveStateEngine(name="reactiveDemo", metrics =metrics, dummyTable=inputTable, outputTable=resultTable, keyColumn="securityID")

// Generate test data
setRandomSeed(9)
n = 5
securityID = take(`000001, n)
dateTime = 2023.01.01T09:30:00.000 + 1..n*3*1000
bidPrice0 = rand(10, n) \ 100 + 19.5
bidPrice1, bidPrice2 = bidPrice0+0.01, bidPrice0+0.02
bidOrderQty0, bidOrderQty1, bidOrderQty2 = rand(200, n), rand(200, n), rand(200, n)
offerPrice0 = rand(10, n) \ 100 + 19.5
offerPrice1, offerPrice2 = offerPrice0+0.01, offerPrice0+0.02
offerOrderQty0, offerOrderQty1, offerOrderQty2 = rand(200, n), rand(200, n), rand(200, n)
testdata = table(securityID, dateTime, bidPrice0, bidPrice1, bidPrice2, bidOrderQty0, bidOrderQty1, bidOrderQty2, offerPrice0, offerPrice1, offerPrice2, offerOrderQty0, offerOrderQty1, offerOrderQty2)
// Insert data
tableInsert(rse, testdata.flip())

// View the results
select * from resultTable
/*
securityID dateTime                factor
---------- ----------------------- ------
000001     2023.01.01T09:30:03.000       
000001     2023.01.01T09:30:06.000       
000001     2023.01.01T09:30:09.000 0     
000001     2023.01.01T09:30:12.000 36    
000001     2023.01.01T09:30:15.000 26    
*/           

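4. Advanced: optimizing streaming implementations of high-frequency factors

4.1 Array vector

In DolphinDB, an array vector is a special kind of vector for storing variable-length two-dimensional arrays. This storage layout can greatly simplify some common queries and computations. When different columns contain a lot of repeated data, storing them as an array vector can also improve the compression ratio and speed up queries. Array vectors support binary operations with scalars, vectors, or other array vectors, which makes it convenient to vectorize factor logic.

Level 2 high-frequency factors often operate repeatedly on the ten levels of price and quantity data. As the examples in Chapter 3 show, vectorizing the ten levels usually requires first combining them with fixedLengthArrayVector. You can instead store the raw Level 2 snapshot data directly as array vectors. This removes the step of assembling the ten levels inside the function and lowers streaming latency.

For example, the complex factor from section "3.2.3.1 Moving average buy/sell pressure" can be implemented with array vectors as follows:

  • DDB implementation: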
def pressArrayVector(bidPrice, bidOrderQty, offerPrice, offerOrderQty){
	wap = (bidPrice[0]*offerOrderQty[0] + offerPrice[0]*bidOrderQty[0]) \ (offerOrderQty[0]+bidOrderQty[0])
	bidPress = rowWavg(bidOrderQty, wap \ (bidPrice - wap))
	askPress = rowWavg(offerOrderQty, wap \ (offerPrice - wap))
	press = log(bidPress \ askPress)
	return press
}

@state
def averagePress3(bidPrice, bidOrderQty, offerPrice, offerOrderQty, lag){
	press = pressArrayVector(bidPrice, bidOrderQty, offerPrice, offerOrderQty)
	return mavg(press, lag, 1)
}           
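The row-wise semantics of pressArrayVector can be sketched in Python by treating each snapshot's levels as one row of variable length. row_wavg is a hypothetical stand-in for rowWavg(x, w), assumed here to compute sum(x*w)/sum(w) within a row. mavg mirrors mavg(x, lag, 1), which requires at least one observation per window. Bid levels sit below wap and offer levels above it, so the bid weights are negative and the offer weights positive. Within each side the weights share a sign, so each result is still a weighted mean of that side's quantities.

```python
import math


def row_wavg(values, weights):
    # assumed rowWavg semantics for one row: sum(values * weights) / sum(weights)
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


def press_row(bid_price, bid_qty, offer_price, offer_qty):
    # one snapshot; each argument is one row of an array vector
    wap = (bid_price[0] * offer_qty[0] + offer_price[0] * bid_qty[0]) / (offer_qty[0] + bid_qty[0])
    bid_press = row_wavg(bid_qty, [wap / (p - wap) for p in bid_price])
    ask_press = row_wavg(offer_qty, [wap / (p - wap) for p in offer_price])
    return math.log(bid_press / ask_press)


def mavg(x, lag):
    # mavg(x, lag, 1): mean of the last min(i + 1, lag) values
    out = []
    for i in range(len(x)):
        window = x[max(0, i - lag + 1): i + 1]
        out.append(sum(window) / len(window))
    return out


# Three snapshots; rows may have different numbers of levels
snapshots = [
    ([9.99, 9.98], [100, 100], [10.01, 10.02], [100, 100]),
    ([9.99, 9.98, 9.97], [200, 200, 200], [10.01, 10.02], [100, 100]),
    ([9.99], [150], [10.01, 10.02, 10.03], [150, 150, 150]),
]
press = [press_row(*s) for s in snapshots]
print(press)
print(mavg(press, 2))
```

When every bid level has the same quantity q_b and every offer level has q_a, press reduces to log(q_b / q_a). That is why the three snapshots give roughly 0, log 2 and 0.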
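  • Notes:

① Operations on array vector data usually require the row-family functions or the byRow higher-order function. For example, the sum of the ten bid prices is rowSum(bidPrice).

② Array vector columns require strictly consistent data types on insertion. For example, if the engine's dummyTable defines bidOrderQty as INT[], the corresponding column of the upstream input table must also be INT[].

③ Slicing and indexing an array vector also has a cost, so converting a factor to array vector form does not always improve performance. If the factor performs many complex operations on the ten levels, array vector input gives a clear performance gain. If the factor only computes on a single level (for example, only level 1), the multi-level multi-column layout is a better fit.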

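4.2 Just-in-time compilation (JIT)

DolphinDB is implemented in C++, and a single function call in a script is translated into multiple virtual function calls in C++. When the computation cannot be vectorized, this interpretation overhead is relatively high.

DolphinDB's JIT compilation translates code into machine code at run time. This can significantly speed up for loops, while loops, if-else statements, and similar constructs. It is especially suited to scenarios that cannot be vectorized yet demand very high speed, such as high-frequency factor computation and real-time stream processing.

For details on using JIT, see the DolphinDB JIT tutorial.

This section covers what to watch for when rewriting reactive state engine factors as JIT versions.

A reactive state engine factor combines stateful and stateless functions. The JIT and non-JIT versions differ mainly in the stateless functions; stateful functions are written the same way.

  • Difference 1: a JIT function must be marked with @jit.
  • Difference 2: ordinary stateless functions may call any function, whereas the JIT version currently supports only some functions.

Functions that JIT does not support have to be implemented by hand, for example by expanding the formula or by using for/while loops and if-else statements.

① The following example computes the sum of price × quantity over the ten bid levels, showing how to implement sum by expanding the formula.

Factor logic: bidPrice0*bidOrderQty0 + … + bidPrice9*bidOrderQty9

DDB implementation (formula expansion):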

@jit
def calAmount(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9){
	return bidPrice0*bidOrderQty0+bidPrice1*bidOrderQty1+bidPrice2*bidOrderQty2+bidPrice3*bidOrderQty3+bidPrice4*bidOrderQty4+bidPrice5*bidOrderQty5+bidPrice6*bidOrderQty6+bidPrice7*bidOrderQty7+bidPrice8*bidOrderQty8+bidPrice9*bidOrderQty9
}           

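② The following example computes the maximum of price × quantity over the ten bid levels, showing how to implement max with for/while loops and if-else statements.

Factor logic: max(bidPrice0*bidOrderQty0, …, bidPrice9*bidOrderQty9)

DDB implementation (for loop + if-else):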

def calAmountMax(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9){
	amount = [bidPrice0*bidOrderQty0, bidPrice1*bidOrderQty1, bidPrice2*bidOrderQty2, bidPrice3*bidOrderQty3, bidPrice4*bidOrderQty4, bidPrice5*bidOrderQty5, bidPrice6*bidOrderQty6, bidPrice7*bidOrderQty7, bidPrice8*bidOrderQty8, bidPrice9*bidOrderQty9]
	maxRes = -1.0
	for(i in 0:10){
		if(amount[i] > maxRes) maxRes = amount[i]
	}
	return maxRes
}           

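Note: when assigning an initial or default value to a variable, keep its data type consistent. In this example maxRes is a DOUBLE, so its initial value must be written as maxRes=-1.0, not maxRes=-1.

  • Difference 3: with multiple securities, an ordinary stateless function receives vectors as arguments (see the notes in section "3.2.1 Stateless functions"). In the JIT version the arguments are scalars, so no extra each/loop wrapper is needed.

Take the example from the notes in section "3.2.1 Stateless functions". You can add the @jit annotation directly to weightedAveragedPrice, without the extra each wrapper in factorWeightedAveragedPrice.

// Factor implementation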
@jit
def weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, default){
    if(bidPrice0 > 0){  return (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)}
    return default
}

metrics = <[dateTime, weightedAveragedPrice(bidPrice0, bidOrderQty0, offerPrice0, offerOrderQty0, 0.0)]>           
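  • Difference 4: in the JIT version, the element at a given position of a vector can be read with vector[index].

However, index can only be a scalar (index = 0) or a vector (index = 0..5), not a pair (index = 0:5).

(Here index=0..5 includes both ends, while index=0:5 includes 0 but excludes 5.)

  • Difference 5: in the JIT version, function definitions cannot declare default parameter values.

For example, def foo(x, y){} is allowed, but def foo(x, y=1){} is not.

4.3 Performance test

Server version: 2.00.9.2 2023.03.10 JIT

Test data: one day of Level 2 snapshot data for 100 stocks on the Shanghai Stock Exchange (372,208 records)

Method: the timer function measures the total time from when data starts being fed into the engine until all metrics have been computed.

Results:

Implementation                      Time (ms)
Multi-level, multi-column           6368.81
Multi-level array vector            3727.03
Multi-level, multi-column + JIT      771.13
Multi-level array vector + JIT       458.56

Test factor: the moving-average buy/sell pressure (see "3.2.3.1 Moving average buy/sell pressure" for the formula), computed after removing empty levels from the ten-level price and quantity data.

Multi-level, multi-column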

def calPress(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9){
	bidPrice = [bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9]
	bidOrderQty = [bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9]
	offerPrice = [offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9]
	offerOrderQty = [offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9]
	// Remove empty levels
	bidPrice, bidOrderQty = bidPrice[bidPrice > 0], bidOrderQty[bidPrice > 0]
	offerPrice, offerOrderQty = offerPrice[offerPrice > 0], offerOrderQty[offerPrice > 0]
	// Compute the buy/sell pressure indicator
	wap = (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
	bidPress = wavg(bidOrderQty, wap \ (bidPrice - wap))
	askPress = wavg(offerOrderQty, wap \ (offerPrice - wap))
	press = log(bidPress \ askPress)
	return press.nullFill(0.0)
}

@state
def averagePress(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, lag){
	press = each(calPress, bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9)
	return mavg(press, lag, 1)
}           

Multi-level array vector

def calPressArray(bidPrices, bidOrderQtys, offerPrices, offerOrderQtys){
	// Remove empty levels
	bidPrice, bidOrderQty = bidPrices[bidPrices > 0], bidOrderQtys[bidPrices > 0]
	offerPrice, offerOrderQty = offerPrices[offerPrices > 0], offerOrderQtys[offerPrices > 0]
	// Compute the buy/sell pressure indicator
	wap = (bidPrice[0]*offerOrderQty[0] + offerPrice[0]*bidOrderQty[0]) \ (offerOrderQty[0]+bidOrderQty[0])
	bidPress = wavg(bidOrderQty, wap \ (bidPrice - wap))
	askPress = wavg(offerOrderQty, wap \ (offerPrice - wap))
	press = log(bidPress \ askPress)
	return press.nullFill(0.0)
}

@state
def averagePressArray(bidPrice, bidOrderQty, offerPrice, offerOrderQty, lag){
	press = each(calPressArray, bidPrice, bidOrderQty, offerPrice, offerOrderQty)
	return mavg(press, lag, 1)
}           

Multi-level, multi-column + JIT

@jit
def calPressJIT(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9){
	bidPrice = [bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9]
	bidOrderQty = [bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9]
	offerPrice = [offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9]
	offerOrderQty = [offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9]

	wap = (bidPrice0*offerOrderQty0 + offerPrice0*bidOrderQty0) \ (offerOrderQty0+bidOrderQty0)
	bidPress = 0.0
	bidWeightSum = 0.0
	askPress = 0.0
	askWeightSum = 0.0
	for(i in 0:10){
		if(bidPrice[i] > 0){
			weight = wap \ (bidPrice[i] - wap)
			bidWeightSum += weight
			bidPress += bidOrderQty[i] * weight
		}
		if(offerPrice[i] > 0){
			weight = wap \ (offerPrice[i] - wap)
			askWeightSum += weight
			askPress += offerOrderQty[i] * weight
		}
	}
	bidPress = bidPress \ bidWeightSum
	askPress = askPress \ askWeightSum
	press = log(bidPress \ askPress)
	return press
}

@state
def averagePressJIT(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, lag){
	press = calPressJIT(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9)
	return mavg(press.nullFill(0.0), lag, 1)
}           

Multi-level array vector + JIT

@jit
def calPressArrayJIT(bidPrice, bidOrderQty, offerPrice, offerOrderQty){
	// Compute the buy/sell pressure indicator
	wap = (bidPrice[0]*offerOrderQty[0] + offerPrice[0]*bidOrderQty[0]) \ (offerOrderQty[0]+bidOrderQty[0])
	bidPress = 0.0
	bidWeightSum = 0.0
	askPress = 0.0
	askWeightSum = 0.0
	for(i in 0:10){
		if(bidPrice[i] > 0){
			weight = wap \ (bidPrice[i] - wap)
			bidWeightSum += weight
			bidPress += bidOrderQty[i] * weight
		}
		if(offerPrice[i] > 0){
			weight = wap \ (offerPrice[i] - wap)
			askWeightSum += weight
			askPress += offerOrderQty[i] * weight
		}
	}
	bidPress = bidPress \ bidWeightSum
	askPress = askPress \ askWeightSum
	press = log(bidPress \ askPress)
	return press
}

@state
def averagePressArrayJIT(bidPrice, bidOrderQty, offerPrice, offerOrderQty, lag){
	press = calPressArrayJIT(bidPrice, bidOrderQty, offerPrice, offerOrderQty)
	return mavg(press.nullFill(0.0), lag, 1)
}           
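The four variants in this test are meant to compute the same factor. As a sanity check of the JIT rewrite, the Python sketch below compares two forms. The first is the loop-style accumulation used in calPressJIT / calPressArrayJIT. The second is the filter-then-weighted-average form used in calPress / calPressArray. Empty levels are modelled as price 0, nullFill(0.0) is not modelled, and press_loop / press_filtered / level1_wap are illustrative names only.

```python
import math


def level1_wap(bid_price, bid_qty, offer_price, offer_qty):
    # wap from level 1, as in all four variants
    return (bid_price[0] * offer_qty[0] + offer_price[0] * bid_qty[0]) / (offer_qty[0] + bid_qty[0])


def press_filtered(bid_price, bid_qty, offer_price, offer_qty):
    # calPress / calPressArray style: drop empty levels, then weighted averages
    wap = level1_wap(bid_price, bid_qty, offer_price, offer_qty)
    bids = [(p, q) for p, q in zip(bid_price, bid_qty) if p > 0]
    offers = [(p, q) for p, q in zip(offer_price, offer_qty) if p > 0]
    bid_w = [wap / (p - wap) for p, _ in bids]
    ask_w = [wap / (p - wap) for p, _ in offers]
    bid_press = sum(q * w for (_, q), w in zip(bids, bid_w)) / sum(bid_w)
    ask_press = sum(q * w for (_, q), w in zip(offers, ask_w)) / sum(ask_w)
    return math.log(bid_press / ask_press)


def press_loop(bid_price, bid_qty, offer_price, offer_qty):
    # calPressJIT / calPressArrayJIT style: one pass, skipping empty levels
    wap = level1_wap(bid_price, bid_qty, offer_price, offer_qty)
    bid_press = bid_weight_sum = ask_press = ask_weight_sum = 0.0
    for i in range(len(bid_price)):  # assumes bid and offer lists have equal length
        if bid_price[i] > 0:
            weight = wap / (bid_price[i] - wap)
            bid_weight_sum += weight
            bid_press += bid_qty[i] * weight
        if offer_price[i] > 0:
            weight = wap / (offer_price[i] - wap)
            ask_weight_sum += weight
            ask_press += offer_qty[i] * weight
    return math.log((bid_press / bid_weight_sum) / (ask_press / ask_weight_sum))


# A five-level book with empty (price 0) levels at the tail
bid_price = [9.99, 9.98, 9.97, 0.0, 0.0]
bid_qty = [100, 300, 200, 0, 0]
offer_price = [10.01, 10.03, 0.0, 0.0, 0.0]
offer_qty = [150, 50, 0, 0, 0]
print(press_filtered(bid_price, bid_qty, offer_price, offer_qty))
print(press_loop(bid_price, bid_qty, offer_price, offer_qty))
```

Skipping a level inside the loop has the same effect as filtering it out beforehand. The JIT versions can therefore avoid building filtered vectors without changing the result.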

Test script:

// Load the test data
csvPath = "/hdd/hdd0/jit200ssd/server/testdata/"
// Snapshot data, multi-level multi-column
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus"] <- flatten(eachLeft(+, ["bidPrice","bidOrderQty","bidNumOrders"], string(0..9))) <- ("bidOrders"+string(0..49)) <- flatten(eachLeft(+, ["offerPrice","offerOrderQty","offerNumOrders"], string(0..9))) <- ("offerOrders"+string(0..49)) <- ["numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL"] <- take("DOUBLE", 10) <- take("INT", 70)<- take("DOUBLE", 10) <- take("INT", 70) <- ["INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
data = select * from loadText(csvPath + "snapshot_100stocks_multi.csv", schema=table(colName, colType)) order by dateTime, securityID
// Snapshot with multi-level quotes stored as array vectors (42 columns)
colName = ["securityID","dateTime","preClosePx","openPx","highPx","lowPx","lastPx","totalVolumeTrade","totalValueTrade","instrumentStatus","bidPrice","bidOrderQty","bidNumOrders","bidOrders","offerPrice","offerOrderQty","offerNumOrders","offerOrders","numTrades","iopv","totalBidQty","totalOfferQty","weightedAvgBidPx","weightedAvgOfferPx","totalBidNumber","totalOfferNumber","bidTradeMaxDuration","offerTradeMaxDuration","numBidOrders","numOfferOrders","withdrawBuyNumber","withdrawBuyAmount","withdrawBuyMoney","withdrawSellNumber","withdrawSellAmount","withdrawSellMoney","etfBuyNumber","etfBuyAmount","etfBuyMoney","etfSellNumber","etfSellAmount","etfSellMoney"]
colType = ["SYMBOL","TIMESTAMP","DOUBLE","DOUBLE","DOUBLE","DOUBLE","DOUBLE","INT","DOUBLE","SYMBOL","DOUBLE[]","INT[]","INT[]","INT[]","DOUBLE[]","INT[]","INT[]","INT[]","INT","INT","INT","INT","DOUBLE","DOUBLE","INT","INT","INT","INT","INT","INT","INT","INT","DOUBLE","INT","INT","DOUBLE","INT","INT","INT","INT","INT","INT"]
dataArrayVector = select * from loadText(csvPath + "snapshot_100stocks_arrayvector.csv", schema=table(colName, colType)) order by dateTime, securityID
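The two loadText calls read the same snapshots in two layouts. The following hypothetical Python sketch works outside DolphinDB, and its helper names are not part of the tutorial. It rebuilds the 194 multi-column names in the order the eachLeft/flatten expression produces them. It also folds one multi-column row into the 42-column array-vector layout by grouping each field's levels into a single list.

```python
# Column names copied from the test script; helper names are illustrative only.
BASE = ["securityID", "dateTime", "preClosePx", "openPx", "highPx", "lowPx", "lastPx",
        "totalVolumeTrade", "totalValueTrade", "instrumentStatus"]
TAIL = ["numTrades", "iopv", "totalBidQty", "totalOfferQty", "weightedAvgBidPx",
        "weightedAvgOfferPx", "totalBidNumber", "totalOfferNumber", "bidTradeMaxDuration",
        "offerTradeMaxDuration", "numBidOrders", "numOfferOrders", "withdrawBuyNumber",
        "withdrawBuyAmount", "withdrawBuyMoney", "withdrawSellNumber", "withdrawSellAmount",
        "withdrawSellMoney", "etfBuyNumber", "etfBuyAmount", "etfBuyMoney", "etfSellNumber",
        "etfSellAmount", "etfSellMoney"]
# Levels per grouped field: ten price levels, fifty queued orders at the best level
LEVELS = {"Price": 10, "OrderQty": 10, "NumOrders": 10, "Orders": 50}
# bidPrice, bidOrderQty, bidNumOrders, bidOrders, then the same four for the offer side
GROUPS = [(side + field, n) for side in ("bid", "offer") for field, n in LEVELS.items()]

def multi_column_names():
    """Multi-column layout: bidPrice0..9, bidOrderQty0..9, ..., offerOrders0..49, then TAIL."""
    names = list(BASE)
    for group, n in GROUPS:
        names += [f"{group}{i}" for i in range(n)]
    return names + TAIL

def to_array_vector(row):
    """Fold one multi-column row (a dict) into the array-vector layout."""
    out = {k: row[k] for k in BASE}
    for group, n in GROUPS:
        out[group] = [row[f"{group}{i}"] for i in range(n)]
    out.update({k: row[k] for k in TAIL})
    return out
```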


// Define the input and output table schemas (each output row: securityID, dateTime, factor)
inputTable = table(1:0, data.schema().colDefs.name, data.schema().colDefs.typeString)
inputTableArrayVector = table(1:0, dataArrayVector.schema().colDefs.name, dataArrayVector.schema().colDefs.typeString)
resultTable1 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])
resultTable2 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])
resultTable3 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])
resultTable4 = table(10000:0, ["securityID", "dateTime", "factor"], [SYMBOL, TIMESTAMP, DOUBLE])

// Create one reactive state engine per implementation with createReactiveStateEngine
// Multi-level quotes in separate columns
try{ dropStreamEngine("reactiveDemo1")} catch(ex){ print(ex) }
metrics1 = <[dateTime, averagePress(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, 60)]>
rse1 = createReactiveStateEngine(name="reactiveDemo1", metrics=metrics1, dummyTable=inputTable, outputTable=resultTable1, keyColumn="securityID", keepOrder=true)
// Multi-level quotes as array vectors
try{ dropStreamEngine("reactiveDemo2")} catch(ex){ print(ex) }
metrics2 = <[dateTime, averagePressArray(bidPrice, bidOrderQty, offerPrice, offerOrderQty, 60)]>
rse2 = createReactiveStateEngine(name="reactiveDemo2", metrics=metrics2, dummyTable=inputTableArrayVector, outputTable=resultTable2, keyColumn="securityID", keepOrder=true)
// Separate columns + JIT
try{ dropStreamEngine("reactiveDemo3")} catch(ex){ print(ex) }
metrics3 = <[dateTime, averagePressJIT(bidPrice0, bidPrice1, bidPrice2, bidPrice3, bidPrice4, bidPrice5, bidPrice6, bidPrice7, bidPrice8, bidPrice9, bidOrderQty0, bidOrderQty1, bidOrderQty2, bidOrderQty3, bidOrderQty4, bidOrderQty5, bidOrderQty6, bidOrderQty7, bidOrderQty8, bidOrderQty9, offerPrice0, offerPrice1, offerPrice2, offerPrice3, offerPrice4, offerPrice5, offerPrice6, offerPrice7, offerPrice8, offerPrice9, offerOrderQty0, offerOrderQty1, offerOrderQty2, offerOrderQty3, offerOrderQty4, offerOrderQty5, offerOrderQty6, offerOrderQty7, offerOrderQty8, offerOrderQty9, 60)]>
rse3 = createReactiveStateEngine(name="reactiveDemo3", metrics=metrics3, dummyTable=inputTable, outputTable=resultTable3, keyColumn="securityID", keepOrder=true)
// Array vectors + JIT
try{ dropStreamEngine("reactiveDemo4")} catch(ex){ print(ex) }
metrics4 = <[dateTime, averagePressArrayJIT(bidPrice, bidOrderQty, offerPrice, offerOrderQty, 60)]>
rse4 = createReactiveStateEngine(name="reactiveDemo4", metrics=metrics4, dummyTable=inputTableArrayVector, outputTable=resultTable4, keyColumn="securityID", keepOrder=true)
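The four engines differ only in their metrics and input layout, and each is keyed on securityID. The sketch below is offered only as a rough mental model. It is a hypothetical Python toy, not how DolphinDB implements createReactiveStateEngine. It keeps a separate rolling buffer per key and updates only the buffer belonging to the arriving row's key. It appends one result row per input row in arrival order, which is the ordering keepOrder=true is meant to preserve in the output table.

```python
from collections import defaultdict, deque

class ToyReactiveEngine:
    """Hypothetical, much-simplified model of a keyed reactive state engine.

    It evaluates one stateful metric (a trailing mean over `window` values per key,
    using fewer values while the window fills) and emits one output row per input row.
    """

    def __init__(self, window):
        # Each key gets its own bounded buffer, so securities never share state
        self.state = defaultdict(lambda: deque(maxlen=window))
        self.output = []  # rows of (key, dateTime, factor), in arrival order

    def append(self, rows):
        for key, ts, x in rows:
            buf = self.state[key]
            buf.append(0.0 if x is None else x)  # missing input counted as 0.0
            self.output.append((key, ts, sum(buf) / len(buf)))
```

In this model, splitting the input across several append calls gives the same output as a single call, because all state lives in the per-key buffers.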

// Feed the data into each engine; timer reports the elapsed time of each append
timer rse1.append!(data)
timer rse2.append!(dataArrayVector)
timer rse3.append!(data)
timer rse4.append!(dataArrayVector)

// Verify that all four engines produce identical results
assert each(eqObj, resultTable1.factor, resultTable2.factor).all()
assert each(eqObj, resultTable1.factor, resultTable3.factor).all()
assert each(eqObj, resultTable1.factor, resultTable4.factor).all()
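The assert lines require the four factor columns to agree element by element. Suppose the result tables are exported for an independent check, for example into Python. Exact equality can then be brittle for doubles computed along different code paths, such as the explicit loops in the JIT versions. The hypothetical Python helper below is an assumption, not something provided by DolphinDB. It compares two columns with a small relative tolerance and treats NULLs (None or NaN) as equal only to each other.

```python
import math

def same_factor(a, b, rel_tol=1e-9, abs_tol=1e-12):
    """Compare two factor columns element-wise; None/NaN (NULL) only matches None/NaN."""
    if len(a) != len(b):
        return False
    for x, y in zip(a, b):
        # Normalize NULLs exported as None to NaN
        x = float("nan") if x is None else x
        y = float("nan") if y is None else y
        if math.isnan(x) or math.isnan(y):
            if not (math.isnan(x) and math.isnan(y)):
                return False
        elif not math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol):
            return False
    return True
```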

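Attachments

The snapshot data used above is available in the following attachments, in both the multi-level multi-column layout and the multi-level array vector layout: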

snapshot_100stocks_arrayvector.zip

snapshot_100stocks_multi.zip
