天天看点

vnpy源码学习记录(1) ----------两个主引擎事件引擎:EventEngine主引擎:MainEngine

VNPY源码学习(1) ---------- 两个主引擎

  • 事件引擎:EventEngine
    • 1. 计时器
    • 2. 处理事件
    • 3. 测试
  • 主引擎:MainEngine
    • 1. 实例化主引擎
      • 1.1 加载事件引擎
      • 1.2 启动事件引擎
      • 1.3 初始化内置引擎
    • 2. 其他方法

在vnpy中,有两个主要的引擎:事件引擎和主引擎:

事件引擎:EventEngine

vnpy/event/engine.py

事件引擎根据其类型将事件对象分发给注册的处理程序。 它还会按每个间隔秒生成定时器事件,这个功能可用于计时。计时器事件每interval秒被生成。实例化:

event_engine = EventEngine() 
           

event_engine = EventEngine(interval) 
           

interval表示计时器间隔,默认为1秒。

EventEngine中

self._thread = Thread(target=self._run)  # 处理事件的线程
self._timer = Thread(target=self._run_timer)  # 计时器线程
           
  • _run()函数:从队列中获取事件并处理(_process()函数)事件。
    • 只要引擎被启动,引擎将不断从队列中取出事件进行process
    • _process():首先将事件分发给那些注册监听了此类型的处理程序。 然后分发事件给监听了所有类型通用处理程序。
  • _run_timer()函数:暂停interval秒后生成一个计时器事件
  • put():将一个事件对象放到事件队列中
  • register:给特定的事件类型注册处理器。每一个函数只能为某一事件类型注册一次。
  • register_general:注册通用处理程序

下面通过几个例子看看事件引擎的操作。

1. 计时器

首先看看计时器是怎么操作的。在实例化了事件引擎以后,启动事件引擎,并注册两个计时器事件:

event_engine = EventEngine()
event_engine.start()
event_engine.register(EVENT_TIMER, test)
event_engine.register(EVENT_TIMER, test2)

           

test和test2为事件的处理器,为一个方法,方法必须接受事件参数,这里打印出时间:

def test(event: Event):
    print("in test:" + str(datetime.now()))

def test2(event: Event):
	print("in test2:" + str(datetime.now()))

           

运行后得到结果:

vnpy源码学习记录(1) ----------两个主引擎事件引擎:EventEngine主引擎:MainEngine

解释:

event_engine.register(EVENT_TIMER, test)

注册一个计时器事件,并将该计时器事件的处理方法设置为

test()

register()

里面,将事件名称和事件处理器保存起来

def register(self, type: str, handler: HandlerType):
    handler_list = self._handlers[type]
    if handler not in handler_list:
        handler_list.append(handler)
           

注册了两个计时器事件之后,

self._handlers

为:

vnpy源码学习记录(1) ----------两个主引擎事件引擎:EventEngine主引擎:MainEngine

可见,事件是以字典的形式保存的,字典的key为事件名称,value为一个列表,里面的值为事件的处理方法。

而在事件引擎启动时,计时器线程就启动了,计时器每暂停一秒,就put一个计时器事件

def _run_timer(self):
    while self._active:
        sleep(self._interval)
        event = Event(EVENT_TIMER)
        self.put(event)
           

put就是将事件添加到队列对象中

def put(self, event: Event):
	self._queue.put(event)
           

在计时器线程启动的同时,处理器线程就启动了,下面是启动事件引擎的方法:

def start(self):
    self._active = True
    self._thread.start()
	self._timer.start()
           

因此处理器线程调用_run()方法,方法不断从队列中取事件,并用self._process(event)处理它:

def _run(self):
    while self._active:
        try:
            event = self._queue.get(block=True, timeout=1)
            self._process(event)
        except Empty:
            pass
           

2. 处理事件

使用for循环将该事件的处理方法全部依次调用,参数为事件本身

