天天看點

如何實時計算中證1000指數的主買/主賣交易量

主買是指以賣方的報價成交,主賣是指以買方的報價成交。 一般來說,主動買入就是資金流入,主動賣出就是資金流出,是以實時統計主買/主賣交易量能夠實時監控資金的流入流出情況。本文基于中證 1000 指數,介紹如何利用 DolphinDB 流資料處理架構,實時高效計算中證1000指數的主買/主賣交易量。

本文包含場景概述、實作思路、實時計算主買/主賣交易量、結果展示等部分。

1. 場景概述

本文介紹如何使用 DolphinDB 計算中證1000成分股分鐘級主買/主賣交易量。計算公式如下:

如何實時計算中證1000指數的主買/主賣交易量

2. 實作思路

在實際生産環境中,資料往往是以“流”的形式實時注入到資料表中,如何根據資料流實時計算并響應結果是業務面臨的重要問題。針對此類問題,DolphinDB 開發了一套完善的流資料的訂閱釋出機制和多種流計算引擎,為多樣化的實時場景提供了靈活的解決方案。

本文通過行情回放(函數 replay)模拟實時資料流,并通過流資料引擎的級聯建構計算模型,以實作每分鐘輸出中證1000實時主買/主賣的交易量的需求。

整體計算流程如下圖所示:

如何實時計算中證1000指數的主買/主賣交易量

本文涉及到的流資料引擎有:​​時間序列引擎​​,​​響應式狀态引擎​​,​​橫截面引擎​​。

流程說明:

  • 使用行情回放功能模拟注入資料到流表;
  • 時間序列引擎訂閱流表資料,并計算每隻股票每分鐘的主買/主賣成交金額(交易價格 * 交易量);
  • 用響應式狀态引擎進一步計算每隻股票的累計主買/主賣成交金額;
  • 用橫截面引擎計算 1000 隻股票的累計主買/主賣成交金額的權重和,并輸出結果。
各引擎通過級聯進行連接配接,無需通過中間表。詳情可參考:​​使用者手冊流資料引擎主題頁​​。

3. 資料說明

3.1 資料源

深交所和上交所 ​​Level2 逐筆成交資料​​表結構如下:

列名 類型 說明
SecurityID SYMBOL 股票代碼
TradeTime TIMESTAMP 交易時間
TradePrice DOUBLE 成交價格
TradeQty INT 交易量
TradeBSFlag SYMBOL 成交方向,0 表示主買,1 表示主賣
注意:實際的逐筆成交資料包含的其他字段與本文計算無關,故忽略不列出。

本文采用的資料根據中證1000成分股清單模拟生成,僅從資料源中選取 2022.09.01 一天的中證1000成分股的逐筆成交記錄,且僅選取本文計算涉及的字段。其中字段TradeBSFlag表示該筆交易為主買還是主賣。資料量約為 2000 萬條。

通過下述腳本将附件資料導入記憶體。有關資料導入的相關詳細說明請參見​​資料導入教程​​。

dataFilePath = "/your/path/to/data.csv"
schemaTb = extractTextSchema(dataFilePath)
update schemaTb set type=`SYMBOL where name=`TradeBSFlag
data = loadText(dataFilePath,, schemaTb)      
如何實時計算中證1000指數的主買/主賣交易量

3.2 成分股權重因子的建立

實際生産中,成分股的權重每天可能會變化。本教程為簡便起見,将中證1000中成分股的權重定義為1/1000,這個假設會影響計算結果,但不影響教程中給出的方法和代碼。用自定義的 ​

​createWeightDict()​

​ 函數建立一個字典儲存各股票的權重因子。

def createWeightDict(constituentCsvPath){
  return dict(loadText(constituentCsvPath).SecurityID, take(0.001, 1000))
}      
如何實時計算中證1000指數的主買/主賣交易量

4. 實時計算主買/主賣交易量

本章将按第 2 章節的實作思路,分步介紹如何實時計算主買/主賣交易量。

4.1 曆史行情回放

首先通過回放曆史資料來模拟實時行情資料的注入。在 DolphinDB 内,隻需要調用内置的 ​​replay​​

  • 建立一張共享流資料表 trade,用于流資料的接收和釋出
