VNPY 使用k生成器BarGenerator合成日线数据
- 问题起因
- vnpy合成k线原理
-
- 策略类Strategy
- ArrayManager
- BarGenerator
- Strategy.on_tick
- Strategy.on_bar
- BarGenerator.update_bar
- BarGenerator.update_bar_minute_window
- BarGenerator.update_bar_hour_window
- BarGenerator.on_hour_bar
- Strategy.on_window_bar
- 总结
- 日线合成
- 周线合成
问题起因
因为要使用tick级别的数据进行商品期货的回测,所以我选择了比较出名的开源回测框架vnpy,但是我发现它没有我想象中的成熟,有很多需求都没有实现,比如实时k线合成,这个是仿真的时候肯定会用到的东西,但是再换框架也不行,其他框架也需要学习成本,而且也不一定能解决问题,所以时间有限就硬着头皮上吧,先来解决vnpy合成日线周线的问题,vnpy仅仅支持分钟和小时级别的k线合成,如果要使用日线的k线只能用多个小时线来代替,但是品种的交易时间段并非统一的,所以只能自己动手写了
vnpy合成k线原理
要动手改代码,先要了解一下vnpy的框架结构,为此我准备了一个流程图供大家参考
策略类Strategy
我们写的策略都是一般都是一个类,它的格式一般是固定的,都是继承策略模板类
CtaTemplate
的,具体可以参考示例双均线策略
from vnpy.app.cta_strategy import (
CtaTemplate,
StopOrder,
TickData,
BarData,
TradeData,
OrderData,
BarGenerator,
ArrayManager,
)
class DoubleMaStrategy(CtaTemplate):
author = "用Python的交易员"
fast_window = 10
slow_window = 20
fast_ma0 = 0.0
fast_ma1 = 0.0
slow_ma0 = 0.0
slow_ma1 = 0.0
parameters = ["fast_window", "slow_window"]
variables = ["fast_ma0", "fast_ma1", "slow_ma0", "slow_ma1"]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar)
self.am = ArrayManager()
def on_init(self):
"""
Callback when strategy is inited.
"""
self.write_log("策略初始化")
self.load_bar(10)
def on_start(self):
"""
Callback when strategy is started.
"""
self.write_log("策略启动")
self.put_event()
def on_stop(self):
"""
Callback when strategy is stopped.
"""
self.write_log("策略停止")
self.put_event()
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
am = self.am
am.update_bar(bar)
if not am.inited:
return
fast_ma = am.sma(self.fast_window, array=True)# 计算ma10
self.fast_ma0 = fast_ma[-1]
self.fast_ma1 = fast_ma[-2]
slow_ma = am.sma(self.slow_window, array=True)# 计算ma20
self.slow_ma0 = slow_ma[-1]
self.slow_ma1 = slow_ma[-2]
cross_over = self.fast_ma0 > self.slow_ma0 and self.fast_ma1 < self.slow_ma1
cross_below = self.fast_ma0 < self.slow_ma0 and self.fast_ma1 > self.slow_ma1
if cross_over:
if self.pos == 0:
self.buy(bar.close_price, 1)
elif self.pos < 0:
self.cover(bar.close_price, 1)
self.buy(bar.close_price, 1)
elif cross_below:
if self.pos == 0:
self.short(bar.close_price, 1)
elif self.pos > 0:
self.sell(bar.close_price, 1)
self.short(bar.close_price, 1)
self.put_event()
def on_order(self, order: OrderData):
"""
Callback of new order data update.
"""
pass
def on_trade(self, trade: TradeData):
"""
Callback of new trade data update.
"""
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
"""
Callback of stop order update.
"""
pass
ArrayManager
保存k线数据,其构造函数如下:默认保存最新的100跟k线
def __init__(self, size: int = 100):
"""Constructor"""
self.count: int = 0
self.size: int = size
self.inited: bool = False
self.open_array: np.ndarray = np.zeros(size)
self.high_array: np.ndarray = np.zeros(size)
self.low_array: np.ndarray = np.zeros(size)
self.close_array: np.ndarray = np.zeros(size)
self.volume_array: np.ndarray = np.zeros(size)
self.open_interest_array: np.ndarray = np.zeros(size)
ArrayManager类内部引用了
talib
指标库,封装了很多方法,可以计算各种指标,使用方法可以参考双均线策略中计算MA10
BarGenerator
k线生成器,构造方法如下:
def __init__(
self,
on_bar: Callable, #回调函数
window: int = 0,
on_window_bar: Callable = None,
interval: Interval = Interval.MINUTE
):
"""Constructor"""
self.bar: BarData = None
self.on_bar: Callable = on_bar
self.interval: Interval = interval
self.interval_count: int = 0
self.hour_bar: BarData = None
self.day_bar: BarData = None
self.week_bar: BarData = None
self.window: int = window
self.window_bar: BarData = None
self.on_window_bar: Callable = on_window_bar
self.last_tick: TickData = None
self.last_bar: BarData = None
使用示例
self.bg = BarGenerator(self.on_bar) # 1分钟的bar
self.bg2 = BarGenerator(self.on_bar,5,self.on_5mins_bar) # 5分钟bar
现在还不知道
BarGenerator
怎么用的没有关系,我们下面再慢慢讲解
vnpy是先合成1分钟k线然后再合成小时k线的,我们从数据的传递的过程来开始梳理
Strategy.on_tick
接受tick数据,调用k线生成器
update_tick
函数更新1分钟,判断逻辑如下:
if self.bar.datetime.minute != tick.datetime.minute
or self.bar.datetime.hour != tick.datetime.hour :
self.bar.datetime = self.bar.datetime.replace(second=0, microsecond=0)
self.on_bar(self.bar)
k线数据的时间戳的分钟数不相等或者小时数不相等,肯定是一个新的分钟k线,就通过回调函数
on_bar
更新数据,如果仅使用一分钟数据,就可以直接传给
ArrayManager
保存1分钟数据,双均线策略就是这样的,如果使用其他数据一定要再策略中写对应的回调函数,比如下方的
on_15min_bar
class BollChannelStrategy(CtaTemplate):
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar)
self.am = ArrayManager()
def on_tick(self, tick: TickData):
"""
Callback of new tick data update.
"""
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
"""
Callback of new bar data update.
"""
self.bg.update_bar(bar)
def on_15min_bar(self, bar: BarData):
""" """
self.cancel_all()
am = self.am
am.update_bar(bar) #更新一个15分钟的bar到ArrayManager
if not am.inited:
return
Strategy.on_bar
将1分钟数据传入到k线生成器中,调用类似上面的
self.bg.update_bar(bar)
BarGenerator.update_bar
源码:
def update_bar(self, bar: BarData) -> None:
"""
Update 1 minute bar into generator
"""
if self.interval == Interval.MINUTE:
self.update_bar_minute_window(bar)
else:
self.update_bar_hour_window(bar)
可以从这里看出vnpy仅支持合成分钟和小时的k线数据,根据bar的类型传入到对应的函数中
BarGenerator.update_bar_minute_window
根据
window
的大小生成对应的bar,BarGenerator里面有一个
window_bar
属性暂存对应的bar,bar数据进来,先判断有没有这个属性,如果没有新建一个,有的话就更新
if not self.window_bar:
dt = bar.datetime.replace(second=0, microsecond=0)
self.window_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price
)
else:
self.window_bar.high_price = max(
self.window_bar.high_price,
bar.high_price
)
self.window_bar.low_price = min(
self.window_bar.low_price,
bar.low_price
)
# Update close price/volume into window bar
self.window_bar.close_price = bar.close_price
self.window_bar.volume += int(bar.volume)
self.window_bar.open_interest = bar.open_interest
通过对window做求余运算来判断,是否是一个新的window
if not (bar.datetime.minute + 1) % self.window:
self.on_window_bar(self.window_bar)
self.window_bar = None
self.last_bar = bar
分钟k线是从整点开始的,0-1分钟算第一根,而14-15分钟的一分钟k线的时间点对应的是14:00 所以要加一再求余,然后再将
window_bar
设为
None
下次新数据进来就是新的
window_bar
BarGenerator.update_bar_hour_window
功能结构类似
update_bar_minute_window
,但是它在判断上有点区别,因为它只能合成一个一小时的k线,然后再将数据传给
on_hour_bar
,去判断window
if bar.datetime.minute == 59: #59分钟k线就是第60根k线,所以更新小时线
finished_bar = self.hour_bar #把数据保存给finished_bar
创建新的self.hour_bar
....
# 小时数不一样,肯定是新的一小时
elif bar.datetime.hour != self.hour_bar.datetime.hour:
finished_bar = self.hour_bar
创建新的self.hour_bar
...
else:
更新数据
if finished_bar:
self.on_hour_bar(finished_bar) #将一个小时k线的bar推送给on_hour_bar
BarGenerator.on_hour_bar
接受对应的bar,利用
interval_count
属性计数,根据当前的计数与window的取模运算的结果处理对应的bar。
注意 这里并不关心bar是分钟的还是小时的,它只关心bar的数量有没有超过窗口的大小,超过就推送,默认情况就是更新bar
if self.window == 1:
self.on_window_bar(bar) # 1小时bar
else:
if not self.window_bar:
创建bar
else:
更新bar
self.interval_count += 1
if not self.interval_count % self.window:
self.interval_count = 0
self.on_window_bar(self.window_bar)
self.window_bar = None
Strategy.on_window_bar
需要在策略里自定义的函数,也就是用到对应k线的地方,作用一般是处理策略的主逻辑,比如下面的布林带策略的示例,
BarGenerator(self.on_bar, 15, self.on_15min_bar)
中的
on_15min_bar
就是对应的
on_window_bar
处理15分钟的k线
class BollChannelStrategy(CtaTemplate):
author = "布林带策略demo"
boll_window = 18
boll_dev = 3.4
cci_window = 10
atr_window = 30
sl_multiplier = 5.2
fixed_size = 1
boll_up = 0
boll_down = 0
cci_value = 0
atr_value = 0
intra_trade_high = 0
intra_trade_low = 0
long_stop = 0
short_stop = 0
parameters = [
"boll_window",
"boll_dev",
"cci_window",
"atr_window",
"sl_multiplier",
"fixed_size"
]
variables = [
"boll_up",
"boll_down",
"cci_value",
"atr_value",
"intra_trade_high",
"intra_trade_low",
"long_stop",
"short_stop"
]
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
""" """
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 15, self.on_15min_bar)
self.am = ArrayManager()
def on_init(self):
self.write_log("策略初始化")
self.load_bar(10) # 加载10天的数据
def on_start(self):
self.write_log("策略启动")
def on_stop(self):
self.write_log("策略停止")
def on_tick(self, tick: TickData):
self.bg.update_tick(tick)
def on_bar(self, bar: BarData):
self.bg.update_bar(bar)
def on_15min_bar(self, bar: BarData):
""" """
self.cancel_all()
am = self.am
am.update_bar(bar)
if not am.inited:
return
self.boll_up, self.boll_down = am.boll(self.boll_window, self.boll_dev)
self.cci_value = am.cci(self.cci_window)
self.atr_value = am.atr(self.atr_window)
if self.pos == 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = bar.low_price
if self.cci_value > 0:
self.buy(self.boll_up, self.fixed_size, True)
elif self.cci_value < 0:
self.short(self.boll_down, self.fixed_size, True)
elif self.pos > 0:
self.intra_trade_high = max(self.intra_trade_high, bar.high_price)
self.intra_trade_low = bar.low_price
self.long_stop = self.intra_trade_high - self.atr_value * self.sl_multiplier
self.sell(self.long_stop, abs(self.pos), True)
elif self.pos < 0:
self.intra_trade_high = bar.high_price
self.intra_trade_low = min(self.intra_trade_low, bar.low_price)
self.short_stop = self.intra_trade_low + self.atr_value * self.sl_multiplier
self.cover(self.short_stop, abs(self.pos), True)
self.put_event()
def on_order(self, order: OrderData):
pass
def on_trade(self, trade: TradeData):
self.put_event()
def on_stop_order(self, stop_order: StopOrder):
pass
总结
最后再梳理一下,vnpy先通过
on_tick ->update_tick
合成1分钟k线,然后
on_bar ->update_bar
合成5分钟、10分钟、1小时、3小时等类似Xm、Xh格式的数据,并将数据传给对应的
on_window_bar
,在
on_window_bar
里面将数据存到
ArrayManager
里面进而计算MA、ATR、RSI等各类指标。如果你理解vnpy的k线合成逻辑的话就明白,vnpy是不支持实时合成k线的,比如合成日k线的话,今天不收盘它是合成不了日k线的,也就是数据不完整它就不会推送日k线,但是实盘大家都知道,日k线是实时更新的,这也是我自己动手的原因,这篇文章先解决合成日线、周线的问题.
日线合成
有了上面的基础,我们可以比葫芦画瓢,参考小时线合成的逻辑,先生成一根日k线,然后再合成多日的k线。首先将BarGenerator的update_bar改成以下形式,让其可以识别对应类型的k线
def update_bar(self, bar: BarData) -> None:
# if self.interval == Interval.MINUTE:
# self.update_bar_minute_window(bar)
# else:
# self.update_bar_hour_window(bar)
if self.interval == Interval.MINUTE:
self.update_bar_minute_window(bar)
elif self.interval == Interval.HOUR:
self.update_bar_hour_window(bar)
elif self.interval == Interval.DAILY:
self.update_bar_day_window(bar) #处理日线
else:
self.update_bar_week_window(bar) #处理周线
定义函数
update_bar_day_window
生成一根日k线,判断日线是不是新的日k线的逻辑主要两个
- 每天下午的14:59的分钟线是最后一根分钟线,以此来判断一根日线的结束
if bar.datetime.minute == 59 and bar.datetime.hour == 14:
更新k线数据,当前的bar就是完整的日k线可以推送出去了
- 根据
datetime.day
获取的天数不一样的时候也是新的一天,这样可以应对数据缺失没有14:59的时候也可以更新k线
但是夜盘是一天的开始,而夜盘都是晚上9点开盘的,所以我们在比较是不是新的一天的时候需要将时间都后移4个小时,这样晚上9点就成了明天的凌晨0点,10点成了第二天的1点,而第二天下午三点只是移到了晚上7点还是同一天。
#此处的bar 是一个一分钟k线,因为vnpy是先合成1分钟k线,然后合成其他k线的
temp_datetime = (bar.datetime + timedelta(hours=4)) #加4个小时
if temp_datetime.day != (self.day_bar.datetime+ timedelta(hours=4)).day:
更新k线数据,当前的bar就是完整的日k线,保存下来,生成新的bar
但是星期五的夜盘是周一的日k线的开始,所以还要特判一下,完整代码如下:
def update_bar_day_window(self, bar: BarData) -> None:
""""""
# 没有日线bar就生成一个
if not self.day_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
temp_datetime = (bar.datetime + timedelta(hours=4))
# 14:59 更新bar,生成新的日线bar
if bar.datetime.minute == 59 and bar.datetime.hour == 14:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
finished_bar = self.day_bar #保存日线bar
self.day_bar = None #因为日线bar已经保存给finished_bar了所以将日线bar设为空,下次新数据来了就会生成新的日线bar
# 夜盘算新的一天的开始,
# 现存的bar加上5小时如果是周六的话就那代表是周五的夜盘数据,而它对应的白天数据是下周一的,隔了2天加5个小时还是不够的,
# 所以特判一下如果现存的self.day_bar是周五的话不要用5小时判断,剩下不用管他,因为下周一的夜盘进来的话会被+5小时的条件判断掉,进而将周五夜盘和周一白天的数据推送出去
elif temp_datetime.day != (self.day_bar.datetime+ timedelta(hours=5)).day and (self.day_bar.datetime+ timedelta(hours=5)).weekday() != 5:
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.day_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# 更新 现存的day_bar
else:
self.day_bar.high_price = max(
self.day_bar.high_price,
bar.high_price
)
self.day_bar.low_price = min(
self.day_bar.low_price,
bar.low_price
)
self.day_bar.close_price = bar.close_price
self.day_bar.volume += int(bar.volume)
self.day_bar.open_interest = bar.open_interest
# 推送日线给on_hour_bar处理
if finished_bar:
self.on_hour_bar(finished_bar)
# Cache last bar object
self.last_bar = bar
我们在jupyter notebook环境里面跑一下回测,因为在GUI里面是看不到日志的
class demoStrategy(CtaTemplate):
def __init__(self, cta_engine, strategy_name, vt_symbol, setting):
super().__init__(cta_engine, strategy_name, vt_symbol, setting)
self.bg = BarGenerator(self.on_bar, 1, self.on_day_bar, Interval.DAILY)
self.am = ArrayManager()
....
省略其他代码
....
def on_day_bar(self,bar:BarData):
am = self.am
am.update_bar(bar)
self.write_log(bar.datetime) # 把bar的时间戳输出在日志里面
from vnpy.app.cta_strategy.backtesting import BacktestingEngine, OptimizationSetting
from vnpy.app.cta_strategy.base import BacktestingMode
from datetime import datetime
# 回测
engine = BacktestingEngine()
engine.set_parameters(
vt_symbol="rb9999.SHFE",
interval="1m",
start=datetime(2019, 1, 2),
end=datetime(2019, 2, 8),
rate=0.5/10000,
slippage=5,
size=10,
pricetick=5,
capital=1_000_000,
mode=BacktestingMode.TICK #tick级回测
)
engine.add_strategy(demoStrategy, {})
engine.load_data()
engine.run_backtesting()
df = engine.calculate_result()
engine.calculate_statistics()
engine.show_chart()
数据是tick级别,回测时间是
20190102——20190208
正好碰上春节,8号是大年初四,所以数据只有一月的。
一般节假日的第一天是没有夜盘数据的,比如5.1前的4月30号夜盘是不交易的,所以除了1月2号没有夜盘数据,正常的日线bar都是晚上9点开始的。
左边的时间戳是打印的时间,比如
2019-01-03 21:00:00.500
是新的一天了,所以打印上一根日线bar的时间戳
2019-01-02 09:00:00
,这根bar保存的是
2019-01-02 09:00:00 - 2019-01-02 14:59:59.500
的数据,从这里也可以看除k线数据不完整vnpy是不会推送的,只有当天走完它才会推送。
周线合成
同理周线的判断逻辑也有两个
- 接收到每周五的下午14:59:00的一分钟bar的时候就可以更新一根周k线
-
datetime.isocalendar()
可以计算今天是今年的第几周这样就可以判断如果出现跨周的话一定是新的周k线
但是还要再加上2天5小时,让周五的夜盘变成下一周,而周五的下午三点会变成周日的晚上8点,还是当前周
完整代码如下:
def update_bar_week_window(self, bar: BarData) -> None:
""""""
# If not inited, create window bar object
if not self.week_bar:
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
volume=bar.volume
)
return
finished_bar = None
# If time is Firday 14:59, update day bar into window bar and push
if bar.datetime.minute == 59 and bar.datetime.hour == 14 and bar.datetime.weekday() == 4:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
finished_bar = self.week_bar
self.week_bar = None
# isocalendar() 返回多少年的第几周的第几天 格式如(2018, 27, 5)
# 周数不相同肯定是新的一周,可以推送出一根完整周k线了
elif (bar.datetime + timedelta(days=2,hours=5)).isocalendar()[1] != (self.week_bar.datetime + timedelta(days=2,hours=5)).isocalendar()[1]:
# print(bar.datetime.isocalendar())
finished_bar = self.week_bar
dt = bar.datetime.replace(minute=0, second=0, microsecond=0)
self.week_bar = BarData(
symbol=bar.symbol,
exchange=bar.exchange,
datetime=dt,
gateway_name=bar.gateway_name,
open_price=bar.open_price,
high_price=bar.high_price,
low_price=bar.low_price,
close_price=bar.close_price,
volume=bar.volume
)
# Otherwise only update minute bar
else:
self.week_bar.high_price = max(
self.week_bar.high_price,
bar.high_price
)
self.week_bar.low_price = min(
self.week_bar.low_price,
bar.low_price
)
self.week_bar.close_price = bar.close_price
self.week_bar.volume += int(bar.volume)
self.week_bar.open_interest = bar.open_interest
# Push finished window bar
if finished_bar:
self.on_hour_bar(finished_bar) #on_window_bar只关心bar的数量,不关心bar的类型,所以可以直接调用
# Cache last bar object
self.last_bar = bar
同理类似日线,我们也打印一下日志
结合日历看比较方便,在1月4号周五的夜盘开始的时候,可以打印上一根周k线,这根k线从2号的白天9点到4号下午15点
1月11号周五的夜盘开始的时候推送出上周五1月4号夜盘21点到本周五11号下午15点的周k线。