天天看點

第11天續,Python并發程式設計之線程池/程序池

@(python)

目錄

引言
Executor和Future
使用submit來操作線程池/程序池
add_done_callback實作回調函數
           

引言

Python标準庫為我們提供了

threading

multiprocessing

子產品編寫相應的多線程/多程序代碼,但是當項目達到一定的規模,頻繁建立/銷毀程序或者線程是非常消耗資源的,這個時候我們就要編寫自己的線程池/程序池,以空間換時間。但從Python3.2開始,标準庫為我們提供了

concurrent.futures

子產品,它提供了

ThreadPoolExecutor

ProcessPoolExecutor

兩個類,實作了對

threading

multiprocessing

的進一步抽象,對編寫線程池/程序池提供了直接的支援。

Executor和Future

concurrent.futures

子產品的基礎是

Exectuor

Executor

是一個抽象類,它不能被直接使用。但是它提供的兩個子類

ThreadPoolExecutor

ProcessPoolExecutor

卻是非常有用,顧名思義兩者分别被用來建立線程池和程序池的代碼。我們可以将相應的tasks直接放入線程池/程序池,不需要維護

Queue

來操心死鎖的問題,線程池/程序池會自動幫我們排程。

Future

這個概念相信有java和nodejs下程式設計經驗的朋友肯定不陌生了,你可以把它了解為一個在未來完成的操作,這是異步程式設計的基礎,傳統程式設計模式下比如我們操作

queue.get

的時候,在等待傳回結果之前會産生阻塞,cpu不能讓出來做其他事情,而

Future

的引入幫助我們在等待的這段時間可以完成其他的操作。

p.s:如果你依然在堅守Python2.x,請先安裝futures子產品。

pip install futures

使用submit來操作線程池/程序池

我們先通過下面這段代碼來了解一下線程池的概念:

# example1.py 
from concurrent.futures import ThreadPoolExecutor 
import time 
def return_future_result(message): 
    time.sleep(2) 
    return message 
pool = ThreadPoolExecutor(max_workers=2)  # 建立一個最大可容納2個task的線程池 
future1 = pool.submit(return_future_result, ("hello"))  # 往線程池裡面加入一個task 
future2 = pool.submit(return_future_result, ("world"))  # 往線程池裡面加入一個task 
print(future1.done())  # 判斷task1是否結束 
time.sleep(3) 
print(future2.done())  # 判斷task2是否結束 
print(future1.result())  # 檢視task1傳回的結果 
print(future2.result())  # 檢視task2傳回的結果 
           

執行結果:

False 
True 
hello 
world 
           
解析:我們根據運作結果來分析一下。我們使用

submit

方法來往線程池中加入一個task,submit傳回一個

Future

對象,對于

Future

對象可以簡單地了解為一個在未來完成的操作。在第一個print語句中很明顯因為

time.sleep(2)

的原因我們的future1沒有完成,因為我們使用

time.sleep(3)

暫停了主線程,是以到第二個print語句的時候我們線程池裡的任務都已經全部結束。

上面的代碼我們也可以改寫為程序池形式,api和線程池如出一轍,我就不羅嗦了。

# example2.py 
from concurrent.futures import ProcessPoolExecutor 
import time 
def return_future_result(message): 
    time.sleep(2) 
    return message 
pool = ProcessPoolExecutor(max_workers=2) 
future1 = pool.submit(return_future_result, ("hello")) 
future2 = pool.submit(return_future_result, ("world")) 
print(future1.done()) 
time.sleep(3) 
print(future2.done()) 
print(future1.result()) 
print(future2.result()) 
           
參考此篇部落格

add_done_callback

實作回調函數

from concurrent.futures import ThreadPoolExecutor
import requests

def task(url):
    response = requests.get(url)

    if response.status_code == 200:
        return response

def download(futures):
    response = futures.result()  #會得到一個傳回值,這個傳回值就是task函數的傳回值
    content = response.text
    tmp_list = response.url.split("/")
    filename = tmp_list[len(tmp_list)-1]
    print("正在下載下傳:%s" %response.url)
    with open(filename,"w",encoding="utf-8") as f:
        f.write("%s\n%s" %(response.url,content))
        print("下載下傳完成")

url_list = [
    "http://www.cnblogs.com/wupeiqi/articles/5713330.html",
    "http://blog.csdn.net/anzhsoft/article/details/19563091",
    "http://blog.csdn.net/anzhsoft/article/details/19570187"
]

thread_pool = ThreadPoolExecutor(max_workers=2) #生一個線程池對象,最大線程數為2個

for url in url_list:
    futures = thread_pool.submit(task,url)  #會得到一個Future對象
    #回調函數,會将futures本身當作參數傳給download函數
    futures.add_done_callback(download)