VNPY中最重要的数据处理类就是EventEngine,负责各模块间的数据通信和针对不同数据进行函数调用处理。EventEngine由两个主要的结构构成,一个是Queue进行数据的传输和存放,一个是利用字典来进行数据类型和函数的对应及执行,可以将其视为一个具有数据存储和数据处理的容器。
首先从Event类来看:
class Event:
def __init__(self, type: str, data: Any = None):
""""""
self.type: str = type
self.data: Any = data
可以将Event理解为为数据添加了对应的业务属性,便于根据业务属性找到对应的函数进行处理。Event存在两个属性,其中type属性用于存储Event的属性,使用字符串进行标识;其中data属性用于存储实际的数据。
接下来看EventEngine类:
class EventEngine:
def __init__(self, interval: int = 1):
self._interval: int = interval
# 数据传输队列
self._queue: Queue = Queue()
# 判断Engine是否启动
self._active: bool = False
self._thread: Thread = Thread(target=self._run)
self._timer: Thread = Thread(target=self._run_timer)
# 事件类型和对应函数的映射
self._handlers: defaultdict = defaultdict(list)
# 通用事件映射
self._general_handlers: List = []
EventEngine初始化时采用了Queue解决了进程间传参的问题,由生产者产生数据后,将数据和数据业务类型打包生成Event,放入对应的EventEngine的_queue中。_handlers利用defaultdict生成了一个空字典,该字典的value缺省值为list,字典形式如下{‘数据业务类型’:[处理方法1,处理方法2...]}。
那么如何将处理方法和对应的业务类型进行绑定与解绑,这次采用了register和unregister方法进行处理:
def register(self, type: str, handler: HandlerType) -> None:
handler_list = self._handlers[type]
if handler not in handler_list:
handler_list.append(handler)
def unregister(self, type: str, handler: HandlerType) -> None:
handler_list = self._handlers[type]
if handler in handler_list:
handler_list.remove(handler)
if not handler_list:
self._handlers.pop(type)
函数接收数据业务类型和对应的处理方法,由于_handlers利用defaultdict生成,所以如果数据业务类型不在_handlers中,则生成{‘数据业务类型’:[]},并将对应的处理方法放入list中。如果数据业务类型在_handlers中,则将对应的函数加入list。这里有效的利用的python中字典可以存储各类object的特性,可以理解为实现了一个switch case的结构。
最后就是EventEngine如何对到来的数据进行处理:
def _run(self) -> None:
while self._active:
try:
event = self._queue.get(block=True, timeout=1)
self._process(event)
except Empty:
pass
def _process(self, event: Event) -> None:
if event.type in self._handlers:
[handler(event) for handler in self._handlers[event.type]]
if self._general_handlers:
[handler(event) for handler in self._general_handlers]
def start(self) -> None:
self._active = True
self._thread.start()
self._timer.start()
EventEngine启动时,将_active置为True,标示EventEngine已经启动,同时启动一个线程进行监听。该线程监听_queue是否为空,如果_queue中存在Event,则将其取出并在调用_process(),在_handlers中根据Event.type找到对应的函数列表,并遍历执行这些函数。
最终生产者消费者模型逻辑如下: