雷鋒網 AI 科技評論按,本文是工程師 Jim Anderson 分享的關于「通過并發性加快 python 程式的速度」的文章的第二部分,主要内容是 I/O 綁定程式加速相關。
在上一篇中,我們已經講過了相關的概念:什麼是并發?什麼是并行? I/O 綁定和 CPU 綁定等。在這裡,我們将對一些 python 并發方法進行比較,包括線程、異步和多程序,在程式中何時使用并發性以及使用哪個子產品。
當然,本文假設讀者對 python 有一個基本的了解,并且使用 python3.6 及以上版來運作示例。你可以從 Real python GitHub repo 下載下傳示例。
如何加速 I/O 綁定程式
讓我們從關注 I/O 綁定程式和一個常見問題開始:通過網絡下載下傳内容。在我們的例子中,你将從一些站點下載下傳網頁,但這個過程可能會産生任何故障。它隻是更容易可視化。
同步版本
我們将從這個任務的非并發版本開始。注意,這個程式需要請求子產品。在運作這個程式之前,你需要運作 pip 安裝請求,這可能需要使用 virtualenv 指令。此版本根本不使用并發:
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")
def download_all_sites(sites):
with requests.Session() as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
如你所見,這是一個相當短的程式。download_site()可以從 URL 下載下傳内容并列印它的大小。要指出的一個小問題是,我們正在使用來自 Session 的會話對象。
直接從 requests 中使用 get(),但建立一個 Session 對象允許 requests 執行一些花哨的網絡技巧進而真正加快速度是可能的。
download_all_sites()建立 Session,然後浏覽站點清單,依次下載下傳每個站點。最後,它列印出這個過程花費了多長時間,這樣你就可以滿意地看到在下面的示例中并發性對我們有多大幫助。
這個程式的處理圖看起來很像上一節中的 I/O 綁定圖。
注意:網絡流量取決于許多因素,這些因素可能在每秒都在變化。我已經看到由于網絡問題,這些測試案例從一次運作跳轉到另一次的時間加倍了。
為什麼同步版本很重要
這個版本的代碼最棒的特點是,它很簡單,編寫和調試相對容易。代碼的思路更加直接,是以你可以預測它将如何運作。
同步版本的問題
和我們提供的其他解決方案相比,同步版本最大的問題是,它的速度相對較慢。以下是我的機器上的最終輸出示例:

