天天看點

原來Python的協程有2種實作方式

作者:不背鍋運維

什麼是協程

在 Python 中,協程(Coroutine)是一種輕量級的并發程式設計方式,可以通過協作式多任務來實作高效的并發執行。協程是一種特殊的生成器函數,通過使用 yield 關鍵字來挂起函數的執行,并儲存目前的執行狀态。協程的執行可以通過 send 方法來恢複,并在下一次挂起時傳回一個值。

在 Python 3.4 之前,協程通常使用 yield 關鍵字來實作,稱為“生成器協程”。在 Python 3.4 引入了 asyncio 子產品後,可以使用 async/await 關鍵字來定義協程函數,稱為“原生協程”。

協程相比于線程和程序,具有以下優點:

  1. 輕量級:協程的上下文切換成本很小,可以在單線程内并發執行大量的協程。
  2. 低延遲:協程的執行過程中,沒有線程切換的開銷,也沒有加鎖解鎖的開銷,可以更快地響應外部事件。
  3. 高效性:協程的代碼通常比多線程和多程序的代碼更加簡潔和可讀,維護成本更低。
協程的使用場景包括網絡程式設計、異步 I/O、資料流處理、高并發任務等。

生成器協程

在 Python 3 中,生成器協程(Generator Coroutine)是指使用生成器函數來實作的協程。生成器函數是一種特殊的函數,其傳回一個生成器對象,可以通過 yield 語句暫停函數的執行,然後在下一次調用生成器對象的 「next」() 方法時繼續執行。

下面給出一個簡單的生成器協程的示例,其中包含一個生成器函數 coroutine 和一個簡單的異步 I/O 操作:

import asyncio

def coroutine():
    print('Coroutine started')
    while True:
        result = yield
        print('Coroutine received:', result)

async def main():
    print('Main started')
    c = coroutine()
    next(c)
    c.send('Hello')
    await asyncio.sleep(1)
    c.send('World')
    print('Main finished')

asyncio.run(main())
           

結果輸出:

[root@workhost k8s]# python3 test.py 
Main started
Coroutine started
Coroutine received: Hello
Coroutine received: World
Main finished
           

來看一下,上面代碼的執行過程:

  1. main 函數開始執行,列印出 Main started。
  2. 建立一個生成器對象 c,調用 next(c) 使其執行到第一個 yield 語句處暫停。
  3. 使用 c.send('Hello') 恢複生成器函數的執行,并将 'Hello' 作為生成器函數的傳回值。
  4. 在等待1秒鐘的過程中,main 函數暫停執行,等待事件循環發起下一次任務。
  5. 在等待1秒鐘後,使用 c.send('World') 繼續執行生成器函數,并将 'World' 作為生成器函數的傳回值。
  6. main 函數恢複執行,列印出 Main finished。

在上面的代碼中,使用生成器函數 coroutine 實作了一個簡單的協程。生成器函數通過使用 yield 語句暫停函數的執行,然後可以通過 send 方法恢複函數的執行,并将值傳遞給生成器函數。通過這種方式,可以使用生成器函數實作異步并發。在上面的示例中,使用生成器函數接收并列印異步 I/O 操作的結果。

原生協程

Python 3 引入了原生協程(Native Coroutine)作為一種新的協程類型。原生協程是通過使用 async/await 關鍵字來定義的,與生成器協程不同,它們可以像普通函數一樣使用 return 語句傳回值,而不是使用 yield 語句。

下面給出一個簡單的原生協程示例,其中包含一個 async 關鍵字修飾的協程函數 coroutine 和一個簡單的異步 I/O 操作:

import asyncio

async def coroutine():
    print('Coroutine started')
    await asyncio.sleep(1)
    print('Coroutine finished')

async def main():
    print('Main started')
    await coroutine()
    print('Main finished')

asyncio.run(main())
           

結果輸出:

[root@workhost k8s]# python3 test.py 
Main started
Coroutine started
Coroutine finished
Main finished
           

繼續看一下執行過程:

  1. main 函數開始執行,列印出 Main started。
  2. 調用 coroutine 函數,将其作為一個協程對象運作。
  3. 在 coroutine 函數中,列印出 Coroutine started。
  4. 在 coroutine 函數中,使用 await asyncio.sleep(1) 暫停函數的執行,等待1秒鐘。
  5. 在1秒鐘後,恢複 coroutine 函數的執行,并列印出 Coroutine finished。
  6. main 函數恢複執行,列印出 Main finished。

