之前為了實作利用遺傳算法,進行多程序政策的優化,學習研究了python的多程序庫Multiprocessing。以前感覺真是黑科技,學習後發現,還是python優點,簡單好用,對于一般應用還是很好了解。
首先,由于GIL(全局解釋鎖)的問題,全局對象隻能一個程序調用,python多線程并不能充分利用多核處理器,比如有時候用pandas跑大型資料分析,發現隻有一核在累死累活。如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。multiprocessing可以給每個程序賦予單獨的Python解釋器,這樣就規避了全局解釋鎖所帶來的問題。可以了解為多核CPU配置設定好一個工作任務,這個工作任務包括工作方法和工作内容。
其實python多線程很簡單,相對于其他語言來說。其實簡單就是針對需要多線程的方法func(a),a是參數。相當于工作内容;使用 Multiprocessing. Process(target = func, args =(a,)),建立一個Prcoess對象,也就是工作任務,再啟動這個對象,這樣一個多程序任務就完成了。等CPU配置設定一個獨立核去幹活,func(a)就開動了。這裡唯一要注意args是預設輸入元祖參數。
P = Multiprocessing.Process(target = func, args =(a,))
P.start()
Multiprocessing提供了更簡潔的pool做為程序池,其實叫任務池更為恰當。把需要幹的工作任務打包好,放在這個池子裡面,這樣空閑下來的核心就撿pool的任務幹活。
常見的pool的使用如下,其中prcesses = 4 是定義任務池大小,不一定要小于或者等于cpu核心數量,可以大于cpu核心數量,不過這樣就有幾個任務空挂着還占用記憶體。
然後使用pool方法apply_async(task, args=(x,)),把打包好的任務插入池中。 apply_asyncs是異步的帶傳回值。如果用apply也可以正常,但是會沒有傳回值,此處不仔細研究了。
之後close()是把這個任務池關閉,不再接受新的任務;但是還有一些已有任務在跑,是以用pool.join(),吊着主程式,直到所有任務完成才進入下一步。
if __name__ == '__main__':
Multiprocessing.pool = Pool(processes=4)
for x in range(10):
pool.apply_async(task, args=(x,))
pool.close()
pool.join()
下面看看VNPY多程序優化方法。其實很好了解了,runParallelOptimization是類BacktestingEngine的一個方法。
傳入參數strategyClass就是這個政策類,setting是要優化參數範圍,後面通過optimizationSetting.generateSetting()生成政策參數隊列,做為任務内容;optimizationSetting.optimizeTarget是後面傳回值。至于回測品種,回測時間段,交易費用什麼,在 BacktestingEngine建立時候維護了。
然後建立任務池pool,大小剛好是cpu核數,這個也是比較穩妥設定。
之後做一個l隊列來放傳回值。
然後打包政策類,回測參數,政策參數做為任務内容,和任務方法optimize一起組合為一個工作任務。然後插入任務池給cpu核心去跑。這個時候在系統螢幕可以看到于核心數相同的python虛拟環境運作。
然後就是對傳回值排序。後面詳細說說。
df = engine.runParallelOptimization(AtrRsiStrategy, setting)
def runParallelOptimization(self, strategyClass, optimizationSetting):
"""并行優化參數"""
# 擷取優化設定
settingList = optimizationSetting.generateSetting()
targetName = optimizationSetting.optimizeTarget
# 檢查參數設定問題
if not settingList or not targetName:
self.output(u'優化設定有問題,請檢查')
# 多程序優化,啟動一個對應CPU核心數量的程序池
pool = multiprocessing.Pool(multiprocessing.cpu_count())
l = []
for setting in settingList:
l.append(pool.apply_async(optimize, (strategyClass, setting,
targetName, self.mode,
self.startDate, self.initDays, self.endDate,
self.slippage, self.rate, self.size, self.priceTick,
self.dbName, self.symbol)))
pool.close()
pool.join()
# 顯示結果
resultList = [res.get() for res in l]
resultList.sort(reverse=True, key=lambda result:result[1])
return resultList
像現在雙核四線程就有四個python環境在跑任務。

這裡會發現是用靜态方法optimize,如果直接調用 BacktestingEngine的回測方法更簡潔,為什麼沒有呢,這個是python2.7的Multiprocessing的一個局限,隻能打包靜态方法做為工作方法,如果打包類中的方法,會提示錯誤。
cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup builtin .instanceme
如果VNPY2.0基于python3.6版本,應該就會更簡化一些。
下面看看 靜态方法 optimize,其實沒什麼好說,就是建立一個回測引擎BacktestingEngine對象,按照參數跑一遍回測,傳回一個元祖,包含了這次回測的參數,針對回測目标的值,和一個包含回測結果的字典,這個字典包括什麼年化收入,sharpe等一堆回測結果。
然後所有的回測結果元祖組成一個回測結果隊列,這個結果隊列按照targetValue反向排序,最大放在第一位。
因為太多了,一般我都是輸出到excel裡面,之前說過怎麼實作。
#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
mode, startDate, initDays, endDate,
slippage, rate, size, priceTick,
dbName, symbol):
"""多程序優化時跑在每個程序中運作的函數"""
engine = BacktestingEngine()
engine.setBacktestingMode(mode)
engine.setStartDate(startDate, initDays)
engine.setEndDate(endDate)
engine.setSlippage(slippage)
engine.setRate(rate)
engine.setSize(size)
engine.setPriceTick(priceTick)
engine.setDatabase(dbName, symbol)
engine.initStrategy(strategyClass, setting)
engine.runBacktesting()
engine.calculateDailyResult()
d, result = engine.calculateDailyStatistics()
try:
targetValue = result[targetName]
except KeyError:
targetValue = 0
return (str(setting), targetValue, result)