tradeTemp = streamTable(2000:0, `TradeTime`SecurityID`TradePrice`TradeQty`TradeBSFlag, [TIMESTAMP, SYMBOL,DOUBLE,DOUBLE,SYMBOL])
enableTableShareAndPersistence(table=tradeTemp, tableName="trade", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000)       
如何實時計算中證1000指數的主買/主賣交易量
  • 回放逐筆成交資料。這裡送出了一個作業用于異步進行回放任務,回放模式為極速回放
filteredData = select TradeTime, SecurityID, TradePrice, TradeQty, TradeBSFlag from data where SecurityID in weightDict order by TradeTime
submitJob("replayJob", "replay at the maximum speed", replay{filteredData, objByName("trade")})      
如何實時計算中證1000指數的主買/主賣交易量

上述代碼将中證1000成分股的逐筆成交資料回放到流資料表 trade 中,可通過下述語句檢視前 10 條資料:

select top 10 * from trade      
如何實時計算中證1000指數的主買/主賣交易量

trade 表的資料将作為資料源注入 4.2 節建立的時間序列引擎 ​

​tsEngine​

​ 中。

4.2 計算每個股票每分鐘的成交金額

計算的第一步是使用時間序列引擎算每個股票每個鐘累計的交易量。步驟如下:

  • 建立​​時間序列引擎​​
tsEngine = createTimeSeriesEngine(name="tsEngine", windowSize=60000, step=60000, metrics=<[sum(iif(TradeBSFlag=="0", 1, 0)*TradeQty*TradePrice),  sum(iif(TradeBSFlag=="1", 1, 0)*TradeQty*TradePrice)]>, dummyTable=objByName("trade"), outputTable=rsEngine, timeColumn=`TradeTime, keyColumn=`SecurityID, useWindowStartTime=true,  fill=[0, 0], forceTriggerTime=100)      
如何實時計算中證1000指數的主買/主賣交易量

metrics 參數中計算相關的元代碼使用 iif 函數來計算主買或主買的交易量。fill 設為 [0, 0] 表示若個别股票 1 分鐘之内沒有任何資料,則填充每分鐘主買/主買的交易量均為 0 的記錄,用于觸發 4.4 小節建立的橫截面引擎計算。forceTriggerTime 設為 100,表示新的視窗開始之後的100 ms以内,如果某隻股票資料到達,則僅該股票觸發計算;新的視窗開始後的100 ms以後,任意一隻股票資料到達,之前所有未觸發計算的股票,全部強制觸發計算。這樣的設定的目的是防止某些股票的資料較為稀疏導緻其視窗關閉過慢,進而影響後續計算延遲過高。在實盤中,也有不少機構設定 useSystemTime = true 用系統時間來觸發,即按系統時間每 1 分鐘觸發計算 1 次。outputTable 參數指定為 4.3 小節建立的 響應式狀态引擎 ​

​rsEngine​

​ 中以實作引擎級聯。具體參數的設定請根據實際情況參考使用者手冊設定。

  • 建立​​流資料訂閱​​,将資料灌入時間序列引擎中
subscribeTable(tableName="trade", actionName="act_tsEngine", offset=0, handler=append!{tsEngine}, msgAsTable=true, batchSize=10000, throttle=0.001)      
如何實時計算中證1000指數的主買/主賣交易量

訂閱共享流資料表 trade,并指定 handler 參數為時間序列引擎 ​

​tsEngine​

​,以将訂閱資料注入引擎。

4.3 累加每隻股票每分鐘的累加成交金額

計算的第二步是使用響應式狀态引擎計算每個股票累計的交易量。步驟如下:

  • 建立一張記憶體表,為響應式狀态引擎提供輸入的表結構
tsEngineDummy = table(2000:0, `TradeTime`SecurityID`SellTradeAmount`BuyTradeAmount, [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE])      
如何實時計算中證1000指數的主買/主賣交易量
  • 建立​​響應式狀态引擎​​
rsEngine = createReactiveStateEngine(name="rsEngine", metrics=<[cummax(TradeTime), cumsum(SellTradeAmount), cumsum(BuyTradeAmount)]>, dummyTable=tsEngineDummy, outputTable=csEngine, keyColumn=`SecurityID)      
如何實時計算中證1000指數的主買/主賣交易量

