天天看點

Python 多程序 multiprocessing.Pool類詳解

multiprocessing子產品

multiprocessing包是Python中的多程序管理包。它與 threading.Thread類似,可以利用multiprocessing.Process對象來建立一個程序。該程序可以允許放在Python程式内部編寫的函數中。該Process對象與Thread對象的用法相同,擁有is_alive()、join([timeout])、run()、start()、terminate()等方法。屬性有:authkey、daemon(要通過start()設定)、exitcode(程序在運作時為None、如果為–N,表示被信号N結束)、name、pid。此外multiprocessing包中也有Lock/Event/Semaphore/Condition類,用來同步程序,其用法也與threading包中的同名類一樣。multiprocessing的很大一部份與threading使用同一套API,隻不過換到了多程序的情境。

這個子產品表示像線程一樣管理程序,這個是multiprocessing的核心,它與threading很相似,對多核CPU的使用率會比threading好的多。

看一下Process類的構造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})
           

參數說明:

group:程序所屬組。基本不用

target:表示調用對象。

args:表示調用對象的位置參數元組。

name:别名

kwargs:表示調用對象的字典。

建立程序的簡單執行個體:

#coding=utf-8
import multiprocessing

def do(n) :
  #擷取目前線程的名字
  name = multiprocessing.current_process().name
  print name,'starting'
  print "worker ", n
  return 

if __name__ == '__main__' :
  numList = []
  for i in xrange() :
    p = multiprocessing.Process(target=do, args=(i,))
    numList.append(p)
    p.start()
    p.join()
    print "Process end."
           

執行結果:

Process- starting
worker  
Process end.
Process- starting
worker  
Process end.
Process- starting
worker  
Process end.
Process- starting
worker  
Process end.
Process- starting
worker  
Process end.
           

建立子程序時,隻需要傳入一個執行函數和函數的參數,建立一個Process執行個體,并用其start()方法啟動,這樣建立程序比fork()還要簡單。

join()方法表示等待子程序結束以後再繼續往下運作,通常用于程序間的同步。

注意:

在Windows上要想使用程序子產品,就必須把有關程序的代碼寫在目前.py檔案的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的程序子產品。Unix/Linux下則不需要。

Pool類

在使用Python進行系統管理時,特别是同時操作多個檔案目錄或者遠端控制多台主機,并行操作可以節約大量的時間。如果操作的對象數目不大時,還可以直接使用Process類動态的生成多個程序,十幾個還好,但是如果上百個甚至更多,那手動去限制程序數量就顯得特别的繁瑣,此時程序池就派上用場了。

Pool類可以提供指定數量的程序供使用者調用,當有新的請求送出到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。

下面介紹一下multiprocessing 子產品下的Pool類下的幾個方法

apply()

函數原型:

apply(func[, args=()[, kwds={}]])
           

該函數用于傳遞不定參數,主程序會被阻塞直到函數執行結束(不建議使用,并且3.x以後不在出現)。

apply_async()

函數原型:

apply_async(func[, args=()[, kwds={}[, callback=None]]])
           

與apply用法一樣,但它是非阻塞且支援結果傳回進行回調。

map()

函數原型:

Pool類中的map方法,與内置的map函數用法行為基本一緻,它會使程序阻塞直到傳回結果。

注意,雖然第二個參數是一個疊代器,但在實際使用中,必須在整個隊列都就緒後,程式才會運作子程序。

close()

關閉程序池(pool),使其不在接受新的任務。

terminate()

結束工作程序,不在處理未處理的任務。

join()

主程序阻塞等待子程序的退出,join方法必須在close或terminate之後使用。

multiprocessing.Pool類的執行個體:

import time
from multiprocessing import Pool
def run(fn):
  #fn: 函數參數是資料清單的一個元素
  time.sleep()
  return fn*fn

