天天看點

Python3并發場景任務的并發方式的場景抉擇(線程/程序/協程)

一般情況下,大家對Python原生的并發/并行工作方式:程序、線程和協程的關系與差別都能講清楚。甚至具體的對象名稱、内置方法都可以如數家珍,這顯然是極好的,但我們其實都忽略了一個問題,就是具體應用場景,三者的使用目的是一樣的,換句話說,使用結果是一樣的,都可以提高程式運作的效率,但到底那種場景用那種方式更好一點?

這就好比,目前主流的汽車發動機變速箱無外乎三種:雙離合、CVT以及傳統AT。主機廠把它們搭載到不同的發動機和車型上,它們都是變速箱,都可以将發動機産生的動力作用到車輪上,但不同使用場景下到底該選擇那種變速箱?這顯然也是一個問題。

所謂“無場景,不功能”,本次我們來讨論一下,具體的并發程式設計場景有哪些,并且對應到具體場景,應該怎麼選擇并發手段和方式。

什麼是并發和并行?

在讨論場景之前,我們需要将多任務執行的方式進行一下分類,那就是并發方式和并行方式。教科書上告訴我們:并行是指兩個或者多個事件在同一時刻發生;而并發是指兩個或多個事件在同一時間間隔内發生。 在多道程式環境下,并發性是指在一段時間内宏觀上有多個程式在同時運作,但在單處理機系統中,每一時刻卻僅能有一道程式執行,故微觀上這些程式隻能是分時地交替執行。

好像有那麼一點抽象,好吧,讓我們務實一點,由于GIL全局解釋器鎖的存在,在Python程式設計領域,我們可以簡單粗暴地将并發和并行用程式通過能否使用多核CPU來區分,能使用多核CPU就是并行,不能使用多核CPU,隻能單核處理的,就是并發。就這麼簡單,是的,Python的GIL全局解釋器鎖幫我們把問題簡化了, 這是Python的大幸?還是不幸?

Python中并發任務實作方式包含:多線程threading和協程asyncio,它們的共同點都是交替執行,而差別是多線程threading是搶占式的,而協程asyncio是協作式的,原理也很簡單,隻有一顆CPU可以用,而一顆CPU一次隻能做一件事,是以隻能靠不停地切換才能完成并發任務。

Python中并行任務的實作方式是多程序multiprocessing,通過multiprocessing庫,Python可以在程式主程序中建立新的子程序。這裡的一個程序可以被認為是一個幾乎完全不同的程式,盡管從技術上講,它們通常被定義為資源集合,其中資源包括記憶體、檔案句柄等。換一種說法是,每個子程序都擁有自己的Python解釋器,是以,Python中的并行任務可以使用一顆以上的CPU,每一顆CPU都可以跑一個程序,是真正的同時運作,而不需要切換,如此Python就可以完成并行任務。

什麼時候使用并發?IO密集型任務

現在我們搞清楚了,Python裡的并發運作方式就是多線程threading和協程asyncio,那麼什麼場景下使用它們?

一般情況下,任務場景,或者說的更準确一些,任務類型,無非兩種:CPU密集型任務和IO密集型任務。

什麼是IO密集型任務?IO就是Input-Output的縮寫,說白了就是程式的輸入和輸出,想一想确實就是這樣,您的電腦,它不就是這兩種功能嗎?用鍵盤、麥克風、攝像頭輸入資料,然後再用螢幕和音箱進行輸出操作。

但輸入和輸出操作要比電腦中的CPU運作速度慢,換句話說,CPU得等着這些比它慢的輸入和輸出操作,說白了就是CPU運算一會,就得等這些IO操作,等IO操作完了,CPU才能繼續運算一會,然後再等着IO操作,如圖所示:

Python3并發場景任務的并發方式的場景抉擇(線程/程式/協程)

由此可知,并發适合這種IO操作密集和頻繁的工作,因為就算CPU是蘋果最新ARM架構的M2晶片,也沒有用武之地。

另外,如果把IO密集型任務具象化,那就是我們經常操作的:硬碟讀寫(資料庫讀寫)、網絡請求、檔案的列印等等。

并發方式的選擇:多線程threading還是協程asyncio?

既然涉及硬碟讀寫(資料庫讀寫)、網絡請求、檔案列印等任務都算并發任務,那我們就真正地實踐一下,看看不同的并發方式到底能提升多少效率?

一個簡單的小需求,對本站資料進行重複抓取操作,并計算首頁資料文本的行數:

import requests
import time


def download_site(url, session):
    with session.get(url) as response:
        print(f"下載下傳了{len(response.content)}行資料")


def download_all_sites(sites):
    with requests.Session() as session:
        for url in sites:
            download_site(url, session)


if __name__ == "__main__":

    sites = ["https://v3u.cn"] * 50
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"下載下傳了 {len(sites)}次,執行了{duration}秒")           

在不使用任何并發手段的前提下,程式傳回:

下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了76347行資料
下載下傳了 50 次資料,執行了8.781155824661255秒
[Finished in 9.6s]           

這裡程式的每一步都是同步操作,也就是說當第一次抓取網站首頁時,剩下的49次都在等待。

接着使用多線程threading來改造程式:

import concurrent.futures
import requests
import threading
import time


thread_local = threading.local()


def get_session():
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session


def download_site(url):
    session = get_session()
    with session.get(url) as response:
        print(f"下載下傳了{len(response.content)}行資料")


def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(download_site, sites)


if __name__ == "__main__":

    sites = ["https://v3u.cn"] * 50
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"下載下傳了 {len(sites)}次,執行了{duration}秒")           

這裡通過with關鍵詞開啟線程池上下文管理器,并發8個線程進行下載下傳,程式傳回:

下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76161行資料
下載下傳了76424行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了 50次,執行了7.680492877960205秒           

很明顯,效率上有所提升,事實上,每個線程其實是在不停“切換”着運作,這就節省了單線程每次等待爬取結果的時間:

Python3并發場景任務的并發方式的場景抉擇(線程/程式/協程)

由此帶來了另外一個問題:上下文切換的時間開銷。

讓我們繼續改造,用協程來一試鋒芒,首先安裝異步web請求庫aiohttp:

pip3 install aiohttp           

改寫邏輯:

import asyncio
import time
import aiohttp


async def download_site(session, url):
    async with session.get(url) as response:
        print(f"下載下傳了{response.content_length}行資料")


async def download_all_sites(sites):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in sites:
            task = asyncio.ensure_future(download_site(session, url))
            tasks.append(task)
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    sites = ["https://v3u.cn"] * 50
    start_time = time.time()
    asyncio.run(download_all_sites(sites))
    duration = time.time() - start_time
    print(f"下載下傳了 {len(sites)}次,執行了{duration}秒")           

程式傳回:

下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76424行資料
下載下傳了76161行資料
下載下傳了76424行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料
下載下傳了76161行資料下載下傳了 50次,執行了6.893810987472534秒           

效率上百尺竿頭更進一步,同樣的使用with關鍵字操作上下文管理器,協程使用asyncio.ensure_future()建立任務清單,該清單還負責啟動它們。建立所有任務後,使用asyncio.gather()來保持會話上下文的執行個體,直到所有爬取任務完成。和多線程threading的差別是,協程并不需要切換上下文,是以每個任務所需的資源和建立時間要少得多,是以建立和運作更多的任務效率更高:

Python3并發場景任務的并發方式的場景抉擇(線程/程式/協程)

綜上,并發邏輯歸根結底是減少CPU等待的時間,也就是讓CPU少等一會兒,而協程的工作方式顯然讓CPU等待的時間最少。

并行方式:多程序multiprocessing

再來試試多程序multiprocessing,并行能不能幹并發的事?

import requests
import multiprocessing
import time

session = None


def set_global_session():
    global session
    if not session:
        session = requests.Session()


def download_site(url):
    with session.get(url) as response:
        name = multiprocessing.current_process().name
        print(f"讀了{len(response.content)}行")


def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session) as pool:
        pool.map(download_site, sites)


if __name__ == "__main__":
    sites = ["https://v3u.cn"] * 50
    start_time = time.time()
    download_all_sites(sites)
    duration = time.time() - start_time
    print(f"下載下傳了 {len(sites)}次,執行了{duration}秒")           

這裡我們依然使用上下文管理器開啟程序池,預設程序數比對目前計算機的CPU核心數,也就是有幾核就開啟幾個程序,程式傳回:

讀了76000行
讀了76241行
讀了76044行
讀了75894行
讀了76290行
讀了76312行
讀了76419行
讀了76753行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
讀了76290行
下載下傳了 50次,執行了8.195281982421875秒           

雖然比同步程式要快,但無疑的,效率上要低于多線程和協程。為什麼?因為多程序不适合IO密集型任務,雖然可以利用多核資源,但沒有任何意義:

Python3并發場景任務的并發方式的場景抉擇(線程/程式/協程)

無論開多少程序,CPU都沒有用武之地,多數情況下CPU都在等待IO操作,也就是說,多核反而拖累了IO程式的執行。

并行方式的選擇:CPU密集型任務

什麼是CPU密集型任務?這裡我們可以使用逆定理:所有不涉及硬碟讀寫(資料庫讀寫)、網絡請求、檔案列印等任務都算CPU密集型任務任務,說白了就是,計算型任務。

以求平方和為例子:

import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    for number in numbers:
        cpu_bound(number)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]
    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"{duration}秒")           

同步執行20次,需要花費多少時間?

4.466595888137817秒           

再來試試并行方式:

import multiprocessing
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"{duration}秒")           

八核處理器,開八個程序開始跑:

1.1755797863006592秒           

不言而喻,并行方式有效提高了計算效率。

最後,既然之前用并行方式運作了IO密集型任務,我們就再來試試用并發的方式運作CPU密集型任務:

import concurrent.futures
import time


def cpu_bound(number):
    return sum(i * i for i in range(number))


def find_sums(numbers):
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(cpu_bound, numbers)


if __name__ == "__main__":
    numbers = [5_000_000 + x for x in range(20)]

    start_time = time.time()
    find_sums(numbers)
    duration = time.time() - start_time
    print(f"{duration}秒")           

單程序開8個線程,走起:

4.452666759490967秒           

如何?和并行方式運作IO密集型任務一樣,可以運作,但是沒有任何意義。為什麼?因為沒有任何IO操作了,CPU不需要等待了,CPU隻要全力運算即可,是以你上多線程或者協程,無非就是畫蛇添足、多此一舉。

結語

有經驗的汽修師傅會告訴你,想省油就選CVT和雙離合,想品質穩定就選AT,經常高速上激烈駕駛就選雙離合,經常市區内堵車就選CVT;同樣地,作為經驗豐富的背景研發,你也可以告訴汽修師傅,任何不需要CPU等待的任務就選擇并行(multiprocessing)的處理方式,而需要CPU等待時間過長的任務,選擇并發(threading/asyncio)。反過來,我就想用CVT在高速上飙車,用雙離合在市區堵車,行不行?行,但沒有意義,或者說的更準确一些,沒有任何額外的收益;而用并發方式執行CPU密集型任務,用并行方式執行IO密集型任務行不行?也行,但依然沒有任何額外的收益, 無他,唯物無定味,适口者珍矣。