metrics 中使用函數 cumsum 來計算每隻股票主買/主賣按分鐘的累加成交金額。cumsum 采用了增量計算方法,性能優于全量計算的方式。outputTable 參數指定為 4.4 小節建立的橫截面引擎 ​

​csEngine​

​ 中以實作引擎級聯。

4.4 計算1000隻股票的累加成交金額

計算的第二步使用橫截面引擎計算權重平均的累計交易量。步驟如下:

本例中橫截面引擎的觸發機制為每插入一條觸發一次計算( triggeringPattern="perRow”),是以同一時間戳下,每隻股票的輸入都會産生一條 wsum 的計算結果。是以這裡使用鍵值記憶體表儲存同一時間戳下,1000隻股票的最終計算結果。

  • 建立一張記憶體表為橫截面引擎提供輸入的表結構
rsEngineDummy = table(1:0, `SecurityID`TradeTime`SellTradeAmount`BuyTradeAmount, [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE])      
如何實時計算中證1000指數的主買/主賣交易量
  • 建立一張鍵值記憶體表 tradeAmountIndex,用于儲存1000個股票的成交金額權重求和的結果
share(keyedTable(`TradeTime, 2000:0, `TradeTime`SellTradeAmount`BuyTradeAmount, [TIMESTAMP, DOUBLE, DOUBLE]), "tradeAmountIndex")      
如何實時計算中證1000指數的主買/主賣交易量
  • 建立​​橫截面引擎​​
csEngine = createCrossSectionalEngine(name="csEngine", metrics=<[wsum(SellTradeAmount, weightDict[SecurityID]), wsum(BuyTradeAmount, weightDict[SecurityID]), now()]>, dummyTable=rsEngineOutput, outputTable=objByName("tradeAmountIndex"), keyColumn=`SecurityID, triggeringPattern="perRow", useSystemTime=false, timeColumn=`TradeTime, lastBatchOnly=false)      
如何實時計算中證1000指數的主買/主賣交易量

輸出結果為每分鐘一條記錄,每條記錄包含2個名額 BuyTradeAmount 和 SellTradeAmount ,即累計主買成交金額和累計主賣成交金額。計算結果存儲在以時間戳為鍵的鍵值記憶體表 tradeAmountIndex 中。

設定 triggeringPattern="perRow" 表示每插入一條逐筆成交記錄就實時更新結果,該操作在資料量較大時性能較差。如果對實時性的要求不高,可以設定為每分鐘批量計算更新一次結果以換取更好的性能,隻需要指定 ​​triggeringPattern​​​="keyCount", ​​triggeringInterval​​=1000 即可。該參數配置在本案例中的含義為,隻有目前分鐘橫截面引擎的記錄數達到 1000 條或者下一分鐘的記錄到來,才會觸發目前分鐘的中證1000所有股票的累加成交金額的計算。

5. 結果展示

5.1 節點内的計算結果表

計算的最終結果儲存在鍵值記憶體表 tradeAmountIndex 中,可通過 DolphinDB 所有 API 随時查詢,以通過 DolphinDB GUI 查詢最新10條的中證1000主買/主賣的交易量計算結果為例:

select top 10 Time, SellTradeAmount, BuyTradeAmount from tradeAmountIndex where UpdateTime >=  datetimeAdd(now(),-10s)      
如何實時計算中證1000指數的主買/主賣交易量
如何實時計算中證1000指數的主買/主賣交易量

5.2 Grafana 實時監控結果

Grafana 配置 DolphinDB 資料源及監控 DolphinDB 資料表中資料的教程:​​Grafana連接配接DolphinDB資料源​​

Grafana 中的 Query 代碼:

select Time, SellTradeAmount, BuyTradeAmount, UpdateTime from tradeAmountIndex      
如何實時計算中證1000指數的主買/主賣交易量
如何實時計算中證1000指數的主買/主賣交易量

6. 總結

7. 附件

  • ​​demo.dos​​
  • ​​data.csv​​
  • ​​constituent.csv​​

繼續閱讀