天天看點

python多程序簡介,和VNPY多程序參數優化代碼分析

之前為了實作利用遺傳算法,進行多程序政策的優化,學習研究了python的多程序庫Multiprocessing。以前感覺真是黑科技,學習後發現,還是python優點,簡單好用,對于一般應用還是很好了解。

首先,由于GIL(全局解釋鎖)的問題,全局對象隻能一個程序調用,python多線程并不能充分利用多核處理器,比如有時候用pandas跑大型資料分析,發現隻有一核在累死累活。如果想要充分地使用多核CPU的資源,在python中大部分情況需要使用多程序。multiprocessing可以給每個程序賦予單獨的Python解釋器,這樣就規避了全局解釋鎖所帶來的問題。可以了解為多核CPU配置設定好一個工作任務,這個工作任務包括工作方法和工作内容。

其實python多線程很簡單,相對于其他語言來說。其實簡單就是針對需要多線程的方法func(a),a是參數。相當于工作内容;使用 Multiprocessing. Process(target = func, args =(a,)),建立一個Prcoess對象,也就是工作任務,再啟動這個對象,這樣一個多程序任務就完成了。等CPU配置設定一個獨立核去幹活,func(a)就開動了。這裡唯一要注意args是預設輸入元祖參數。

P = Multiprocessing.Process(target = func, args =(a,))
P.start()
      

Multiprocessing提供了更簡潔的pool做為程序池,其實叫任務池更為恰當。把需要幹的工作任務打包好,放在這個池子裡面,這樣空閑下來的核心就撿pool的任務幹活。

常見的pool的使用如下,其中prcesses = 4 是定義任務池大小,不一定要小于或者等于cpu核心數量,可以大于cpu核心數量,不過這樣就有幾個任務空挂着還占用記憶體。

然後使用pool方法apply_async(task, args=(x,)),把打包好的任務插入池中。 apply_asyncs是異步的帶傳回值。如果用apply也可以正常,但是會沒有傳回值,此處不仔細研究了。

之後close()是把這個任務池關閉,不再接受新的任務;但是還有一些已有任務在跑,是以用pool.join(),吊着主程式,直到所有任務完成才進入下一步。

if __name__ == '__main__':
    Multiprocessing.pool = Pool(processes=4)
    for x in range(10):
        pool.apply_async(task, args=(x,))
    pool.close()
    pool.join()
      

下面看看VNPY多程序優化方法。其實很好了解了,runParallelOptimization是類BacktestingEngine的一個方法。

傳入參數strategyClass就是這個政策類,setting是要優化參數範圍,後面通過optimizationSetting.generateSetting()生成政策參數隊列,做為任務内容;optimizationSetting.optimizeTarget是後面傳回值。至于回測品種,回測時間段,交易費用什麼,在 BacktestingEngine建立時候維護了。

然後建立任務池pool,大小剛好是cpu核數,這個也是比較穩妥設定。

之後做一個l隊列來放傳回值。

然後打包政策類,回測參數,政策參數做為任務内容,和任務方法optimize一起組合為一個工作任務。然後插入任務池給cpu核心去跑。這個時候在系統螢幕可以看到于核心數相同的python虛拟環境運作。

然後就是對傳回值排序。後面詳細說說。

df =  engine.runParallelOptimization(AtrRsiStrategy, setting)
def runParallelOptimization(self, strategyClass, optimizationSetting):
    """并行優化參數"""
    # 擷取優化設定        
    settingList = optimizationSetting.generateSetting()
    targetName = optimizationSetting.optimizeTarget
    
    # 檢查參數設定問題
    if not settingList or not targetName:
        self.output(u'優化設定有問題,請檢查')
    
    # 多程序優化,啟動一個對應CPU核心數量的程序池
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    l = []
    for setting in settingList:
        l.append(pool.apply_async(optimize, (strategyClass, setting,
                                             targetName, self.mode, 
                                             self.startDate, self.initDays, self.endDate,
                                             self.slippage, self.rate, self.size, self.priceTick,
                                             self.dbName, self.symbol)))
    pool.close()
    pool.join()
    
    # 顯示結果
    resultList = [res.get() for res in l]
    resultList.sort(reverse=True, key=lambda result:result[1])
    return resultList
      

像現在雙核四線程就有四個python環境在跑任務。

python多程式簡介,和VNPY多程式參數優化代碼分析

這裡會發現是用靜态方法optimize,如果直接調用 BacktestingEngine的回測方法更簡潔,為什麼沒有呢,這個是python2.7的Multiprocessing的一個局限,隻能打包靜态方法做為工作方法,如果打包類中的方法,會提示錯誤。

cPickle.PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup  builtin .instanceme

如果VNPY2.0基于python3.6版本,應該就會更簡化一些。

下面看看 靜态方法 optimize,其實沒什麼好說,就是建立一個回測引擎BacktestingEngine對象,按照參數跑一遍回測,傳回一個元祖,包含了這次回測的參數,針對回測目标的值,和一個包含回測結果的字典,這個字典包括什麼年化收入,sharpe等一堆回測結果。

然後所有的回測結果元祖組成一個回測結果隊列,這個結果隊列按照targetValue反向排序,最大放在第一位。

因為太多了,一般我都是輸出到excel裡面,之前說過怎麼實作。

#----------------------------------------------------------------------
def optimize(strategyClass, setting, targetName,
             mode, startDate, initDays, endDate,
             slippage, rate, size, priceTick,
             dbName, symbol):
    """多程序優化時跑在每個程序中運作的函數"""
    engine = BacktestingEngine()
    engine.setBacktestingMode(mode)
    engine.setStartDate(startDate, initDays)
    engine.setEndDate(endDate)
    engine.setSlippage(slippage)
    engine.setRate(rate)
    engine.setSize(size)
    engine.setPriceTick(priceTick)
    engine.setDatabase(dbName, symbol)
    engine.initStrategy(strategyClass, setting)
    engine.runBacktesting()
    engine.calculateDailyResult()
    d, result = engine.calculateDailyStatistics()
    try:
        targetValue = result[targetName]
    except KeyError:
        targetValue = 0
    return (str(setting), targetValue, result)