@(python)
目錄
引言
Executor和Future
使用submit來操作線程池/程序池
add_done_callback實作回調函數
引言
Python标準庫為我們提供了
threading
和
multiprocessing
子產品編寫相應的多線程/多程序代碼,但是當項目達到一定的規模,頻繁建立/銷毀程序或者線程是非常消耗資源的,這個時候我們就要編寫自己的線程池/程序池,以空間換時間。但從Python3.2開始,标準庫為我們提供了
concurrent.futures
子產品,它提供了
ThreadPoolExecutor
ProcessPoolExecutor
兩個類,實作了對
threading
multiprocessing
的進一步抽象,對編寫線程池/程序池提供了直接的支援。
Executor和Future
concurrent.futures
子產品的基礎是
Exectuor
,
Executor
是一個抽象類,它不能被直接使用。但是它提供的兩個子類
ThreadPoolExecutor
ProcessPoolExecutor
卻是非常有用,顧名思義兩者分别被用來建立線程池和程序池的代碼。我們可以将相應的tasks直接放入線程池/程序池,不需要維護
Queue
來操心死鎖的問題,線程池/程序池會自動幫我們排程。
Future
這個概念相信有java和nodejs下程式設計經驗的朋友肯定不陌生了,你可以把它了解為一個在未來完成的操作,這是異步程式設計的基礎,傳統程式設計模式下比如我們操作
queue.get
的時候,在等待傳回結果之前會産生阻塞,cpu不能讓出來做其他事情,而
Future
的引入幫助我們在等待的這段時間可以完成其他的操作。
p.s:如果你依然在堅守Python2.x,請先安裝futures子產品。 pip install futures
使用submit來操作線程池/程序池
我們先通過下面這段代碼來了解一下線程池的概念:
# example1.py
from concurrent.futures import ThreadPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ThreadPoolExecutor(max_workers=2) # 建立一個最大可容納2個task的線程池
future1 = pool.submit(return_future_result, ("hello")) # 往線程池裡面加入一個task
future2 = pool.submit(return_future_result, ("world")) # 往線程池裡面加入一個task
print(future1.done()) # 判斷task1是否結束
time.sleep(3)
print(future2.done()) # 判斷task2是否結束
print(future1.result()) # 檢視task1傳回的結果
print(future2.result()) # 檢視task2傳回的結果
執行結果:
False
True
hello
world
解析:我們根據運作結果來分析一下。我們使用方法來往線程池中加入一個task,submit傳回一個
submit
對象,對于
Future
對象可以簡單地了解為一個在未來完成的操作。在第一個print語句中很明顯因為
Future
的原因我們的future1沒有完成,因為我們使用
time.sleep(2)
暫停了主線程,是以到第二個print語句的時候我們線程池裡的任務都已經全部結束。
time.sleep(3)
上面的代碼我們也可以改寫為程序池形式,api和線程池如出一轍,我就不羅嗦了。
# example2.py
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
pool = ProcessPoolExecutor(max_workers=2)
future1 = pool.submit(return_future_result, ("hello"))
future2 = pool.submit(return_future_result, ("world"))
print(future1.done())
time.sleep(3)
print(future2.done())
print(future1.result())
print(future2.result())
參考此篇部落格 add_done_callback
實作回調函數
add_done_callback
from concurrent.futures import ThreadPoolExecutor
import requests
def task(url):
response = requests.get(url)
if response.status_code == 200:
return response
def download(futures):
response = futures.result() #會得到一個傳回值,這個傳回值就是task函數的傳回值
content = response.text
tmp_list = response.url.split("/")
filename = tmp_list[len(tmp_list)-1]
print("正在下載下傳:%s" %response.url)
with open(filename,"w",encoding="utf-8") as f:
f.write("%s\n%s" %(response.url,content))
print("下載下傳完成")
url_list = [
"http://www.cnblogs.com/wupeiqi/articles/5713330.html",
"http://blog.csdn.net/anzhsoft/article/details/19563091",
"http://blog.csdn.net/anzhsoft/article/details/19570187"
]
thread_pool = ThreadPoolExecutor(max_workers=2) #生一個線程池對象,最大線程數為2個
for url in url_list:
futures = thread_pool.submit(task,url) #會得到一個Future對象
#回調函數,會将futures本身當作參數傳給download函數
futures.add_done_callback(download)