在上面的代碼中,使用 async 關鍵字定義了一個原生協程函數 coroutine,并在其中使用 await 關鍵字來暫停函數的執行,等待異步 I/O 操作的完成。通過這種方式,可以在原生協程中編寫異步并發代碼,進而提高代碼的性能和效率。

兩種協程對比

Python 3 中原生協程和生成器協程是兩種不同的協程實作方式,它們各自有自己的特點和适用場景。下面,通過對比它們的差別和優缺點,才可以更好地了解它們之間的異同,以便選擇适合自己的協程實作方式,進而更好地編寫高效、可維護的異步程式。

  1. 差別:
  • 定義方式不同:原生協程使用 async/await 關鍵字來定義,而生成器協程使用 yield 關鍵字來定義。
  • 傳回方式不同:原生協程使用 return 語句來傳回結果,而生成器協程使用 yield 語句來傳回結果。
  • 調用方式不同:原生協程使用 await 關鍵字來調用,而生成器協程使用 yield from 或 yield 語句來調用。
  • 内部實作不同:原生協程通過 asyncio 庫來實作,而生成器協程是 Python 語言内置的特性。
  1. 優缺點:
  • 原生協程的優點:
    • 代碼簡潔易懂:使用 async/await 關鍵字,可以編寫出更簡潔易懂的協程代碼。
    • 性能更高:原生協程不需要建立生成器對象,也不需要通過 yield 語句來控制函數的執行流程,是以能夠更加高效地處理異步操作。
    • 支援異步 I/O 和任務處理:原生協程可以支援異步 I/O 操作和并發任務處理,可以在處理異步操作時更加靈活。
  • 原生協程的缺點:
    • 相容性差:原生協程是 Python 3.5 版本之後才引入的新特性,是以在舊版本的 Python 中無法使用。
    • 異常處理不友善:原生協程在處理異常時比較麻煩,需要使用 try/except 語句來處理。
  • 生成器協程的優點:
    • 相容性好:生成器協程是 Python 2 和 Python 3 都支援的特性。
    • 可讀性好:生成器協程使用 yield 關鍵字來實作,代碼邏輯清晰易懂。
    • 異常處理友善:生成器協程在處理異常時比較友善,可以使用 try/except 語句來處理。
  • 生成器協程的缺點:
    • 性能相對較低:生成器協程需要建立生成器對象,也需要通過 yield 語句來控制函數的執行流程,是以處理異步操作時性能相對較低。
    • 功能有限:生成器協程不能像原生協程一樣支援異步 I/O 操作和任務處理。

實戰案例

接下來,模拟一個場景,假設實作一個異步的批量處理任務的工具,使用原生協程來實作。

看下面代碼:

import asyncio
import random

async def batch_process_task(tasks, batch_size=10):
    # 将任務清單劃分為多個批次
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i+batch_size]
        # 使用原生協程來異步處理每個批次的任務
        await asyncio.gather(*[process_task(task) for task in batch])

async def process_task(task):
    # 模拟任務處理過程
    await asyncio.sleep(random.uniform(0.5, 2.0))
    print("Task {} processed".format(task))

async def main():
    # 構造任務清單
    tasks = [i for i in range(1, 101)]
    # 并發處理批量任務
    await batch_process_task(tasks, batch_size=10)

if __name__ == '__main__':
    asyncio.run(main())
           

輸出:

[root@workhost k8s]# python3 test.py 
Task 9 processed
Task 10 processed
Task 1 processed
Task 8 processed
Task 6 processed
Task 4 processed
Task 3 processed
Task 2 processed
Task 5 processed
...
...
           

batch_process_task函數使用原生協程來處理每個批次的任務,而process_task函數則是處理每個任務的函數。main函數則是構造任務清單,并且使用batch_process_task函數來異步地處理批量任務。

本文轉載于WX公衆号:不背鍋運維(喜歡的盆友關注我們):https://mp.weixin.qq.com/s/GkhvW9qTCjw89xy0gLgPGQ