if __name__ == "__main__":
  testFL = [,,,,,]  
  print 'shunxu:' #順序執行(也就是串行執行,單程序)
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print "順序執行時間:", int(e1 - s)

  print 'concurrent:' #建立多個程序,并行執行
  pool = Pool()  #建立擁有5個程序數量的程序池
  #testFL:要處理的資料清單,run:處理testFL清單中資料的函數
  rl =pool.map(run, testFL) 
  pool.close()#關閉程序池,不再接受新的程序
  pool.join()#主程序阻塞等待子程序的退出
  e2 = time.time()
  print "并行執行時間:", int(e2-e1)
  print rl
           

執行結果:

shunxu:
順序執行時間: 
concurrent:
并行執行時間: 
[, , , , , ]
           

上例是一個建立多個程序并發處理與順序執行處理同一資料,所用時間的差别。從結果可以看出,并發執行的時間明顯比順序執行要快很多,但是程序是要耗資源的,是以平時工作中,程序數也不能開太大。

程式中的r1表示全部程序執行結束後全局的傳回結果集,run函數有傳回值,是以一個程序對應一個傳回結果,這個結果存在一個清單中,也就是一個結果堆中,實際上是用了隊列的原理,等待所有程序都執行完畢,就傳回這個清單(清單的順序不定)。

對Pool對象調用join()方法會等待所有子程序執行完畢,調用join()之前必須先調用close(),讓其不再接受新的Process了。

再看一個執行個體:

import time
from multiprocessing import Pool
def run(fn) :
  time.sleep()
  print fn
if __name__ == "__main__" :
  startTime = time.time()
  testFL = [,,,,]
  pool = Pool()#可以同時跑10個程序
  pool.map(run,testFL)
  pool.close()
  pool.join()   
  endTime = time.time()
  print "time :", endTime - startTime
           

執行結果:

time : 
           

再次執行結果如下:

time : 
           

結果中為什麼還有空行和沒有折行的資料呢?其實這跟程序排程有關,當有多個程序并行執行時,每個程序得到的時間片時間不一樣,哪個程序接受哪個請求以及執行完成時間都是不定的,是以會出現輸出亂序的情況。那為什麼又會有沒這行和空行的情況呢?因為有可能在執行第一個程序時,剛要列印換行符時,切換到另一個程序,這樣就極有可能兩個數字列印到同一行,并且再次切換回第一個程序時會列印一個換行符,是以就會出現空行的情況。

程序實戰執行個體

并行處理某個目錄下檔案中的字元個數和行數,存入res.txt檔案中,

每個檔案一行,格式為:filename:lineNumber,charNumber

import os
import time
from multiprocessing import Pool

def getFile(path) :
  #擷取目錄下的檔案list
  fileList = []
  for root, dirs, files in list(os.walk(path)) :
    for i in files :
      if i.endswith('.txt') or i.endswith('.10w') :
        fileList.append(root + "\\" + i)
  return fileList

def operFile(filePath) :
  #統計每個檔案中行數和字元數,并傳回
  filePath = filePath
  fp = open(filePath)
  content = fp.readlines()
  fp.close()
  lines = len(content)
  alphaNum = 
  for i in content :
    alphaNum += len(i.strip('\n'))
  return lines,alphaNum,filePath

def out(list1, writeFilePath) :
  #将統計結果寫入結果檔案中
  fileLines = 
  charNum = 
  fp = open(writeFilePath,'a')
  for i in list1 :
    fp.write(i[] + " 行數:"+ str(i[]) + " 字元數:"+str(i[]) + "\n")
    fileLines += i[]
    charNum += i[]
  fp.close()
  print fileLines, charNum

if __name__ == "__main__":
  #建立多個程序去統計目錄中所有檔案的行數和字元數
  startTime = time.time()
  filePath = "C:\\wcx\\a"
  fileList = getFile(filePath)
  pool = Pool()  
  resultList =pool.map(operFile, fileList)  
  pool.close()
  pool.join()

  writeFilePath = "c:\\wcx\\res.txt"
  print resultList
  out(resultList, writeFilePath)
  endTime = time.time()
  print "used time is ", endTime - startTime
           

執行結果:

Python 多程式 multiprocessing.Pool類詳解

耗時不到1秒,可見多程序并發執行速度是很快的。