注意:你得到的結果可能會和上面有很大差異。運作這個腳本時,需要的時間從 14.2 秒到 21.9 秒不等。在本文中,時間取三次運作中最快的一次所花的時間,在這種情況下,兩種方法之間的差異仍然很明顯。
然而,運作速度變慢并不總是一個大問題。如果你正在運作的程式使用同步版本運作隻需要 2 秒,并且很少運作,那麼可能不需要添加并發性。
如果你的程式經常運作怎麼辦?如果運作程式需要幾個小時怎麼辦?讓我們繼續使用線程重寫這個程式以實作并發性。
線程版本
正如你可能猜測的那樣,編寫線程程式需要付出更多的努力。然而,對于簡單的案例,你可能會驚訝于它所花費的額外努力是如此之少。下面是同一個程式的線程版本:
import concurrent.futures
import threading
thread_local = threading.local()
def get_session():
if not getattr(thread_local, "session", None):
thread_local.session = requests.Session()
return thread_local.session
def download_site(url):
session = get_session()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
] * 80
start_time = time.time()
當你添加線程時,整體結構是相同的,是以你隻需要做一些更改。download_all_sites()從在每個站點調用一次函數改為更複雜的結構。
在這個版本中,你正在建立一個 ThreadPoolExecutor,這看起來很複雜。我們可以把它分解為:ThreadPoolExecutor=thread+pool+executor。
這個對象将建立一個線程池,每個線程都可以并發運作。最後,執行器會控制池中每個線程的運作方式和運作時間。請求将在池中執行。
标準庫将 ThreadPoolExecutor 實作為上下文管理器,這樣你就可以使用 with 文法來管理線程池的建立和釋放。
一旦有了 ThreadPoolExecutor,就可以很友善地使用它的.map()方法。此方法在清單中的每個站點上運作傳入函數。最重要的是,它使用所管理的線程池自動并發地運作它們。
那些學習其他語言,甚至是 python 2 的使用者可能想知道,在處理線程時,通常用來管理細節的對象和函數在哪裡,比如 thread.start()、thread.join()和 queue。
這些仍然存在,你可以使用它們來實作對線程運作方式的細粒度控制。但是,從 python3.2 開始,标準庫添加了一個執行器,如果不需要細粒度的控制,它可以為你管理許多細節。
我們的示例中另一個有趣的變化是,每個線程都需要建立自己的 requests.session()對象。當你檢視請求文檔時,不一定很容易分辨出來,但是讀到這個問題時,你似乎很清楚每個線程需要單獨的 Session。
這是線程處理的一個有趣又困難的問題之一。因為作業系統可以控制一個任務何時被中斷及另一個任務何時開始,是以線上程之間共享的任何資料都需要受到保護,保證線程安全。很遺憾,requests.session()不是線程安全的。
根據資料是什麼以及如何使用它,有幾種政策可以使資料通路線程安全。其中之一是使用線程安全的資料結構,如 python 隊列子產品中的 queue。
另一種政策是線程本地存儲。Threading.local() 建立一個看起來像全局的對象,但它對于每個線程來說是不一樣的。在你的示例中,這是通過 threadLocal 和 get_session()完成的:
threadLocal = threading.local()def get_session():
if getattr(threadLocal, "session", None) is None:
threadLocal.session = requests.Session()
return threadLocal.session
ThreadLocal 是線上程子產品中專門解決這個問題的。看起來有點奇怪,但你隻想建立這些對象中的一個,而不是為每個線程建立一個對象。對象本身負責分離不同線程對不同資料的通路過程。
當調用 get_session()時,它查找的 session 和它運作的特定線程是對應的。是以,每個線程在第一次調用 get_session()時将建立一個會話,然後後續在其整個生命周期内簡單地調用該會話。
最後,一個關于選擇線程數的簡短說明。你可以看到示例代碼使用了 5 個線程。你可以随意調整這個數字的大小,看看總的時間是如何變化的。你可能認為每次下載下傳隻有一個線程是最快的,但實際上不是這樣,至少在我的系統中不是這樣。我發現,線程數目在 5 到 10 個之間時,速度是最快的。如果超過這個值,那麼建立和銷毀線程所産生的額外開銷将抵消任何節省時間所帶來的好處。
這裡的難點在于,正确的線程數不是從一個任務到另一個任務中的常量。需要進行一些實驗才能得到結果。
為什麼線程版本很重要
它很快!這裡是我測試中最快的一次。記住,非并發版本需要 14 秒以上的時間:
它的執行時序圖如下所示:
它使用多個線程同時向網站發出多個打開的請求,允許你的程式重疊等待時間并更快地獲得最終結果!
線程版本的問題
正如你從示例中看到的,要實作這一點需要更多的代碼,而且你真的需要考慮線上程之間需要共享哪些資料。
線程可以以巧妙且難以檢測的方式進行互動。這些互動可能導緻随機的、間歇性的錯誤,且這些錯誤很難找到。
異步(asyncio)版本
在你開始檢查異步版本示例代碼之前,讓我們詳細讨論一下異步的工作原理。
異步基礎
這将是 asycio 的簡化版本。這裡有許多細節被掩蓋了,但它仍然說明了它是如何工作的。
asyncio 的一般概念是,一個被稱為事件循環的 python 對象控制每個任務的運作方式和時間。這個對象清楚地知道每個任務處于什麼狀态。實際上,任務可以處于許多狀态,但現在讓我們設想一個簡化的事件循環,它隻有兩個狀态。
就緒狀态指的是任務有工作要做并且準備運作,而等待狀态意味着任務正在等待一些外部事情完成,例如網絡操作。簡化的事件循環維護兩個任務清單,分别對應這兩個狀态。它選擇一個已經就緒的任務,然後重新開始運作。該任務處于完全控制狀态,直到它将控件送回事件循環。
當正在運作的任務将控制權交還給事件循環時,事件循環将該任務放入就緒或等待清單,然後周遊等待清單中的每個任務,以檢視完成 I/O 操作後該任務是否已就緒。它知道就緒清單中的任務仍然是就緒狀态,因為它們尚未運作。
一旦所有的任務都被重新排序到正确的清單中,事件循環就會選擇下一個要運作的任務。簡化的事件循環選擇等待時間最長的任務并運作該任務。此過程重複,直到事件循環完成。
asyncio 的一個重要點是,如果不是有意為之,任務永遠不會放棄控制。任務在執行的過程中從不會被打斷。這使得我們在異步中比線上程中更容易進行資源共享。你不需要擔心線程安全問題。
async 和 await
現在讓我們來談談添加到 python 中的兩個新關鍵字:async 和 await。根據上面的讨論,你可以将 await 視為允許任務将控制權交回事件循環的一種魔力。當你的代碼等待函數調用時,await 是一個信号,表明調用可能需要花費一段時間,并且任務應該放棄控制。
最簡單的方法是将 async 看作是 python 的标志,告訴它将使用 await 定義函數。在有些情況下,這不是完全正确的,比如異步生成器,但它适用于許多情況,并在開始時為你提供一個簡單的模型。
你将在下一個代碼中看到的一個例外是 async with 語句,它通常從你的等待的對象建立一個上下文管理器。雖然語義有點不同,但其思想是相同的:将這個上下文管理器标記為可以替換的東西。
我确信你可以想象到,在管理事件循環和任務之間的互動時有一些複雜性。對于以 asyncio 開始的開發人員來說,這些細節并不重要,但是你需要記住,任何調用 await 的函數都需要标記為 async。否則将出現文法錯誤。雷鋒網
回到代碼
既然你已經基本了解了什麼是 asyncio,那麼讓我們浏覽一下示例代碼的 asyncio 版本,并了解它是如何工作的。請注意,此版本添加了 aiohtp。在運作它之前,應該先運作 pip install aiohtp:
import asyncio
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print("Read {0} from {1}".format(response.content_length, url))
async def download_all_sites(sites):
async with aiohttp.ClientSession() as session:
tasks = []
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
asyncio.get_event_loop().run_until_complete(download_all_sites(sites))
print(f"Downloaded {len(sites)} sites in {duration} seconds")
這個版本比前兩個版本要複雜一些。它有一個類似的結構,但是啟動任務的工作量比建立線程池執行器的工作量要多一些。讓我們從示例的頂部開始。
download_site()
頂部的 download_site()與線程版本幾乎相同,但函數定義行上的 async 關鍵字和實際調用 session.get()時的 async with 關鍵字除外。稍後你将看到為什麼可以在這裡傳遞 session,而不是使用線程本地存儲。
download_all_sites()
download_all_sites() 中可以看到線程示例中最大的變化。
你可以在所有任務之間共享會話,是以該會話在此處建立為上下文管理器。任務可以共享會話,因為它們都在同一線程上運作。會話處于錯誤狀态時,一個任務無法中斷另一個任務。
在該上下文管理器中,它使用 asyncio.secure_future()建立一個任務清單,該清單還負責啟動它們。建立所有任務後,此函數使用 asyncio.gather()完成會話内容的變動,直到所有任務完成。
線程代碼的作用與此類似,但在 ThreadPoolExecutor 中可以友善地處理細節。目前沒有 asyncioPoolExecutor 類。
然而,這裡的細節中隐藏着一個小而重要的變化。還記得之前我們讨論過要建立的線程數嗎?線上程示例中,線程的最佳數量并不明顯。
asyncio 的一個很酷的優點是它的規模遠遠優于線程。與線程相比,每項任務建立所需的資源和時間要少得多,是以建立和運作更多的資源和時間能很好地工作。這個例子隻是為每個要下載下傳的站點建立一個單獨的任務,這個任務運作得很好。雷鋒網(公衆号:雷鋒網)
__main__
最後,異步的本質意味着你必須啟動事件循環,并告訴它要運作哪些任務。檔案底部的__main__部分包含 get_event_loop() 的代碼,然後運作 run_until_complete()。如果沒有别的,他們在命名這些函數方面做得很好。
如果你已經更新到 python 3.7,那麼 python 核心開發人員會為你簡化這種文法。不需要分辨那種情況下使用 asyncio.get_event_loop(),那種情況下使用 run_until_complete(),你隻需使用 asyncio.run()。
為什麼 asyncio 版本很重要
它真的很快!在我的機器上進行的所有測試中,這是代碼運作最快的版本:
執行時序圖與線程示例中所發生的情況非常相似。隻是 I/O 請求都是由同一線程完成的:
缺少線程池執行器,使得這段代碼比線程示例要複雜一些。在這種情況下,你需要做一些額外的工作來獲得更好的性能。
還有一個常見的論點是,在合适的位置添加 async 和 await 是一個複雜的問題。在某種程度上,這是事實。這個論點的另一個方面是,它迫使你思考何時交換給定的任務,這可以幫助你設計出一份更好、更快的代碼。
規模問題在這裡也很突出。為每個站點運作上面的線程示例明顯比用少量線程運作它慢。運作帶有數百個任務的 asyncio 示例并沒有減慢速度。
asyncio 版本的問題
現在 asyncio 有幾個問題。為了充分利用 asyncio,你需要特殊的 asyncio 版本的庫。如果你隻是使用下載下傳站點的請求,那麼速度會慢得多,因為請求不是用來通知事件循環它被阻塞了。随着時間的推移,這個問題越來越少,因為越來越多的庫采用 asyncio。
另一個更微妙的問題是,如果其中一個任務不合作,那麼協作多任務的所有優勢都會消失。代碼中的一個小錯誤會導緻一個任務運作,并長時間占用處理器,進而使其他需要運作的任務處于等待狀态。如果任務沒有将控制權交還給事件循環,則無法中斷事件循環。考慮到這一點,讓我們來看看一種完全不同的并發、多處理方法。
多處理版本
與前面的方法不同,多處理版本的代碼充分利用了新計算機的多個 CPU。讓我們從代碼開始:
import multiprocessing
session = None
def set_global_session():
global session
if not session:
session = requests.Session()
name = multiprocessing.current_process().name
print(f"{name}:Read {len(response.content)} from {url}")
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
這比 asyncio 示例短得多,實際上,它看起來與線程示例非常相似,但是在我們深入研究代碼之前,讓我們快速了解一下多處理對你會有什麼幫助。
簡述多處理
到目前為止,本文中的所有并發示例都隻在計算機的單個 CPU 或核上運作。其原因與目前的 cpython 的設計以及所謂的全局解釋器鎖(globalinterpretorlock,簡稱 gil)有關。
标準庫中的多處理設計正是為了改變這種狀态而設計的,它使你能在多個 CPU 上運作代碼。在高層,它是通過建立一個新的 python 解釋器執行個體在每個 CPU 上運作,然後釋放出程式的一部分來實作的。
在目前的 python 解釋器中啟動一個新線程的速度不如單獨啟動一個 python 解釋器的速度快。這是一個重要的操作,存在一些限制和困難,但對某些問題來說,它可以産生巨大的差異。
多處理代碼
代碼與我們的同步版本相比有一些小的變化。第一個差別位于 download_all_sites()中。它不是簡單地重複調用 download_site(),而是建立一個 multiprocessing.pool 對象,并讓它将 download_site 映射到不可通路的站點。和線程示例相比,這點比較相似。
這裡所發生的是,池(pool)建立了許多單獨的 python 解釋器程序,并讓每個程序在某些項上運作指定的函數,在我們的例子中是在站點清單上運作指定的函數。主程序和其他程序之間的通信由多處理子產品為你處理。
創造池的那條線值得你注意。首先,它不指定要在池中建立多少程序,盡管這是一個可選參數。預設情況下,multiprocessing.pool()将确定計算機中的 CPU 數量并與之比對。這通常是最好的答案,在我們的例子中也是如此。
對于這個問題,增加程序的數量并不能提高速度。相反,它實際上會降低速度,因為啟動和删除所有這些程序的成本大于并行執行 I/O 請求的好處。
接下來,我們得到該調用的 initializer=set_global_session 部分。請記住,池中的每個程序都有自己的記憶體空間,這意味着它們不能共享會話對象之類的東西。你不會希望每次調用函數時都建立新會話,而是希望為每個程序建立一個會話。
初始化功能參數就是為這種情況而生成的。無法将傳回值從初始值設定項傳遞回由程序 download_site()調用的函數,但可以初始化全局會話變量以儲存每個程序的單個會話。因為每個程序都有自己的記憶體空間,是以每個程序的全局空間都不同。
這就是所有要說的啦,其餘的代碼與你以前看到的非常相似。
為什麼多處理版本很重要
這個例子的多處理版本非常好,因為它相對容易啟動,并且隻需要很少的額外代碼。它還充分利用了計算機中的 CPU 資源。此代碼的執行時序圖如下所示:
多處理版本的問題
這個版本的示例确實需要一些額外的設定,而且全局會話對象很奇怪。你必須花費一些時間來考慮在每個流程中通路哪些變量。
最後,它明顯比本例中的異步和線程版本慢:
這并不奇怪,因為 I/O 綁定問題并不是多處理存在的真正原因。在進入下一節并檢視 CPU 綁定示例時,你将看到更多内容。
本文之前還有相關概念介紹:如何利用并發性加速你的python程式(一):相關概念
以及接下來的一篇是:如何利用并發性加速你的python程式(三):CPU 綁定程式加速
via:https://www.leiphone.com/news/201901/JfoLltRClm3bZzuB.html?type=preview
雷鋒網版權文章,未經授權禁止轉載。詳情見轉載須知。