def _process(self, event: Event):
    if event.type in self._handlers:
        [handler(event) for handler in self._handlers[event.type]]

    if self._general_handlers:
        [handler(event) for handler in self._general_handlers]
           

3. 测试

如何自己定义一个事件

首先定义一个处理器,事件引擎调用方法后参数为事件本身,因此打印出事件的data属性:

def my_func(event: Event):
    print(event.data)
           

实例化一个事件:

my_event = Event(
    type="self_event",
    data="test data ..."
)
           

然后注册这个事件

event_engine.register("self_event", my_func)
           

并将该事件放入队列中

event_engine.put(my_event)
           

执行后得到结果为:

vnpy源码学习记录(1) ----------两个主引擎事件引擎:EventEngine主引擎:MainEngine

主引擎:MainEngine

vnpy/trader/engine.py 主引擎是VN Trader的核心,

实例化:

main_engine = MainEngine(event_engine) 
           

1. 实例化主引擎

event_engine表示事件引擎对象,默认为None,如果没有传入事件引擎对象,系统在主引擎中实例化一个对象:

1.1 加载事件引擎

if event_engine:
        self.event_engine = event_engine
else:
        self.event_engine = EventEngine()

           

1.2 启动事件引擎

然后event_engine.start() 启动事件引擎,start()方法中

  1. 启动事件引擎(self._active = True)以处理事件生和成计时器事件。
  2. 启动处理事件的线程
  3. 启动计时器线程
os.chdir(TRADER_DIR)   # 更改工作目录,这一步没必要
           

1.3 初始化内置引擎

self.init_engines()

初始化引擎:

def init_engines(self):
    self.add_engine(LogEngine)
    self.add_engine(OmsEngine)
  self.add_engine(EmailEngine)
           

初始化引擎时将这三种引擎添加到主引擎的engines字典中,key为engine的名字,value为引擎的实例对象:

def add_engine(self, engine_class: Any):
    engine = engine_class(self, self.event_engine)  # 实例化指定引擎
    self.engines[engine.engine_name] = engine
    return engine
           

vnpy中内置了三个引擎,分别为:

  1. LogEngine 日志引擎
  2. OmsEngine 指定管理系统 (Order Manage System)
  3. EmailEngine

在后面会详细介绍该三种引擎。

2. 其他方法

def add_engine(self, engine_class: Any)  # 添加引擎
 def add_gateway(self, gateway_class: BaseGateway)  # 添加网关
 def add_app(self, app_class: BaseApp)  # 添加应用
 def write_log(self, msg: str, source: str = "", level=20)  # 输出日志,在LogEngine中介绍
 def get_gateway(self, gateway_name: str)  # 根据网关名称获取网关实例
 def get_engine(self, engine_name: str)  # 根据引擎名称获取引擎实例

 def get_all_gateway_names(self)  # 获取所有的网关名(只获取名称,不获取实例)
 def get_all_apps(self)  # 获取所有的应用名称
 def get_all_exchanges(self)  # 获取所有的交易所 交易所在添加网关的时候将指定网关支持的交易所添加到交易所列表中
           
def connect(self, setting: dict, gateway_name: str)
def subscribe(self, req: SubscribeRequest, gateway_name: str)
def send_order(self, req: OrderRequest, gateway_name: str)
def cancel_order(self, req: CancelRequest, gateway_name: str)
def send_orders(self, reqs: Sequence[OrderRequest], gateway_name: str)
def cancel_orders(self, reqs: Sequence[CancelRequest], gateway_name: str)
def query_history(self, req: HistoryRequest, gateway_name: str)
           

上面的几个方法都是在指定网关内完成的,具体使用的时候具体讲解

def close(self):
    """
	在程序退出前确保每一个网关和应用都被关闭
    """
    self.event_engine.stop()
    for engine in self.engines.values():
        engine.close()
    for gateway in self.gateways.values():
        gateway.close()