在股票交易市場,資金流是一個重要的量價名額。資金流名額按照是否對交易訂單号進行合并計算,可以分為逐筆資金流和逐單資金流;按照統計時間,可以分為分鐘資金流和日累計資金流。其中逐筆資金流的處理邏輯比較簡單,直接對每一筆成交記錄的成交股數或者成交金額進行大小單的判斷,然後進行相關名額的計算。而逐單資金流相對複雜一些,需要先根據買賣訂單号進行合并,然後進行大小單的判斷和相關名額的計算。
關于實時計算逐單分鐘資金流的解決方案,可以參考教程:DolphinDB流計算在金融行業的應用:實時計算分鐘資金流
本教程主要提供一種基于DolphinDB流資料處理架構,實時計算日累計逐單資金流的低延時解決方案。
注意:本教程後文提到的日累計資金流都是指日累計逐單資金流。
本教程包含日累計資金流場景描述、名額實作和實時計算結果展示等内容,可通過左側目錄快速檢視相應内容。
1. 日累計資金流場景描述
1.1 實時計算日累計資金流的難點
- 日累計逐單資金流計算中的大小單是一個動态的概念,一個小單在成交量增加後可能變成一個大單。
- 日累計逐單資金流的計算過程中涉及曆史狀态,如若不能實作增量計算,當計算下午的資料時,可能需要回溯有關這筆訂單上午的資料,效率會非常低下。
- 該場景需要對每一筆成交記錄做出一次響應,計算出每隻股票截止目前成交記錄的最新日累計資金流名額,實時計算壓力較大。
- 計算涉及至少兩個階段:在第一階段需要根據訂單分組,根據訂單的累計成交量判斷大小單;在第二階段要根據股票來分組,統計每個股票的大小單數量及成交額。
- 實時流計算場景中的低延時要求。
1.2 逐筆成交資料
本教程基于上交所2020年某日的逐筆成交資料進行代碼調試,在DolphinDB中存儲的表結構為:
name | typeString | comment |
SecurityID | SYMBOL | 股票代碼 |
Market | SYMBOL | 交易所 |
TradeTime | TIMESTAMP | 交易時間 |
TradePrice | DOUBLE | 交易價格 |
TradeQty | INT | 成交量 |
TradeAmount | DOUBLE | 成交額 |
BuyNum | INT | 買單訂單号 |
SellNum | INT | 賣單訂單号 |
1.3 日累計資金流名額
本教程示例代碼計算的日累計資金流名額為:
名額名稱 | 含義 |
TotalAmount | 從開盤到目前記錄,總成交額 |
SellSmallAmount | 從開盤到目前記錄,賣方向小單的總成交額,成交股數小于等于2萬股 |
SellMediumAmount | 從開盤到目前記錄,賣方向中單的總成交額,成交股數大于2萬股、小于等于20萬股 |
SellBigAmount | 從開盤到目前記錄,賣方向大單的總成交額,成交股數大于20萬股 |
SellSmallCount | 從開盤到目前記錄,賣方向小單的總訂單數,成交股數小于等于2萬股 |
SellMediumCount | 從開盤到目前記錄,賣方向中單的總訂單數,成交股數大于2萬股、小于等于20萬股 |
SellBigCount | 從開盤到目前記錄,賣方向大單的總訂單數,成交股數大于20萬股 |
BuySmallAmount | 從開盤到目前記錄,買方向小單的總成交額,成交股數小于等于2萬股 |
BuyMediumAmount | 從開盤到目前記錄,買方向中單的總成交額,成交股數大于2萬股、小于等于20萬股 |
BuyBigAmount | 從開盤到目前記錄,買方向大單的總成交額,成交股數大于20萬股 |
BuySmallCount | 從開盤到目前記錄,買方向小單的總訂單數,成交股數小于等于2萬股 |
BuyMediumCount | 從開盤到目前記錄,買方向中單的總訂單數,成交股數大于2萬股、小于等于20萬股 |
BuyBigCount | 從開盤到目前記錄,買方向大單的總訂單數,成交股數大于20萬股 |
關于資金流大小單的劃分規則,不同的開發者會有不同的定義方法。以常用的股票行情軟體為例:
(1)東方财富
- 超級大單:>50萬股或100萬元
- 大單:10-50萬股或20-100萬元
- 中單:2-10萬股或4-20萬元
- 小單:<2萬股或4萬元
(2)新浪财經
- 特大單:>100萬元
- 大單:20-100萬元
- 小單:5-20萬元
- 散單:<5萬元
包括大智慧、同花順等,不同軟體之間的大小單區分規則都會有差異,但是判斷條件都是基于成交股數或成交金額。
注意:本教程中,資金流大小單的判斷條件基于成交股數,劃分了大單、中單、小單三種,判斷的邊界值是随機定義的,開發者必須根據自己的實際場景進行調整。
1.4 日累計資金流增量計算方案
日累計逐單資金流的增量計算包括兩個步驟。首先是計算每個買單或賣單的累計成交量,據此判斷訂單是大單,中單或小單。這一步的增量計算實作比較簡單,隻要按訂單分組,并用cumsum計算累計的成交量。在此基礎上,進一步按股票統計大小單的數量和交易金額等名額。這一步如果沒有實作增量計算,那麼每次統計大中小單的數量的耗時會越來越長,因為訂單數量在不斷的增加。事實上,如果我們能夠獲得某一訂單目前時刻的狀态(大單、中單、小單等)以及前一個時刻的狀态,第二步的增量計算就非常簡單。

