VNPY 使用k生成器BarGenerator合成日線資料
- 問題起因
- vnpy合成k線原理
-
- 政策類Strategy
- ArrayManager
- BarGenerator
- Strategy.on_tick
- Strategy.on_bar
- BarGenerator.update_bar
- BarGenerator.update_bar_minute_window
- BarGenerator.update_bar_hour_window
- BarGenerator.on_hour_bar
- Strategy.on_window_bar
- 總結
- 日線合成
- 周線合成
問題起因
因為要使用tick級别的資料進行商品期貨的回測,是以我選擇了比較出名的開源回測架構vnpy,但是我發現它沒有我想象中的成熟,有很多需求都沒有實作,比如實時k線合成,這個是仿真的時候肯定會用到的東西,但是再換架構也不行,其他架構也需要學習成本,而且也不一定能解決問題,是以時間有限就硬着頭皮上吧,先來解決vnpy合成日線周線的問題,vnpy僅僅支援分鐘和小時級别的k線合成,如果要使用日線的k線隻能用多個小時線來代替,但是品種的交易時間段并非統一的,是以隻能自己動手寫了
vnpy合成k線原理
要動手改代碼,先要了解一下vnpy的架構結構,為此我準備了一個流程圖供大家參考
政策類Strategy
我們寫的政策都是一般都是一個類,它的格式一般是固定的,都是繼承政策模闆類
CtaTemplate
的,具體可以參考示例雙均線政策
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager,
)
class DoubleMaStrategy(CtaTemplate):
author = "用Python的交易員"
fast_window = 10
slow_window = 20
fast_ma0 = 0.0
fast_ma1 = 0.0
slow_ma0 = 0.0
slow_ma1 = 0.0
parameters = ["fast_window", "slow_window"]
variables = ["fast_ma0", "fast_ma1", "slow_ma0", "slow_ma1"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("政策初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("政策啟動")
self.put_event()
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("政策停止")
self.put_event()
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
fast_ma = am.sma(self.fast_window, array=True)# 計算ma10
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window, array=True)# 計算ma20
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1
cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1
if cross_over:
if self.pos == 0:
self.buy(bar.close_price, 1)
elif self.pos < 0:
self.cover(bar.close_price, 1)
self.buy(bar.close_price, 1)
elif cross_below:
if self.pos == 0:
self.short(bar.close_price, 1)
elif self.pos > 0:
self.sell(bar.close_price, 1)
self.short(bar.close_price, 1)
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
ArrayManager
儲存k線資料,其構造函數如下:預設儲存最新的100跟k線
def __init__(self, size: int = 100):
"""Constructor"""
self.count: int = 0
self.size: int = size
self.inited: bool = False
self.open_array: np.ndarray = np.zeros(size)
self.high_array: np.ndarray = np.zeros(size)
self.low_array: np.ndarray = np.zeros(size)
self.close_array: np.ndarray = np.zeros(size)
self.volume_array: np.ndarray = np.zeros(size)
self.open_interest_array: np.ndarray = np.zeros(size)
ArrayManager類内部引用了
talib
名額庫,封裝了很多方法,可以計算各種名額,使用方法可以參考雙均線政策中計算MA10
BarGenerator
k線生成器,構造方法如下:
def __init__(
self,
on_bar: Callable, #回調函數
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.hour_bar: BarData = None
self.day_bar: BarData = None
self.week_bar: BarData = None
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
使用示例
self.bg = BarGenerator(self.on_bar) # 1分鐘的bar
self.bg2 = BarGenerator(self.on_bar,5,self.on_5mins_bar) # 5分鐘bar
現在還不知道
BarGenerator
怎麼用的沒有關系,我們下面再慢慢講解
vnpy是先合成1分鐘k線然後再合成小時k線的,我們從資料的傳遞的過程來開始梳理
Strategy.on_tick
接受tick資料,調用k線生成器
update_tick
函數更新1分鐘,判斷邏輯如下:
if self.bar.datetime.minute != tick.datetime.minute
or self.bar.datetime.hour != tick.datetime.hour :
self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0)
self.on_bar(self.bar)
k線資料的時間戳的分鐘數不相等或者小時數不相等,肯定是一個新的分鐘k線,就通過回調函數
on_bar
更新資料,如果僅使用一分鐘資料,就可以直接傳給
ArrayManager
儲存1分鐘資料,雙均線政策就是這樣的,如果使用其他資料一定要再政策中寫對應的回調函數,比如下方的
on_15min_bar
class BollChannelStrategy(CtaTemplate):
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar)
self.am = ArrayManager()
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.bg.update_bar(bar)
def on_15min_bar(self, bar: BarData):
""" """
self.cancel_all()
am = self.am
am.update_bar(bar) #更新一個15分鐘的bar到ArrayManager
if not am.inited:
return
Strategy.on_bar
将1分鐘資料傳入到k線生成器中,調用類似上面的
self.bg.update_bar(bar)
BarGenerator.update_bar
源碼:
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
if self.interval == Interval.MINUTE:
self.update_bar_minute_window(bar)
else:
self.update_bar_hour_window(bar)
可以從這裡看出vnpy僅支援合成分鐘和小時的k線資料,根據bar的類型傳入到對應的函數中
BarGenerator.update_bar_minute_window
根據
window
的大小生成對應的bar,BarGenerator裡面有一個
window_bar
屬性暫存對應的bar,bar資料進來,先判斷有沒有這個屬性,如果沒有建立一個,有的話就更新
if not self.window_bar:
dt = bar.datetime.replace(second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
else:
self.window_bar.high_price = max(
self.window_bar.high_price,
bar.high_price
)
self.window_bar.low_price = min(
self.window_bar.low_price,
bar.low_price
)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
通過對window做求餘運算來判斷,是否是一個新的window
if not (bar.datetime.minute + 1) % self.window:
self.on_window_bar(self.window_bar)
self.window_bar = None
self.last_bar = bar
分鐘k線是從整點開始的,0-1分鐘算第一根,而14-15分鐘的一分鐘k線的時間點對應的是14:00 是以要加一再求餘,然後再将
window_bar
設為
None
下次新資料進來就是新的
window_bar
BarGenerator.update_bar_hour_window
功能結構類似
update_bar_minute_window
,但是它在判斷上有點差別,因為它隻能合成一個一小時的k線,然後再将資料傳給
on_hour_bar
,去判斷window
if bar.datetime.minute == 59: #59分鐘k線就是第60根k線,是以更新小時線
finished_bar = self.hour_bar #把資料儲存給finished_bar
建立新的self.hour_bar
....
# 小時數不一樣,肯定是新的一小時
elif bar.datetime.hour != self.hour_bar.datetime.hour:
finished_bar = self.hour_bar
建立新的self.hour_bar
...
else:
更新資料
if finished_bar:
self.on_hour_bar(finished_bar) #将一個小時k線的bar推送給on_hour_bar
BarGenerator.on_hour_bar
接受對應的bar,利用
interval_count
屬性計數,根據目前的計數與window的取模運算的結果處理對應的bar。
注意 這裡并不關心bar是分鐘的還是小時的,它隻關心bar的數量有沒有超過視窗的大小,超過就推送,預設情況就是更新bar
if self.window == 1:
self.on_window_bar(bar) # 1小時bar
else:
if not self.window_bar:
建立bar
else:
更新bar
self.interval_count += 1
if not self.interval_count % self.window:
self.interval_count = 0
self.on_window_bar(self.window_bar)
self.window_bar = None
Strategy.on_window_bar
需要在政策裡自定義的函數,也就是用到對應k線的地方,作用一般是處理政策的主邏輯,比如下面的布林帶政策的示例,
BarGenerator(self.on_bar, 15, self.on_15min_bar)
中的
on_15min_bar
就是對應的
on_window_bar
處理15分鐘的k線
class BollChannelStrategy(CtaTemplate):
author = "布林帶政策demo"
boll_window = 18
boll_dev = 3.4
cci_window = 10
atr_window = 30
sl_multiplier = 5.2
fixed_size = 1
boll_up = 0
boll_down = 0
cci_value = 0
atr_value = 0
intra_trade_high = 0
intra_trade_low = 0
long_stop = 0
short_stop = 0
parameters = [
"boll_window",
"boll_dev",
"cci_window",
"atr_window",
"sl_multiplier",
"fixed_size"
]
variables = [
"boll_up",
"boll_down",
"cci_value",
"atr_value",
"intra_trade_high",
"intra_trade_low",
"long_stop",
"short_stop"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar)
self.am = ArrayManager()
def on_init(self):
self.write_log("政策初始化")
self.load_bar(10) # 加載10天的資料
def on_start(self):
self.write_log("政策啟動")
def on_stop(self):
self.write_log("政策停止")
def on_tick(self, tick: TickData):
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
self.bg.update_bar(bar)
def on_15min_bar(self, bar: BarData):
""" """
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.boll_up, self.boll_down = am.boll(self.boll_window, self.boll_dev)
self.cci_value = am.cci(self.cci_window)
self.atr_value = am.atr(self.atr_window)
if self.pos == 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = bar.low_price
if self.cci_value > 0:
self.buy(self.boll_up, self.fixed_size, True)
elif self.cci_value < 0:
self.short(self.boll_down, self.fixed_size, True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
self.intra_trade_low = bar.low_price
self.long_stop = self.intra_trade_high - self.atr_value * self.sl_multiplier
self.sell(self.long_stop, abs(self.pos), True)
elif self.pos < 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
self.short_stop = self.intra_trade_low + self.atr_value * self.sl_multiplier
self.cover(self.short_stop, abs(self.pos), True)
self.put_event()
def on_order(self, order: OrderData):
pass
def on_trade(self, trade: TradeData):
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
pass
總結
最後再梳理一下,vnpy先通過
on_tick ->update_tick
合成1分鐘k線,然後
on_bar ->update_bar
合成5分鐘、10分鐘、1小時、3小時等類似Xm、Xh格式的資料,并将資料傳給對應的
on_window_bar
,在
on_window_bar
裡面将資料存到
ArrayManager
裡面進而計算MA、ATR、RSI等各類名額。如果你了解vnpy的k線合成邏輯的話就明白,vnpy是不支援實時合成k線的,比如合成日k線的話,今天不收盤它是合成不了日k線的,也就是資料不完整它就不會推送日k線,但是實盤大家都知道,日k線是實時更新的,這也是我自己動手的原因,這篇文章先解決合成日線、周線的問題.
日線合成
有了上面的基礎,我們可以比葫蘆畫瓢,參考小時線合成的邏輯,先生成一根日k線,然後再合成多日的k線。首先将BarGenerator的update_bar改成以下形式,讓其可以識别對應類型的k線
def update_bar(self, bar: BarData) -> None:
# if self.interval == Interval.MINUTE:
# self.update_bar_minute_window(bar)
# else:
# self.update_bar_hour_window(bar)
if self.interval == Interval.MINUTE:
self.update_bar_minute_window(bar)
elif self.interval == Interval.HOUR:
self.update_bar_hour_window(bar)
elif self.interval == Interval.DAILY:
self.update_bar_day_window(bar) #處理日線
else:
self.update_bar_week_window(bar) #處理周線
定義函數
update_bar_day_window
生成一根日k線,判斷日線是不是新的日k線的邏輯主要兩個
- 每天下午的14:59的分鐘線是最後一根分鐘線,以此來判斷一根日線的結束
if bar.datetime.minute == 59 and bar.datetime.hour == 14:
更新k線資料,目前的bar就是完整的日k線可以推送出去了
- 根據
datetime.day
擷取的天數不一樣的時候也是新的一天,這樣可以應對資料缺失沒有14:59的時候也可以更新k線
但是夜盤是一天的開始,而夜盤都是晚上9點開盤的,是以我們在比較是不是新的一天的時候需要将時間都後移4個小時,這樣晚上9點就成了明天的淩晨0點,10點成了第二天的1點,而第二天下午三點隻是移到了晚上7點還是同一天。
#此處的bar 是一個一分鐘k線,因為vnpy是先合成1分鐘k線,然後合成其他k線的
temp_datetime = (bar.datetime + timedelta(hours=4)) #加4個小時
if temp_datetime.day != (self.day_bar.datetime+ timedelta(hours=4)).day:
更新k線資料,目前的bar就是完整的日k線,儲存下來,生成新的bar
但是星期五的夜盤是周一的日k線的開始,是以還要特判一下,完整代碼如下:
def update_bar_day_window(self, bar: BarData) -> None:
""""""
# 沒有日線bar就生成一個
if not self.day_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
temp_datetime = (bar.datetime + timedelta(hours=4))
# 14:59 更新bar,生成新的日線bar
if bar.datetime.minute == 59 and bar.datetime.hour == 14:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
finished_bar = self.day_bar #儲存日線bar
self.day_bar = None #因為日線bar已經儲存給finished_bar了是以将日線bar設為空,下次新資料來了就會生成新的日線bar
# 夜盤算新的一天的開始,
# 現存的bar加上5小時如果是周六的話就那代表是周五的夜盤資料,而它對應的白天資料是下周一的,隔了2天加5個小時還是不夠的,
# 是以特判一下如果現存的self.day_bar是周五的話不要用5小時判斷,剩下不用管他,因為下周一的夜盤進來的話會被+5小時的條件判斷掉,進而将周五夜盤和周一白天的資料推送出去
elif temp_datetime.day != (self.day_bar.datetime+ timedelta(hours=5)).day and (self.day_bar.datetime+ timedelta(hours=5)).weekday() != 5:
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# 更新 現存的day_bar
else:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
# 推送日線給on_hour_bar處理
if finished_bar:
self.on_hour_bar(finished_bar)
# Cache last bar object
self.last_bar = bar
我們在jupyter notebook環境裡面跑一下回測,因為在GUI裡面是看不到日志的
class demoStrategy(CtaTemplate):
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 1, self.on_day_bar, Interval.DAILY)
self.am = ArrayManager()
....
省略其他代碼
....
def on_day_bar(self,bar:BarData):
am = self.am
am.update_bar(bar)
self.write_log(bar.datetime) # 把bar的時間戳輸出在日志裡面
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from datetime import datetime
# 回測
engine = BacktestingEngine()
engine.set_parameters(
vt_symbol="rb9999.SHFE",
interval="1m",
start=datetime(2019, 1, 2),
end=datetime(2019, 2, 8),
rate=0.5/10000,
slippage=5,
size=10,
pricetick=5,
capital=1_000_000,
mode=BacktestingMode.TICK #tick級回測
)
engine.add_strategy(demoStrategy, {})
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()
資料是tick級别,回測時間是
20190102——20190208
正好碰上春節,8号是大年初四,是以資料隻有一月的。
一般節假日的第一天是沒有夜盤資料的,比如5.1前的4月30号夜盤是不交易的,是以除了1月2号沒有夜盤資料,正常的日線bar都是晚上9點開始的。
左邊的時間戳是列印的時間,比如
2019-01-03 21:00:00.500
是新的一天了,是以列印上一根日線bar的時間戳
2019-01-02 09:00:00
,這根bar儲存的是
2019-01-02 09:00:00 - 2019-01-02 14:59:59.500
的資料,從這裡也可以看除k線資料不完整vnpy是不會推送的,隻有當天走完它才會推送。
周線合成
同理周線的判斷邏輯也有兩個
- 接收到每周五的下午14:59:00的一分鐘bar的時候就可以更新一根周k線
-
datetime.isocalendar()
可以計算今天是今年的第幾周這樣就可以判斷如果出現跨周的話一定是新的周k線
但是還要再加上2天5小時,讓周五的夜盤變成下一周,而周五的下午三點會變成周日的晚上8點,還是目前周
完整代碼如下:
def update_bar_week_window(self, bar: BarData) -> None:
""""""
# If not inited, create window bar object
if not self.week_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
# If time is Firday 14:59, update day bar into window bar and push
if bar.datetime.minute == 59 and bar.datetime.hour == 14 and bar.datetime.weekday() == 4:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
finished_bar = self.week_bar
self.week_bar = None
# isocalendar() 傳回多少年的第幾周的第幾天 格式如(2018, 27, 5)
# 周數不相同肯定是新的一周,可以推送出一根完整周k線了
elif (bar.datetime + timedelta(days=2,hours=5)).isocalendar()[1] != (self.week_bar.datetime + timedelta(days=2,hours=5)).isocalendar()[1]:
# print(bar.datetime.isocalendar())
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# Otherwise only update minute bar
else:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
# Push finished window bar
if finished_bar:
self.on_hour_bar(finished_bar) #on_window_bar隻關心bar的數量,不關心bar的類型,是以可以直接調用
# Cache last bar object
self.last_bar = bar
同理類似日線,我們也列印一下日志
結合月曆看比較友善,在1月4号周五的夜盤開始的時候,可以列印上一根周k線,這根k線從2号的白天9點到4号下午15點
1月11号周五的夜盤開始的時候推送出上周五1月4号夜盤21點到本周五11号下午15點的周k線。