三個内置引擎
- BaseEngine
- 1. LogEngine 日志引擎
- 2. OmsEngine 指令管理系統
- 3. EmailEngine 郵件引擎
-
- 3.1 一個運作線程
- 3.2 将發送郵件添加到主引擎
- 3.3 關于郵箱的配置
- 3.4 測試
BaseEngine
引擎的基礎類,實作功能引擎的抽象類。初始化:初始化主引擎,事件引擎和引擎名。
下面三個内置功能引擎都繼承該類
1. LogEngine 日志引擎
處理日志事件和日志輸出
日志引擎主要是對python logging子產品的進一步義封裝。這裡對其基于vnpy做簡單說明。執行個體化日志引擎時,加載了logging子產品的基本配置:
def __init__(self, main_engine: MainEngine, event_engine: EventEngine):
super(LogEngine, self).__init__(main_engine, event_engine, "log")
if not SETTINGS["log.active"]: # 是否啟動日志
return
self.level = SETTINGS["log.level"] # 日志記錄級别 預設為最高50 是以預設情況下不會輸出任何資訊
self.logger = logging.getLogger("VN Trader") # 使用工廠方法傳回一個Logger執行個體。
self.logger.setLevel(self.level) # 設定日志級别
self.formatter = logging.Formatter("%(asctime)s %(levelname)s: %(message)s") # 日志記錄樣式
self.add_null_handler() #
if SETTINGS["log.console"]:
self.add_console_handler() # 添加控制台輸出
if SETTINGS["log.file"]:
self.add_file_handler() # 添加檔案輸出
self.register_event() # 注冊事件
關于本地的配置SETTINGS可以在vnpy/trader/setting.py中找到(我自己開發時将不同功能的配置資訊分割到不同檔案,比如日志的配置寫在一個檔案,郵件的寫在另一個檔案),可以看到,vnpy将日志級别設定到最高,在loggging子產品中定義,僅僅在該等級以上(或相等)才會回報資訊。是以預設情況下隻有出現嚴重錯誤時才會出現回報,否則不會産生任何日志資訊,這裡為了測試需要将日志級别該低一些(我調到了20,也就是DEBUG級别)。
日志級别及數值以及使用場景見下表:
類型 | 數值 | 描述 |
---|---|---|
CRITICAL | 50 | 嚴重錯誤,表明軟體已不能繼續運作了。 |
ERROR | 40 | 由于更嚴重的問題,軟體已不能執行一些功能了。 |
WARNING | 30 | 某些沒有預料到的事件的提示,或者在将來可能會出現的問題提示。例如:磁盤空間不足。但是軟體還是會照常運作。 |
INFO | 20 | 證明事情按預期工作。 |
DEBUG | 10 | 詳細資訊,一般隻在調試問題時使用。 |
NOTSET | – |
接着,程式使用
add_null_handler
、
add_console_handler
和
add_file_handler
添加三個處理器,分别對日志的處理方式進行定義。
add_console_handler 添加控制台列印:
def add_console_handler(self):
console_handler = logging.StreamHandler() # 執行個體化對象
console_handler.setLevel(self.level) # 設定輸出到控制台的級别
console_handler.setFormatter(self.formatter) # 設定輸出樣式
self.logger.addHandler(console_handler) # 添加此處理器對象
add_file_handler 添加檔案輸出
def add_file_handler(self):
"""
Add file output of log.
"""
today_date = datetime.now().strftime("%Y%m%d")
filename = f"vt_{today_date}.log" # 檔案名稱按照日期定義
log_path = get_folder_path("log")
file_path = log_path.joinpath(filename)
file_handler = logging.FileHandler(
file_path, mode="a", encoding="utf8"
) # 執行個體檔案處理器對象,并配置輸出檔案路徑(vnpy預設輸出到項目下的.vntrader/log檔案夾(如果存在,不存在輸出到項目工作路徑下的.vntrader/log檔案夾))
file_handler.setLevel(self.level) # 設定檔案輸出級别
file_handler.setFormatter(self.formatter) #
self.logger.addHandler(file_handler) #
不同的處理器可以對日志級别、輸出樣式等資訊做詳細配置
注冊事件
self.register_event()
,将日志事件注冊到事件引擎:
def register_event(self):
self.event_engine.register(EVENT_LOG, self.process_log_event)
處理方法
process_log_event
對日志資訊進行輸出,使用logging子產品的log()對象
def process_log_event(self, event: Event):
log = event.data
self.logger.log(log.level, log.msg)
現在使用下面的代碼對日志輸出進行測試
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
main_engine.write_log("日志測試")
得到輸出結果:
可以看見輸出的日志級别為INFO,那麼這是哪裡定義的呢?看看調用的write_log()方法:
def write_log(self, msg: str, source: str = ""):
log = LogData(msg=msg, gateway_name=source) # 執行個體化LogData對象
event = Event(EVENT_LOG, log)
self.event_engine.put(event) # 将日志事件放到事件引擎
是以事件引擎就會執行process_log_event函數輸出資訊。在執行個體化log對象的時候可以在vnpy/trader/object.py 中的LogData類中看到,對象的level屬性預設為INFO級别,也就是20:
@dataclass
class LogData(BaseData):
msg: str
level: int = INFO
def __post_init__(self):
self.time = datetime.now()
是以在這一步,我對write_log進行優化,對其進行級别設定,預設還是20
def write_log(self, msg: str, source: str = "", level=20):
"""
Put log event with specific message.
"""
log = LogData(msg=msg, level=level, gateway_name=source)
event = Event(EVENT_LOG, log)
self.event_engine.put(event)
測試:
main_engine.write_log("日志測試", level=20)
main_engine.write_log("警告測試", level=30)
得到結果:
2. OmsEngine 指令管理系統
(Order Manage System)
指令管理系統做的事情很簡單。1. 添加函數到主引擎。 2. 注冊事件
- 添加查詢函數到主引擎
self.main_engine.get_tick = self.get_tick # 擷取行情資料 需要傳入vt_symbol參數
self.main_engine.get_order = self.get_order # 擷取訂單 需要傳入vt_orderid參數
self.main_engine.get_trade = self.get_trade # 擷取交易資訊 需要傳入vt_tradeid
self.main_engine.get_position = self.get_position # 擷取持倉資訊 需要傳入vt_positionid參數
self.main_engine.get_account = self.get_account # 擷取賬戶資訊 需要傳入vt_accountid參數
self.main_engine.get_contract = self.get_contract # 擷取合約資訊 需要傳入vt_symbol參數
self.main_engine.get_all_ticks = self.get_all_ticks # 擷取所有行情資料
self.main_engine.get_all_orders = self.get_all_orders # 擷取所有訂單資訊
self.main_engine.get_all_trades = self.get_all_trades # 擷取所有交易資訊
self.main_engine.get_all_positions = self.get_all_positions # 擷取所有持倉資訊
self.main_engine.get_all_accounts = self.get_all_accounts # 擷取所有賬戶資訊
self.main_engine.get_all_contracts = self.get_all_contracts # 擷取所有合約
self.main_engine.get_all_active_orders = self.get_all_active_orders # 擷取訂單,參數vt_symbol,為空則擷取所有
- 注冊事件
self.event_engine.register(EVENT_TICK, self.process_tick_event) # 處理行情資料
self.event_engine.register(EVENT_ORDER, self.process_order_event) # 處理訂單
self.event_engine.register(EVENT_TRADE, self.process_trade_event) # 處理交易資訊
self.event_engine.register(EVENT_POSITION, self.process_position_event) # 處理持倉
self.event_engine.register(EVENT_ACCOUNT, self.process_account_event) # 處理賬戶資訊
self.event_engine.register(EVENT_CONTRACT, self.process_contract_event) # 處理合約資訊
上面的處理方法都是将相應的資料已字典形式儲存,如行情資料資訊:
def process_tick_event(self, event: Event):
tick = event.data
self.ticks[tick.vt_symbol] = tick
是以當有新的同一品種的行情資訊到達時,字典将更新該值,而不是直接儲存。
3. EmailEngine 郵件引擎
vnpy的郵件引擎是基于python smtplib子產品封裝的工具。SMTP(Simple Mail Transfer Protocol)即簡單郵件傳輸協定,它是一組用于由源位址到目的位址傳送郵件的規則,由它來控制信件的中轉方式。python的smtplib提供了一種很友善的途徑發送電子郵件。它對smtp協定進行了簡單的封裝。
初始化:
3.1 一個運作線程
該線程先從隊列中取出消息,然後連接配接郵箱服務及端口,并登陸郵箱,最後發送郵件
def run(self):
while self.active:
try:
msg = self.queue.get(block=True, timeout=1)
with smtplib.SMTP_SSL(
SETTINGS["email.server"], SETTINGS["email.port"]
) as smtp:
smtp.login(
SETTINGS["email.username"], SETTINGS["email.password"] # 登入郵箱
)
smtp.send_message(msg)
except Empty:
pass
3.2 将發送郵件添加到主引擎
self.main_engine.send_email = self.send_email
send_email()方法:
def send_email(self, subject: str, content: str, receiver: str = ""):
"""
:param subject: 郵件主題
:param content: 郵件内容
:param receiver: 接收者
:return:
"""
# 開啟郵件引擎
if not self.active:
self.start()
# 如果沒有指定接收者就使用預設的接收者
if not receiver:
receiver = SETTINGS["email.receiver"]
msg = EmailMessage()
msg["From"] = SETTINGS["email.sender"]
msg["To"] = receiver # 這裡vnpy使用SETTINGS["email.receiver"], 應該使用接收參數receiver
msg["Subject"] = subject
msg.set_content(content)
self.queue.put(msg) # 放入隊列
3.3 關于郵箱的配置
vnpy/trader/setting.py中有幾個關于郵箱的配置:
“email.server”:郵箱伺服器。
“email.port”:郵箱端口
“email.username”:發送者的郵箱
“email_password”:發送者的密碼,不是郵箱的登入密碼,而是授權碼
“emial_sender”:發送者,也填寫發者郵箱
“email_receiver”:接收者郵箱,可填寫,填寫即預設
3.4 測試
event_engine = EventEngine()
main_engine = MainEngine(event_engine)
email_engine = main_engine.get_engine("email")
email_engine.send_email("測試郵件主題", "-----------------------測試郵件内容----------------------", receiver) # receiver為接收者的郵件位址