處理流程圖說明:
- tradeOriginalStream是DolphinDB中的流資料表,用于接收實時資料源的資料并釋出給流計算引擎進行實時計算。
- capitalFlowStream是DolphinDB中的流資料表,用于實時接收流計算引擎的計算結果,其資料可以被外部消費者訂閱消費。
-
參數是指流計算的并行度,本教程中把逐筆成交表parallel
中的資料對tradeOriginalStream
字段(股票代碼)按照雜湊演算法,相對均勻地釋出到SecurityID
個響應式狀态引擎1實作并行計算。因為逐筆成交表的資料流量較大,且日累計逐單資金流名額的計算相對複雜,是以需要使用并行流處理。parallel
- 響應式狀态引擎1結合内置的cumsum,prev函數,增量計算目前訂單根據股票代碼和買單訂單号分組後的累計成交金額,以及目前訂單合入前後的大小單标簽、累計成交量,更詳細的計算邏輯介在第2章的代碼開發部分說明。
- 響應式狀态引擎2結合内置的cumsum,prev函數,增量計算目前訂單根據股票代碼和賣單訂單号分組後的累計成交金額,以及目前訂單合入前後的大小單标簽、累計成交量,同時保留上一步買方向的中間計算結果,更詳細的計算邏輯會在第2章的代碼開發部分說明。
- 響應式狀态引擎3結合内置的cumsum,dynamicGroupCumsum,dynamicGroupCumcount函數實作根據股票代碼合并的資金流名額的增量計算,更詳細的計算邏輯會在第2章的代碼開發部分說明。
2. 日累計資金流名額實作
本教程代碼開發工具采用DolphinDB GUI,所有代碼均可在DolphinDB GUI用戶端開發工具執行。
2.1 建立相關流資料表
def createStreamTableFunc(){
//create stream table: tradeOriginalStream
colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("tradeOriginalStreamTemp")
//create stream table: capitalFlow
colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
colType = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("capitalFlowStreamTemp")
//create stream table: capitalFlowStream60min
colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) }
catch(ex){ print(ex) }
undef("capitalFlowStreamTemp")
}
createStreamTableFunc()
go
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)
- go語句的作用是對代碼分段進行解析和執行。
- setStreamTableFilterColumn函數作用是指定流資料表的過濾列,與subscribeTable函數的
參數配合使用。本教程中的作用是把逐筆成交表中的資料對股票代碼按照雜湊演算法,相對均勻地釋出到不同的流處理線程消費,實作并行計算的目的。filter
2.2 定義資金流大小單判斷的函數
/*
* Label small, medium and large order
* small : 0
* medium : 1
* large : 2
*/
@state
def tagFunc(qty){
return iif(qty <= 20000, 0, iif(qty <= 200000 and qty > 20000, 1, 2))
}
- 成交股數小于等于2萬股的訂單标記為小單,标簽為0;成交股數大于2萬股、小于等于20萬股的訂單标記為中單,标簽為1;成交股數大于20萬股的訂單标記為大單,标簽為2。本教程中,資金流大小單的判斷條件基于成交股數,劃分了大單、中單、小單三種,判斷的邊界值是随機定義的,開發者必須根據自己的實際場景進行調整。
- 該函數将在響應式狀态引擎中使用,是以需要用 @state 表示函數是自定義的狀态函數。
2.3 根據股票和買單訂單号合并的增量計算
def processBuyOrderFunc(parallel){
metricsBuy = [
<TradeTime>,
<SellNum>,
<TradeAmount>,
<TradeQty>,
<cumsum(TradeAmount)>,
<tagFunc(cumsum(TradeQty))>,
<prev(cumsum(TradeAmount))>,
<prev(tagFunc(cumsum(TradeQty)))>]
for(i in 1..parallel){
createReactiveStateEngine(name="processBuyOrder"+string(i), metrics=metricsBuy, dummyTable=tradeOriginalStream, outputTable=getStreamEngine("processSellOrder"+string(i)), keyColumn=`SecurityID`BuyNum, keepOrder=true)
subscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder"+string(i), offset=-1, handler=getStreamEngine("processBuyOrder"+string(i)), msgAsTable=true, hash=i, filter=(parallel, i-1))
}
}
-
參數是指流計算的并行度,上述代碼中是把逐筆成交表parallel
中的資料對股票代碼按照雜湊演算法,相對均勻地釋出到tradeOriginalStream
個響應式狀态引擎1實作并行計算。這些響應式狀态引擎1的計算邏輯相同,但是處理的股票不同。parallel
- 上述代碼中通過DolphinDB的響應式狀态引擎和内置的cumsum,prev函數實作流式增量計算,分組字段為
和SecurityID
,即股票代碼和買單訂單号。BuyNum
-
中的内容為響應式狀态引擎中以元代碼形式表示的計算公式:metricsBuy
metricsBuy = [
<TradeTime>,
<SellNum>,
<TradeAmount>,
<TradeQty>,
<cumsum(TradeAmount)>,
<tagFunc(cumsum(TradeQty))>,
<prev(cumsum(TradeAmount))>,
<prev(tagFunc(cumsum(TradeQty)))>]
<TradeTime>
,
<SellNum>
,
<TradeAmount>
,
<TradeQty>
是無狀态的計算,作用是保留原始表中這些字段的原始資訊,輸入給下一層的響應式狀态引擎計算使用。
<cumsum(TradeAmount)>
,
<tagFunc(cumsum(TradeQty))>
,
<prev(cumsum(TradeAmount))>
,
<prev(tagFunc(cumsum(TradeQty)))>
是有狀态的計算,分别計算了每一條成交記錄所代表的股票按照此記錄的買單訂單号合并後的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前的累計成交金額、目前成交記錄合入前根據累計成交量判斷的大小單标簽,作用是作為第三層響應式狀态引擎中的dynamicGroupCumsum, dynamicGroupCumcount函數的輸入,增量計算買方向的資金流名額。這些有狀态因子的計算都是通過流式增量計算的方法實作的。
為了友善開發者快速了解這塊代碼的計算邏輯,下面我們輸入一些樣本資料來觀察第一層響應式狀态引擎的運作:
- 逐筆成交表
中寫入5條資料tradeOriginalStream
- 經過第一層響應式狀态引擎的處理後,輸出為
上述代碼對股票代碼為
60000
的逐筆成交資料按照買單訂單号
69792
進行合并計算,在響應式狀态引擎中對每一筆輸入都會進行一次響應計算,是以輸出結果的條數和輸入記錄的條數相等。結果表中的
TotalBuyAmount
,
BuyOrderFlag
,
PrevTotalBuyAmount
,
PrevBuyOrderFlag
分别代表每一條成交記錄所代表的股票按照此記錄的買單訂單号合并後的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前的累計成交金額、目前成交記錄合入前根據累計成交量判斷的大小單标簽,這些有狀态因子的計算都是通過流式增量計算的方法實作的。
2.4 根據股票和賣單訂單号合并的增量計算
def processSellOrderFunc(parallel){
colName = `SecurityID`BuyNum`TradeTime`SellNum`TradeAmount`TradeQty`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
colType = [SYMBOL, INT, TIMESTAMP, INT, DOUBLE, INT, DOUBLE, INT, DOUBLE, INT]
processBuyOrder = table(1:0, colName, colType)
metricsSell = [
<TradeTime>,
<TradeAmount>,
<cumsum(TradeAmount)>,
<tagFunc(cumsum(TradeQty))>,
<prev(cumsum(TradeAmount))>,
<prev(tagFunc(cumsum(TradeQty)))>,
<BuyNum>,
<TotalBuyAmount>,
<BuyOrderFlag>,
<PrevTotalBuyAmount>,
<PrevBuyOrderFlag>]
for(i in 1..parallel){
createReactiveStateEngine(name="processSellOrder"+string(i), metrics=metricsSell, dummyTable=processBuyOrder, outputTable=getStreamEngine("processCapitalFlow"+string(i)), keyColumn=`SecurityID`SellNum, keepOrder=true)
}
}
-
參數是指流計算的并行度,上述代碼中是建立了parallel
個響應式狀态引擎2,這些響應式狀态引擎2的輸入是對應的parallel
個響應式狀态引擎1的輸出,實作并行計算。這些響應式狀态引擎2的計算邏輯相同,但是處理的股票不同。parallel
- 上述代碼中通過DolphinDB的響應式狀态引擎和内置的cumsum,prev函數實作流式增量計算,分組字段為
和SecurityID
,即股票代碼和賣單訂單号。SellNum
-
中的内容為響應式狀态引擎中以元代碼形式表示的計算公式:metricsSell
metricsSell = [
<TradeTime>,
<TradeAmount>,
<cumsum(TradeAmount)>,
<tagFunc(cumsum(TradeQty))>,
<prev(cumsum(TradeAmount))>,
<prev(tagFunc(cumsum(TradeQty)))>,
<BuyNum>,
<TotalBuyAmount>,
<BuyOrderFlag>,
<PrevTotalBuyAmount>,
<PrevBuyOrderFlag>]
<TradeTime>
,
<TradeAmount>
,
<BuyNum>
,
<TotalBuyAmount>
,
<BuyOrderFlag>
,
<PrevTotalBuyAmount>
,
<PrevBuyOrderFlag>
是無狀态的計算,作用是保留原始表中這些字段的原始資訊,輸入給下一層的響應式狀态引擎計算使用。
<cumsum(TradeAmount)>
,
<tagFunc(cumsum(TradeQty))>
,
<prev(cumsum(TradeAmount))>
,
<prev(tagFunc(cumsum(TradeQty)))>
是有狀态的計算,分别計算了每一條成交記錄所代表的股票按照此記錄的賣單訂單号合并後的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前的累計成交金額、目前成交記錄合入前根據累計成交量判斷的大小單标簽,作用是作為第三層響應式狀态引擎中的dynamicGroupCumsum, dynamicGroupCumcount函數的輸入,增量計算賣方向的資金流名額。這些有狀态因子的計算都是通過流式增量計算的方法實作的。
為了友善開發者快速了解這塊代碼的計算邏輯,下面我們輸入一些樣本資料來觀察第二層響應式狀态引擎的運作:
- 第二層響應式狀态引擎的輸入為
- 經過第二層響應式狀态引擎的處理後,輸出為
上述代碼對股票代碼為
60000
的逐筆成交資料按照賣單訂單号
38446
,
70031
,
143303
,
155394
,
38433
進行合并計算,在響應式狀态引擎中對每一筆輸入都會進行一次響應計算,是以輸出結果的條數和輸入記錄的條數相等。結果表中的
TotalSellAmount
,
SellOrderFlag
,
PrevTotalSellAmount
,
PrevSellOrderFlag
分别代表每一條成交記錄所代表的股票按照此記錄的賣單訂單号合并後的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前的累計成交金額、目前成交記錄合入前根據累計成交量判斷的大小單标簽,這些有狀态因子的計算都是通過流式增量計算的方法實作的。
2.5 根據股票合并的資金流名額的增量計算
def processCapitalFlowFunc(parallel){
colName = `SecurityID`SellNum`TradeTime`TradeAmount`TotalSellAmount`SellOrderFlag`PrevTotalSellAmount`PrevSellOrderFlag`BuyNum`TotalBuyAmount`BuyOrderFlag`PrevTotalBuyAmount`PrevBuyOrderFlag
colType = [SYMBOL, INT, TIMESTAMP, DOUBLE, DOUBLE, INT, DOUBLE, INT, INT, DOUBLE, INT, DOUBLE, INT]
processSellOrder = table(1:0, colName, colType)
metrics1 = <dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>
metrics2 = <dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>
metrics3 = <dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)>
metrics4 = <dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>
for(i in 1..parallel){
createReactiveStateEngine(name="processCapitalFlow"+string(i), metrics=[<TradeTime>, <cumsum(TradeAmount)>, metrics1, metrics2, metrics3, metrics4], dummyTable=processSellOrder, outputTable=capitalFlowStream, keyColumn=`SecurityID, keepOrder=true)
}
}
-
參數是指流計算的并行度,上述代碼中是建立了parallel
個響應式狀态引擎3,這些響應式狀态引擎3的輸入是對應的parallel
個響應式狀态引擎2的輸出,實作并行計算。這些響應式狀态引擎3的計算邏輯相同,但是處理的股票不同。parallel
- 上述代碼中通過DolphinDB的響應式狀态引擎和内置的cumsum,dynamicGroupCumsum,dynamicGroupCumcount函數實作流式增量計算,分組字段為
,即股票代碼。SecurityID
-
中的内容為響應式狀态引擎中以元代碼形式表示的計算公式:metrics
metrics1 = <dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>
metrics2 = <dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>
metrics3 = <dynamicGroupCumsum(TotalBuyAmount, PrevTotalBuyAmount, BuyOrderFlag, PrevBuyOrderFlag, 3)>
metrics4 = <dynamicGroupCumcount(BuyOrderFlag, PrevBuyOrderFlag, 3)>
metrics = [<TradeTime>, <cumsum(TradeAmount)>, metrics1, metrics2, metrics3, metrics4]
<TradeTime>
是無狀态的計算,作用是保留每一條計算結果的原始時間資訊。
<cumsum(TradeAmount)>
是有狀态的計算,表示從開盤到目前記錄,該隻股票的總成交額。
metrics1
中的
<dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>
是有狀态的計算,輸入是目前成交記錄所代表的股票按照此記錄的賣單訂單号合并後的累計成交金額、目前成交記錄合入前的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前根據累計成交量判斷的大小單标簽、大小單标簽數量,輸出是表示從開盤到目前記錄,該隻股票的賣方向小單的總成交額、賣方向中單的總成交額、賣方向大單的總成交額。
metrics2
中的
<dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>
是有狀态的計算,輸入是目前成交記錄所代表的股票按照此記錄的賣單訂單号合并後根據累計成交量判斷的大小單标簽、目前成交記錄合入前根據累計成交量判斷的大小單标簽、大小單标簽數量,輸出是表示從開盤到目前記錄,該隻股票的賣方向小單的總訂單數、賣方向中單的總訂單數、賣方向大單的總訂單數。
metrics3
和
metrics4
也都是有狀态的計算,表示買方向的資金流名額,與賣方向的計算邏輯相似,不在展開闡述。這些有狀态因子的計算都是通過流式增量計算的方法實作的。
為了友善開發者快速了解這塊代碼的計算邏輯,下面我們輸入一些樣本資料來觀察第三層響應式狀态引擎的運作:
- 第三層響應式狀态引擎的輸入為
- 經過第三層響應式狀态引擎的處理後,輸出為
上圖為股票代碼為
60000
的日累計逐單資金流名額計算結果。
在響應式狀态引擎中對每一筆輸入都會進行一次響應計算,是以輸出結果的條數和輸入記錄的條數相等。結果表中的
TotalAmount
表示從開盤到目前記錄,該隻股票的總成交額,計算表達式是
<cumsum(TradeAmount)>
,輸入是每一筆交易的成交額,是通過響應式狀态引擎和cumsum累計求和函數實作流式增量計算的。
結果表中的
SellSmallAmount
,
SellMediumAmount
,
SellBigAmount
表示從開盤到目前記錄,該隻股票的賣方向小單的總成交額、賣方向中單的總成交額、賣方向大單的總成交額,計算表達式是
<dynamicGroupCumsum(TotalSellAmount, PrevTotalSellAmount, SellOrderFlag, PrevSellOrderFlag, 3)>
,輸入是目前成交記錄所代表的股票按照此記錄的賣單訂單号合并後的累計成交金額、目前成交記錄合入前的累計成交金額、目前成交記錄合入後根據累計成交量判斷的大小單标簽、目前成交記錄合入前根據累計成交量判斷的大小單标簽和大小單标簽數量,是通過響應式狀态引擎和dynamicGroupCumsum函數實作流式增量計算的,在日累計資金流實時計算場景中,随着交易量的不斷增加,某個訂單的類别可能從一個小單變成大單,此時需要從小單累計統計量中減去該筆訂單已經累計的值,并在大單累計統計量中加上該筆訂單的最新累計值,dynamicGroupCumsum函數即可應用在這類場景下。結果表中的
SellSmallCount
,
SellMediumCount
,
SellBigCount
表示從開盤到目前記錄,該隻股票的賣方向小單的總訂單數、賣方向中單的總訂單數、賣方向大單的總訂單數,計算表達式是
<dynamicGroupCumcount(SellOrderFlag, PrevSellOrderFlag, 3)>
,輸入是是目前成交記錄所代表的股票按照此記錄的賣單訂單号合并後根據累計成交量判斷的大小單标簽、目前成交記錄合入前根據累計成交量判斷的大小單标簽和大小單标簽數量,是通過響應式狀态引擎和dynamicGroupCumcount函數實作流式增量計算的,在日累計資金流實時計算場景中,随着交易量的不斷增加,某個訂單的類别可能從一個小單變成大單,此時需要從小單累計統計量中減1,并在大單累計統計量中加1,dynamicGroupCumcount函數即可應用在這類場景下。
結果表中的
BuySmallAmount
,
BuyMediumAmount
,
BuyBigAmount
,
BuySmallCount
,
BuyMediumCount
,
BuyBigCount
表示買方向的日累計資金流名額,與賣方向的計算邏輯相似,不在展開闡述,也是通過響應式狀态引擎和dynamicGroupCumsum, dynamicGroupCumcount函數實作流式增量計算的。
2.6 固定頻率往外推送計算結果
def processCapitalFlow60minFunc(){
aggrMetrics = <[
last(TotalAmount),
last(SellSmallAmount),
last(SellMediumAmount),
last(SellBigAmount),
last(SellSmallCount),
last(SellMediumCount),
last(SellBigCount),
last(BuySmallAmount),
last(BuyMediumAmount),
last(BuyBigAmount),
last(BuySmallCount),
last(BuyMediumCount),
last(BuyBigCount)]>
createDailyTimeSeriesEngine(name="processCapitalFlow60min", windowSize=60000*60, step=60000*60, metrics=aggrMetrics, dummyTable=capitalFlowStream, outputTable=capitalFlowStream60min, timeColumn="TradeTime", useSystemTime=false, keyColumn=`SecurityID, useWindowStartTime=false)
subscribeTable(tableName="capitalFlowStream", actionName="processCapitalFlow60min", offset=-1, handler=getStreamEngine("processCapitalFlow60min"), msgAsTable=true, batchSize=10000, throttle=1, hash=0)
}
- 分組字段為
,即股票代碼。SecurityID
- 通過DolphinDB的時間序列聚合引擎對資金流名額結果表做實時的60分鐘滾動視窗計算,聚合函數為
。last
- 實時計算的資料源為日累計資金流結果表capitalFlowStream,雖然該表的資料流量較大(和原始逐筆成交表的資料流量一樣大),但是由于是做簡單的60分鐘滾動名額計算,是以隻需要單線程處理,不需要使用并行流處理。
2.7 注冊訂閱引擎和訂閱流資料表
parallel = 3
processCapitalFlowFunc(parallel)
go
processSellOrderFunc(parallel)
go
processBuyOrderFunc(parallel)
processCapitalFlow60minFunc()
-
參數是指流計算的并行度。parallel
- 本教程設定
,表示資金流計算的并行度為3,能夠支撐的上遊逐筆交易資料的最大流量為5萬條每秒。2022年1月某日,滬深兩市全市場股票,在09:30:00開盤時候的逐筆交易資料流量峰值可以達到4.2萬筆每秒,是以生産環境部署的時候,為了避免因流量高峰時流處理堆積造成延時增加的現象,可以将parallel=3
設定為3,提高系統實時計算的最大負載。parallel
2.8 曆史資料回放
//replay history data
t = select * from loadTable("dfs://trade", "trade") where time(TradeTime) between 09:30:00.000 : 15:00:00.000 order by TradeTime, SecurityID
submitJob("replay_trade", "trade", replay{t, tradeOriginalStream, `TradeTime, `TradeTime, 50000, true, 1})
getRecentJobs()
執行完後,傳回如下資訊:
如果endTime和errorMsg為空,說明任務正在正常運作中。
3. 日累計資金流實時計算結果展示
3.1 節點内的計算結果表
計算結果表
capitalFlowStream
,可以通過DolphinDB所有API查詢接口實時查詢,通過DolphinDB GUI實時檢視該表的結果,傳回:
3.2 固定頻率往外推送計算結果
計算結果表
capitalFlowStream
的資料量和逐筆成交資料量是一樣的,對每一筆成交記錄做一次響應。
如果開發者需要定時取一次截面上每一隻股票的最新日累計資金流名額,可以通過DolphinDB内置的時間序列計算引擎,在滾動視窗内取每一隻股票的最後一條計算結果即可。本教程中對計算結果表
capitalFlowStream
進行了實時滾動視窗的計算,視窗大小是60分鐘,計算結果存儲在
capitalFlowStream60min
流資料表中,資料内容如下圖所示:
3.3 Grafana實時監控結果
4. 性能測試
本教程測試了單次響應計算和連續響應計算兩種場景。測試資料為上交所2020年某天1558隻股票的逐筆成交資料。
4.1 單次響應計算性能測試
本教程使用了3個響應式狀态引擎串聯的流水線處理,單次響應計算時間為從第1個引擎收到輸入至第3個引擎輸出結果所經曆的時間,測試了單隻股票響應計算一次和1558隻股票響應計算一次的性能。統計了10次的總耗時,取平均值作為單次的耗時。測試使用的伺服器CPU為Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz。單線程情況下,測試結果如下:
股票個數 | 耗時(機關:ms) |
1 | 0.18 |
1558 | 2.09 |
4.2 連續響應計算性能測試
本教程使用了3個響應式狀态引擎串聯的流水線處理,計算的并行度為3,能夠支撐的上遊逐筆交易資料的最大流量為5萬條每秒。以上交所2020年某天1558隻股票的1632萬條逐筆成交資料為測試資料,通過DolphinDB的replay回放工具,把曆史資料以流資料的方式注入到流計算最上遊的流資料表tradeOriginalStream,回放速度是全速,計算總耗時是326秒,處理的能力是5萬條每秒。開發者可以增加計算的并行度,提高系統的處理能力。
5. 總結
DolphinDB内置的流資料架構支援流資料的釋出,訂閱,預處理,實時記憶體計算,複雜名額的滾動視窗計算、滑動視窗計算、累計視窗計算等,是一個運作高效、使用便捷的流資料處理架構。
本教程基于DolphinDB流資料處理架構,提供了一種實時計算日累計逐單資金流的低延時解決方案,旨在提高開發人員在使用 DolphinDB 内置的流資料架構開發流計算業務場景時的開發效率、降低開發難度,更好地挖掘 DolphinDB 在複雜實時流計算場景中的價值。
附件
業務代碼
01.清理環境并建立相關流資料表
02.注冊流計算引擎和訂閱流資料表
- CPU 類型:Intel(R) Xeon(R) Silver 4216 CPU @ 2.10GHz
- 邏輯 CPU 總數:8
- 記憶體:64GB
- OS:64位 CentOS Linux 7 (Core)
- 磁盤:SSD 盤,最大讀寫速率為 520MB/s
- server 版本:1.30.18,2.00.6