天天看點

vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎

三個内置引擎

  • BaseEngine
  • 1. LogEngine 日志引擎
  • 2. OmsEngine 指令管理系統
  • 3. EmailEngine 郵件引擎
    • 3.1 一個運作線程
    • 3.2 将發送郵件添加到主引擎
    • 3.3 關于郵箱的配置
    • 3.4 測試

BaseEngine

引擎的基礎類,實作功能引擎的抽象類。初始化:初始化主引擎,事件引擎和引擎名。

下面三個内置功能引擎都繼承該類

1. LogEngine 日志引擎

處理日志事件和日志輸出

日志引擎主要是對python logging子產品的進一步義封裝。這裡對其基于vnpy做簡單說明。執行個體化日志引擎時,加載了logging子產品的基本配置:

def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
    super(LogEngine, self).__init__(main_engine, event_engine, "log")
    if not SETTINGS["log.active"]:  # 是否啟動日志
        return
    self.level = SETTINGS["log.level"]  # 日志記錄級别  預設為最高50 是以預設情況下不會輸出任何資訊
    self.logger = logging.getLogger("VN Trader")  # 使用工廠方法傳回一個Logger執行個體。
    self.logger.setLevel(self.level)  # 設定日志級别
    self.formatter = logging.Formatter("%(asctime)s  %(levelname)s: %(message)s")  # 日志記錄樣式
    self.add_null_handler()  #
    if SETTINGS["log.console"]:
        self.add_console_handler()  # 添加控制台輸出
    if SETTINGS["log.file"]:
        self.add_file_handler()  # 添加檔案輸出
	self.register_event()  # 注冊事件
           

關于本地的配置SETTINGS可以在vnpy/trader/setting.py中找到(我自己開發時将不同功能的配置資訊分割到不同檔案,比如日志的配置寫在一個檔案,郵件的寫在另一個檔案),可以看到,vnpy将日志級别設定到最高,在loggging子產品中定義,僅僅在該等級以上(或相等)才會回報資訊。是以預設情況下隻有出現嚴重錯誤時才會出現回報,否則不會産生任何日志資訊,這裡為了測試需要将日志級别該低一些(我調到了20,也就是DEBUG級别)。

日志級别及數值以及使用場景見下表:

類型 數值 描述
CRITICAL 50 嚴重錯誤,表明軟體已不能繼續運作了。
ERROR 40 由于更嚴重的問題,軟體已不能執行一些功能了。
WARNING 30 某些沒有預料到的事件的提示,或者在将來可能會出現的問題提示。例如:磁盤空間不足。但是軟體還是會照常運作。
INFO 20 證明事情按預期工作。
DEBUG 10 詳細資訊,一般隻在調試問題時使用。
NOTSET

接着,程式使用

add_null_handler

add_console_handler

add_file_handler

添加三個處理器,分别對日志的處理方式進行定義。

add_console_handler 添加控制台列印:

def add_console_handler(self):
    console_handler = logging.StreamHandler()  # 執行個體化對象
    console_handler.setLevel(self.level)  # 設定輸出到控制台的級别
    console_handler.setFormatter(self.formatter)  # 設定輸出樣式
    self.logger.addHandler(console_handler)  # 添加此處理器對象
           

add_file_handler 添加檔案輸出

def add_file_handler(self):
    """
    Add file output of log. 
    """
    today_date = datetime.now().strftime("%Y%m%d")
    filename = f"vt_{today_date}.log"  # 檔案名稱按照日期定義
    log_path = get_folder_path("log")
    file_path = log_path.joinpath(filename)
    file_handler = logging.FileHandler(
        file_path, mode="a", encoding="utf8"
    )  # 執行個體檔案處理器對象,并配置輸出檔案路徑(vnpy預設輸出到項目下的.vntrader/log檔案夾(如果存在,不存在輸出到項目工作路徑下的.vntrader/log檔案夾))
    file_handler.setLevel(self.level)  # 設定檔案輸出級别
    file_handler.setFormatter(self.formatter)  # 
    self.logger.addHandler(file_handler)  # 
           

不同的處理器可以對日志級别、輸出樣式等資訊做詳細配置

注冊事件

self.register_event()

,将日志事件注冊到事件引擎:

def register_event(self):
    self.event_engine.register(EVENT_LOG, self.process_log_event)
           

處理方法

process_log_event

對日志資訊進行輸出,使用logging子產品的log()對象

def process_log_event(self, event: Event):
    log = event.data
    self.logger.log(log.level, log.msg)
           

現在使用下面的代碼對日志輸出進行測試

event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.write_log("日志測試")	
           

得到輸出結果:

vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎
vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎

可以看見輸出的日志級别為INFO,那麼這是哪裡定義的呢?看看調用的write_log()方法:

def write_log(self, msg: str, source: str = ""):
    log = LogData(msg=msg, gateway_name=source)  # 執行個體化LogData對象
    event = Event(EVENT_LOG, log)
    self.event_engine.put(event)  # 将日志事件放到事件引擎

           

是以事件引擎就會執行process_log_event函數輸出資訊。在執行個體化log對象的時候可以在vnpy/trader/object.py 中的LogData類中看到,對象的level屬性預設為INFO級别,也就是20:

@dataclass
class LogData(BaseData):
    msg: str
    level: int = INFO
    def __post_init__(self):
        self.time = datetime.now()

           

是以在這一步,我對write_log進行優化,對其進行級别設定,預設還是20

