天天看點

分鐘頻、曆史值…實時關聯分析操作指南

作者:DolphinDB

前兩期中,我們為大家展示了如何用 Window Join、Asof Join 引擎将逐筆成交資料與快照資料進行關聯分析,以及使用 Left Semi Join 補充原始委托資訊,這些都是金融中常見的應用場景,感興趣的小夥伴可以閱讀往期内容。

本期,我們将為大家介紹剩下的三個場景:

- 實時計算股票與某指數的分鐘收益率相關性;

- 對多個資料源降頻采樣,計算分鐘名額并将結果關聯到同一張表中;

- 根據快照資料實時比對股票曆史日頻名額。

Equal Join 不同資料源的分鐘名額實時合并

在量化實盤中,往往需要對原始的行情快照、逐筆成交資料進行降采樣處理,以得到分鐘頻名額,作為政策研發的輸入資料,這就要求将多個不同資料源計算出的名額關聯到同一張表中。

本例将對快照和成交資料分别做實時的 1 分鐘聚合,并将計算所得名額關聯後輸出到同一張寬表中。

這個場景的特征是,每支股票的快照和逐筆分鐘名額在每一分鐘隻有一條記錄,具有唯一性,并且在某一分鐘的輸出上,期望總是在兩類名額都計算完成後再關聯輸出。

用 Equal Join 引擎實作此場景的腳本如下:

// create table
share streamTable(1:0, `Sym`TradeTime`Side`TradeQty, [SYMBOL, TIME, INT, LONG]) as trades
share streamTable(1:0, `UpdateTime`Sym`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, LONG, LONG]) as tradesMin
share streamTable(1:0, `Sym`Time`Bid1Price`Bid1Qty, [SYMBOL, TIME, DOUBLE, LONG]) as snapshot
share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt, [TIME, SYMBOL, DOUBLE]) as snapshotMin
share streamTable(1:0, `UpdateTime`Sym`AvgBid1Amt`BuyTradeQty`SellTradeQty, [TIME, SYMBOL, DOUBLE, LONG, LONG]) as output


// create engine: 
eqJoinEngine = createEqualJoinEngine(name="EqualJoin", leftTable=tradesMin, rightTable=snapshotMin, outputTable=output, metrics=<[AvgBid1Amt, BuyTradeQty, SellTradeQty]>, matchingColumn=`Sym, timeColumn=`UpdateTime)
// create engine: 
tsEngine1 = createTimeSeriesEngine(name="tradesAggr", windowSize=60000, step=60000, metrics=<[sum(iif(Side==1, 0, TradeQty)), sum(iif(Side==2, 0, TradeQty))]>, dummyTable=trades, outputTable=getLeftStream(eqJoinEngine), timeColumn=`TradeTime, keyColumn=`Sym, useSystemTime=false, fill=(0, 0))
// create engine: 
tsEngine2 = createTimeSeriesEngine(name="snapshotAggr", windowSize=60000, step=60000, metrics=<[avg(iif(Bid1Price!=NULL, Bid1Price*Bid1Qty, 0))]>, dummyTable=snapshot, outputTable=getRightStream(eqJoinEngine), timeColumn=`Time, keyColumn=`Sym, useSystemTime=false, fill=(0.0))


// subscribe topic
subscribeTable(tableName="trades", actionName="minAggr", handler=tsEngine1, msgAsTable=true, offset=-1, hash=1)
subscribeTable(tableName="snapshot", actionName="minAggr", handler=tsEngine2, msgAsTable=true, offset=-1, hash=2)           

首先用兩個獨立的時序聚合引擎(createTimeSeriesEngine)對原始的快照和成交資料流按資料中的時間戳做實時聚合,輸出每一分鐘的名額;然後通過引擎級聯的方式,将兩個時序聚合引擎的輸出分别作為左右表注入連接配接引擎。

構造資料寫入作為原始輸入的 2 個流資料表,先寫入右表,再寫入左表:

// generate data: snapshot
t1 = table(`A`B`A`B`A`B as Sym, 10:00:52.000+(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.6 7.6 3.6 7.6) as Bid1Price, (1000 2000 500 1500 400 1800) as Bid1Qty)
// generate data: trade
t2 = table(`A`A`B`A`B`B`A`B`B`A as Sym, 10:00:54.000+(1..10)*700 as TradeTime,  (1 2 1 1 1 1 2 1 2 2) as Side, (1..10) * 10 as TradeQty)
// input
trades.append!(t2)
snapshot.append!(t1)           

關聯得到的結果表 output 如下:

分鐘頻、曆史值…實時關聯分析操作指南

Lookup Join 根據快照資料實時比對曆史日頻名額

在當日的實時計算中,有時還需要依賴曆史名額,本例中我們基于行情快照資料,通過比對股票代碼關聯昨日的日頻名額。

這個場景的特征是,每條快照記錄到達後要求立刻關聯輸出,如果日頻資料裡沒有對應的股票,輸出結果對應的字段為空,輸出與原始輸入中的每一條行情快照記錄一一對應。同時,日頻名額并非實時資料,而是一個以較低頻率更新的有主鍵的離線資料集。

用 Lookup Join 引擎實作此場景的腳本如下:

// create table
share streamTable(1:0, `Sym`Time`Open`High`Low`Close, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as snapshot
historicalData = table(`A`B as Sym, (0.8 0.2) as PreWeight, (3.1 7.6) as PreClose)
share table(1:0, `Sym`Time`Open`High`Low`Close`PreWeight`PreClose, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]) as output


