天天看点

VNPY核心数据结构EventEngine理解

VNPY中最重要的数据处理类就是EventEngine,负责各模块间的数据通信和针对不同数据进行函数调用处理。EventEngine由两个主要的结构构成,一个是Queue进行数据的传输和存放,一个是利用字典来进行数据类型和函数的对应及执行,可以将其视为一个具有数据存储和数据处理的容器。

首先从Event类来看:

class Event:

    def __init__(self, type: str, data: Any = None):
        """"""
        self.type: str = type
        self.data: Any = data
           

可以将Event理解为为数据添加了对应的业务属性,便于根据业务属性找到对应的函数进行处理。Event存在两个属性,其中type属性用于存储Event的属性,使用字符串进行标识;其中data属性用于存储实际的数据。

接下来看EventEngine类:

class EventEngine:
    def __init__(self, interval: int = 1):
        self._interval: int = interval
        # 数据传输队列
        self._queue: Queue = Queue()
        # 判断Engine是否启动
        self._active: bool = False
        self._thread: Thread = Thread(target=self._run)
        self._timer: Thread = Thread(target=self._run_timer)
        # 事件类型和对应函数的映射
        self._handlers: defaultdict = defaultdict(list)
        # 通用事件映射
        self._general_handlers: List = []
           

EventEngine初始化时采用了Queue解决了进程间传参的问题,由生产者产生数据后,将数据和数据业务类型打包生成Event,放入对应的EventEngine的_queue中。_handlers利用defaultdict生成了一个空字典,该字典的value缺省值为list,字典形式如下{‘数据业务类型’:[处理方法1,处理方法2...]}。

那么如何将处理方法和对应的业务类型进行绑定与解绑,这次采用了register和unregister方法进行处理:

def register(self, type: str, handler: HandlerType) -> None:
        handler_list = self._handlers[type]
        if handler not in handler_list:
            handler_list.append(handler)

    def unregister(self, type: str, handler: HandlerType) -> None:
        handler_list = self._handlers[type]

        if handler in handler_list:
            handler_list.remove(handler)

        if not handler_list:
            self._handlers.pop(type)
           

函数接收数据业务类型和对应的处理方法,由于_handlers利用defaultdict生成,所以如果数据业务类型不在_handlers中,则生成{‘数据业务类型’:[]},并将对应的处理方法放入list中。如果数据业务类型在_handlers中,则将对应的函数加入list。这里有效的利用的python中字典可以存储各类object的特性,可以理解为实现了一个switch case的结构。

最后就是EventEngine如何对到来的数据进行处理:

def _run(self) -> None:
        while self._active:
            try:
                event = self._queue.get(block=True, timeout=1)
                self._process(event)
            except Empty:
                pass

    def _process(self, event: Event) -> None:
        if event.type in self._handlers:
            [handler(event) for handler in self._handlers[event.type]]

        if self._general_handlers:
            [handler(event) for handler in self._general_handlers]

    def start(self) -> None:
        self._active = True
        self._thread.start()
        self._timer.start()
           

EventEngine启动时,将_active置为True,标示EventEngine已经启动,同时启动一个线程进行监听。该线程监听_queue是否为空,如果_queue中存在Event,则将其取出并在调用_process(),在_handlers中根据Event.type找到对应的函数列表,并遍历执行这些函数。

最终生产者消费者模型逻辑如下:

VNPY核心数据结构EventEngine理解