def write_log(self, msg: str, source: str = "", level=20):
    """
    Put log event with specific message.
    """
    log = LogData(msg=msg, level=level, gateway_name=source)
    event = Event(EVENT_LOG, log)
    self.event_engine.put(event)

           

測試:

main_engine.write_log("日志測試", level=20)
main_engine.write_log("警告測試", level=30)
           

得到結果:

vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎

2. OmsEngine 指令管理系統

(Order Manage System)

指令管理系統做的事情很簡單。1. 添加函數到主引擎。 2. 注冊事件

  1. 添加查詢函數到主引擎
self.main_engine.get_tick = self.get_tick  # 擷取行情資料 需要傳入vt_symbol參數
    self.main_engine.get_order = self.get_order  # 擷取訂單 需要傳入vt_orderid參數
    self.main_engine.get_trade = self.get_trade  # 擷取交易資訊 需要傳入vt_tradeid
    self.main_engine.get_position = self.get_position  # 擷取持倉資訊  需要傳入vt_positionid參數
    self.main_engine.get_account = self.get_account  # 擷取賬戶資訊  需要傳入vt_accountid參數
    self.main_engine.get_contract = self.get_contract  # 擷取合約資訊  需要傳入vt_symbol參數
    self.main_engine.get_all_ticks = self.get_all_ticks  # 擷取所有行情資料
    self.main_engine.get_all_orders = self.get_all_orders  # 擷取所有訂單資訊
    self.main_engine.get_all_trades = self.get_all_trades  # 擷取所有交易資訊
    self.main_engine.get_all_positions = self.get_all_positions  # 擷取所有持倉資訊
    self.main_engine.get_all_accounts = self.get_all_accounts  # 擷取所有賬戶資訊
    self.main_engine.get_all_contracts = self.get_all_contracts  # 擷取所有合約
    self.main_engine.get_all_active_orders = self.get_all_active_orders  # 擷取訂單,參數vt_symbol,為空則擷取所有
           
  1. 注冊事件
self.event_engine.register(EVENT_TICK, self.process_tick_event)  # 處理行情資料
self.event_engine.register(EVENT_ORDER, self.process_order_event)  # 處理訂單
self.event_engine.register(EVENT_TRADE, self.process_trade_event)  # 處理交易資訊
self.event_engine.register(EVENT_POSITION, self.process_position_event)  # 處理持倉
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event)  # 處理賬戶資訊
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event)  # 處理合約資訊
           

上面的處理方法都是将相應的資料已字典形式儲存,如行情資料資訊:

def process_tick_event(self, event: Event):
    tick = event.data
    self.ticks[tick.vt_symbol] = tick
           

是以當有新的同一品種的行情資訊到達時,字典将更新該值,而不是直接儲存。

3. EmailEngine 郵件引擎

vnpy的郵件引擎是基于python smtplib子產品封裝的工具。SMTP(Simple Mail Transfer Protocol)即簡單郵件傳輸協定,它是一組用于由源位址到目的位址傳送郵件的規則,由它來控制信件的中轉方式。python的smtplib提供了一種很友善的途徑發送電子郵件。它對smtp協定進行了簡單的封裝。

初始化:

3.1 一個運作線程

該線程先從隊列中取出消息,然後連接配接郵箱服務及端口,并登陸郵箱,最後發送郵件

def run(self):
    while self.active:
        try:
            msg = self.queue.get(block=True, timeout=1)
            with smtplib.SMTP_SSL(
                    SETTINGS["email.server"], SETTINGS["email.port"]
            ) as smtp:
                smtp.login(
                    SETTINGS["email.username"], SETTINGS["email.password"]  # 登入郵箱
                )
                smtp.send_message(msg)
        except Empty:
            pass

           

3.2 将發送郵件添加到主引擎

self.main_engine.send_email = self.send_email
           

send_email()方法:

def send_email(self, subject: str, content: str, receiver: str = ""):
        """
        :param subject:  郵件主題
        :param content:  郵件内容
        :param receiver:  接收者
        :return:
        """
    # 開啟郵件引擎
    if not self.active:
        self.start()
    # 如果沒有指定接收者就使用預設的接收者
    if not receiver:
        receiver = SETTINGS["email.receiver"]
  
    msg = EmailMessage()
    msg["From"] = SETTINGS["email.sender"]
    msg["To"] = receiver  # 這裡vnpy使用SETTINGS["email.receiver"], 應該使用接收參數receiver  
    msg["Subject"] = subject
    msg.set_content(content)
    
    self.queue.put(msg)  # 放入隊列

           

3.3 關于郵箱的配置

vnpy/trader/setting.py中有幾個關于郵箱的配置:

vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎

“email.server”:郵箱伺服器。

“email.port”:郵箱端口

“email.username”:發送者的郵箱

“email_password”:發送者的密碼,不是郵箱的登入密碼,而是授權碼

“emial_sender”:發送者,也填寫發者郵箱

“email_receiver”:接收者郵箱,可填寫,填寫即預設

3.4 測試

event_engine = EventEngine()
main_engine = MainEngine(event_engine)
email_engine = main_engine.get_engine("email")
email_engine.send_email("測試郵件主題", "-----------------------測試郵件内容----------------------", receiver)  # receiver為接收者的郵件位址
           
vnpy源碼學習記錄(2) ----------三個内置引擎BaseEngine1. LogEngine 日志引擎2. OmsEngine 指令管理系統3. EmailEngine 郵件引擎