// create engine
lookupJoinEngine = createLookupJoinEngine(name="lookupJoin", leftTable=snapshot, rightTable=historicalData, outputTable=output, metrics=<[Time, Open, High, Low, Close, PreWeight, PreClose]>, matchingColumn=`Sym, checkTimes=10s)


// subscribe topic
subscribeTable(tableName="snapshot", actionName="appendLeftStream", handler=getLeftStream(lookupJoinEngine), msgAsTable=true, offset=-1)           

構造資料寫入作為引擎左表輸入的流資料表 snapshot:

// generate data: snapshot
t1 = table(`A`B`A`B`A`B as Sym, 10:00:00.000+(3 3 6 6 9 9)*1000 as Time, (3.5 7.6 3.5 7.6 3.5 7.6) as Open, (3.5 7.6 3.6 7.6 3.6 7.6) as High, (3.5 7.6 3.5 7.6 3.4 7.5) as Low, (3.5 7.6 3.5 7.6 3.6 7.5) as Close)
snapshot.append!(t1)           

輸入資料與關聯關系如下:

分鐘頻、曆史值…實時關聯分析操作指南

結果在左表資料到達引擎時立刻輸出,關聯得到的結果表 output 如下:

分鐘頻、曆史值…實時關聯分析操作指南

Left Semi Join 實時計算股票與某指數的分鐘收益率相關性

Left Semi Join 引擎的連接配接機制類似于 SQL 中的 equal join ,按連接配接列等值關聯左右表。上一期我們利用這一功能實作了在成交資料的基礎上比對委托訂單,豐富原始的委托資訊。本例中,我們來實時計算股票和某個指數在過去一段時間内分鐘收益率的相關性。

這個場景的特征是,兩個資料流的時間戳頻率一緻,全部股票都需要關聯同一支指數,輸入是已經降為分鐘頻率的股票和指數資料,輸出與原始輸入中的股票資料一一對應。

可以用如下腳本實作關聯:

// create table
share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as stockKline
share streamTable(1:0, `Sym`Time`Close, [SYMBOL, TIME, DOUBLE]) as indexKline
share streamTable(1:0, `Time`Sym`Close`Index1Close, [TIME, SYMBOL, DOUBLE, DOUBLE]) as stockKlineAddIndex1
share streamTable(1:0, `Sym`Time`Close`Index1Close`Index1Corr, [SYMBOL, TIME, DOUBLE, DOUBLE, DOUBLE]) as output


//  create engine: calculate correlation
rsEngine = createReactiveStateEngine(name="calCorr", dummyTable=stockKlineAddIndex1, outputTable=output, metrics=[<Time>, <Close>, <Index1Close>, <mcorr(ratios(Close)-1, ratios(Index1Close)-1, 3)>], keyColumn="Sym")


//  create engine: left join Index1
ljEngine1 = createLeftSemiJoinEngine(name="leftJoinIndex1", leftTable=stockKline, rightTable=indexKline, outputTable=getStreamEngine("calCorr"), metrics=<[Sym, Close, indexKline.Close]>, matchingColumn=`Time)


// subscribe topic
def appendIndex(engineName, indexName, msg){
  tmp = select * from msg where Sym = indexName
  getRightStream(getStreamEngine(engineName)).append!(tmp)
}
subscribeTable(tableName="indexKline", actionName="appendIndex1", handler=appendIndex{"leftJoinIndex1", "idx1"}, msgAsTable=true, offset=-1, hash=1)
subscribeTable(tableName="stockKline", actionName="appendStock", handler=getLeftStream(ljEngine1), msgAsTable=true, offset=-1, hash=0)           

這裡連接配接引擎的輸出會直接注入響應式狀态引擎進行下一步計算,多個引擎之間采用了引擎級聯的方式處理。

構造資料寫入作為原始輸入的 2 個流資料表:

// generate data: stock Kline
t1 = table(`A`B`A`B`A`B`A`B`A`B as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (4.1 7.6 3.8 7.6 4.3 7.5 3.5 7.6 4.2 7.6) as Close)
// generate data: index Kline
t2 = table(`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2`idx1`idx2 as Sym, 10:00:00.000+(0 0 1 1 2 2 3 3 4 4)*60000 as Time, (2.1 5 2.2 5 1.9 5 1.7 5 1.7 5) as Close)
// input data
indexKline.append!(t2)
stockKline.append!(t1)           

輸入資料與關聯關系如下:

分鐘頻、曆史值…實時關聯分析操作指南

關聯得到的結果表 output 如下:

分鐘頻、曆史值…實時關聯分析操作指南

至此,我們已經為大家展示了所有六個應用場景,囊括了 Asof Join、Window Join、Equal Join、Lookup Join、Left Semi Join 這五個流資料連接配接引擎。

這些引擎均内置實作了高效的關聯計算、實時觸發規則和記憶體管理機制,很好地解決了對齊難、觸發難、緩存難、計算難等一系列問題。開發人員通過簡單的引擎參數配置,便能快速實作複雜的實時關聯需求。

在這些連接配接引擎的基礎上,再結合 DolphinDB 流資料架構中其他流計算引擎、流水線處理、并行計算等重要特性,開發人員便可以高效實作業務場景實時化,掌握更及時的資訊、挖掘更多的業務價值。

五大連接配接引擎的資訊對比如下表所示:

分鐘頻、曆史值…實時關聯分析操作指南

繼續閱讀