天天看點

了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考

文章目錄

  • 解析協程運作時
  • 異步接口同步實作
  • 使用Task實作異步
  • 生産者消費者模型的協程版本
  • 參考
看到吐血 (´ཀ`」 ∠)
  • 協程(Coroutine)本質上是一個函數,特點是在代碼塊中可以将執行權交給其他協程
  • 衆所周知,子程式(函數)都是層級調用的,如果在A中調用了B,那麼B執行完畢傳回後A才能執行完畢。協程與子程式有點類似,但是它在執行過程中可以中斷,轉而執行其他的協程,在适當的時候再回來繼續執行。
  • 協程與多線程相比的最大優勢在于:協程是一個線程中執行,沒有線程切換的開銷;協程由使用者決定在哪裡交出控制權
  • 這裡用到的是asyncio庫(Python 3.7),這個庫包含了大部分實作協程的魔法工具
    • 使用 async 修飾詞聲明異步函數
    • 使用 await 語句執行可等待對象(Coroutine、Task、Future)
    • 使用 asyncio.create_task 建立任務,将異步函數(協程)作為參數傳入,等待event loop執行
    • 使用 asyncio.run 函數運作協程程式,協程函數作為參數傳入

解析協程運作時

import asyncio
import time

async def a():
    print("歡迎使用 a !")
    await asyncio.sleep(1)
    print("歡迎回到 a !")

async def b():
    print("歡迎來到 b !")
    await asyncio.sleep(2)
    print("歡迎回到 b !")

async def main():
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    print("準備開始")
    await task1
    print("task1 結束")
    await task2
    print("task2 結束")

if __name__ == "__main__":
    start = time.perf_counter()
    
    asyncio.run(main())
    
    print('花費 {} s'.format(time.perf_counter() - start))
           
了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考
  • 解釋:
    • 1、asyncio.run(main()),程式進入main()函數,開啟事件循環
    • 2、建立任務task1、task2并進入事件循環等待運作
    • 3、輸出準備開始
    • 4、執行await task1,使用者選擇從目前主任務中切出,事件排程器開始排程 a
    • 5、a 開始運作,輸出歡迎使用a!,運作到await asyncio.sleep(1),從目前任務切出,事件排程器開始排程 b
    • 6、b 開始運作,輸出歡迎來到b!,運作到await asyncio.sleep(2),從目前任務切出
    • 7、以上事件運作時間非常短(毫秒),事件排程器開始暫停排程
    • 8、一秒鐘後,a的sleep完成,事件排程器将控制權重新交給a,輸出歡迎回到a!,task1完成任務,退出事件循環
    • 9、await task1完成,事件排程器将控制權還給主任務,輸出task1結束,然後在await task2處繼續等待
    • 10、兩秒鐘後,b的sleep完成,事件排程器将控制權重新傳給 b,輸出歡迎回到 b!,task2完成任務,從事件循環中退出
    • 11、事件排程器将控制權交還給主任務,主任務輸出task2結束,至此協程任務全部結束,事件循環結束。

上面的代碼也可以這樣寫,将15到21行換成一行

await asyncio.gather(a(), b())

也能實作類似的效果,await asyncio.gather 會并發運作傳入的可等待對象(Coroutine、Task、Future)。

import asyncio
import time

async def a():
    print("歡迎使用 a !")
    await asyncio.sleep(1)
    print("歡迎回到 a !")

async def b():
    print("歡迎來到 b !")
    await asyncio.sleep(2)
    print("歡迎回到 b !")

async def main():
    await asyncio.gather(a(), b())

if __name__ == "__main__":
    start = time.perf_counter()
    
    asyncio.run(main())
    
    print('花費 {} s'.format(time.perf_counter() - start))
           

異步接口同步實作

"""
- 簡單爬蟲模拟
- 這裡用異步接口寫了個同步代碼
"""

import asyncio
import time

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)  # 休眠
    print('OK {}'.format(url))

async def main(urls):
    for url in urls:
        await crawl_page(url)  # await會将程式阻塞在這裡,進入被調用的協程函數,執行完畢後再繼續


start = time.perf_counter()

# pip install nest-asyncio
asyncio.run(main(['url_1', 'url_2'])) # 協程接口

print("Cost {} s".format(time.perf_counter() - start))
           
了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考

使用Task實作異步

# 異步實作

import asyncio
import time

async def crawl_page(url):
    print('crawling {}'.format(url))
    sleep_time = int(url.split('_')[-1])
    await asyncio.sleep(sleep_time)
    print('OK {}'.format(url))

async def main(urls):
    tasks = [asyncio.create_task(crawl_page(url)) for url in urls]
    for task in tasks:
        await task
    # 14、15行也可以換成這一行await asyncio.gather(*tasks)
    # *tasks 解包清單,将清單變成了函數的參數,與之對應的是,** dict 将字典變成了函數的參數

start = time.perf_counter()

asyncio.run(main(['url_1', 'url_2']))

print("Cost {} s".format(time.perf_counter() - start))
           
了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考

生産者消費者模型的協程版本

了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考
# 極客時間:Python核心技術與實戰

import asyncio
import random
import time

async def consumer(queue, id):
    """消費者"""
    while True:
        val = await queue.get()
        print('{} get a val : {}'.format(id, val))
        await asyncio.sleep(1)


async def producer(queue, id):
    """生産者"""
    for _ in range(5):
        val = random.randint(1, 10)
        await queue.put(val)
        print('{} put a val: {}'.format(id, val))
        await asyncio.sleep(1)

async def main():
    queue = asyncio.Queue()
    
    consumer_1 = asyncio.create_task(consumer(queue, 'consumer_1'))
    consumer_2 = asyncio.create_task(consumer(queue, 'consumer_2'))

    producer_1 = asyncio.create_task(producer(queue, 'producer_1'))
    producer_2 = asyncio.create_task(producer(queue, 'producer_2'))
    
    await asyncio.sleep(10)
    # cancel掉執行之間過長的consumer_1、consumer_2,while True
    consumer_1.cancel()
    consumer_2.cancel()
    
    # return_exceptions 設為True,不讓異常throw到執行層,影響後續任務的執行
    await asyncio.gather(consumer_1, consumer_2, producer_1, producer_2, return_exceptions=True)

if __name__ == "__main__":
    start = time.perf_counter()

    asyncio.run(main())

    print("Cost {} s".format(time.perf_counter() - start))
           
了解 Python 協程-async、await解析協程運作時異步接口同步實作使用Task實作異步生産者消費者模型的協程版本參考
# 輸出結果
producer_1 put a val: 7
producer_2 put a val: 4
consumer_1 get a val : 7
consumer_2 get a val : 4
producer_1 put a val: 6
producer_2 put a val: 1
consumer_2 get a val : 6
consumer_1 get a val : 1
producer_1 put a val: 8
producer_2 put a val: 1
consumer_1 get a val : 8
consumer_2 get a val : 1
producer_1 put a val: 6
producer_2 put a val: 4
consumer_2 get a val : 6
consumer_1 get a val : 4
producer_1 put a val: 7
producer_2 put a val: 8
consumer_1 get a val : 7
consumer_2 get a val : 8
Cost 10.0093015 s
           

拓展閱讀:Python的生産者消費者模型,看這篇就夠了

參考

  • https://docs.python.org/3/library/asyncio.html#module-asyncio
  • 深入了解asyncio(一)
  • 揭密Python協程