天天看點

一篇文章學會 asyncio 子產品

【實驗樓個人教學筆記】

要寫個asyncio的伺服器,隻能抽空學學這個庫。

1、協程裝飾器

在 Python 3.4 中,asyncio 子產品出現,此時建立協程函數須使用

asyncio.coroutine

裝飾器标記。此前的包含

yield from

語句的函數既可以稱作生成器函數也可以稱作協程函數,為了突出協程的重要性,現在使用

asyncio.coroutine

裝飾器的函數就是真正的協程函數了。

2、任務和時間循環

  • coroutine 協程

    協程對象,使用

    asyncio.coroutine

    裝飾器裝飾的函數被稱作協程函數,它的調用不會立即執行函數,而是傳回一個協程對象,即協程函數的運作結果為協程對象,注意這裡說的 “運作結果” 不是 return 值。協程對象需要包裝成任務注入到事件循環,由事件循環調用。
  • task 任務

    将協程對象作為參數建立任務,任務是對協程對象的進一步封裝,其中包含任務的各種狀态。

  • event_loop 事件循環

    将多線程比喻為工廠裡的多個工廠中的房間,那麼協程就是一個工廠中的房間内的多台機器。

    線上程級程式中,一台機器開始工作,工廠中的房間内的其它機器不能同時工作,需要等上一台機器停止,但其它工廠中的房間内的機器可以同時啟動,這樣就可以顯著提高工作效率。

    在協程程式中,一個工廠中的房間内的不同機器可以同時運轉,啟動機器、暫停運轉、延時啟動、停止機器等操作都可以人為設定。

事件循環能夠控制任務運作流程,也就是任務的調用方。

3、一個簡單的例子

In [50]: import time

In [51]: import asyncio

In [52]: def one():
    ...:     start = time.time()
    ...:
    ...:     @asyncio.coroutine   # 使用協程裝飾器建立協程函數
    ...:     def do_some_work():  # 協程函數
    ...:         print('Start coroutine')
    ...:         time.sleep(0.1)  # 模拟 IO 操作
    ...:         print('This is a coroutine')
    ...:
    ...:     loop = asyncio.get_event_loop()     # 建立事件循環。每個線程中隻能有一個事件循環,get_event_loop 方法會擷取目前已經存在的事件循環,如果目前線程中沒有,建立一個
    ...:     coroutine = do_some_work()          # 調用協程函數擷取協程對象
    ...:     loop.run_until_complete(coroutine)  
    ...:	 # 将協程對象注入到事件循環,協程的運作由事件循環控制。事件循環的 run_until_complete 方法會阻塞運作,直到任務全部完成。協程對象作為 run_until_complete 方法的參數,loop 會自動将協程對象包裝成任務來運作。後面我們會講到多個任務注入事件循環的情況
    ...:
    ...:     end = time.time()
    ...:     print('運作耗時:{:.4f}'.format(end - start))  # 列印程式運作耗時
    ...:

In [53]: one()
Start coroutine
This is a coroutine
運作耗時:0.1062
           

4、任務狀态

協程對象不能直接運作,必須放入事件循環中或者由

yield from

語句調用。将協程對象注入事件循環的時候,其實是

run_until_complete

方法将協程包裝成了一個任務(task)對象,任務對象儲存了協程運作後的狀态,用于未來擷取協程的結果。

修改之前的代碼:

In [56]: def two():
    ...:     start = time.time()
    ...:
    ...:     @asyncio.coroutine
    ...:     def do_some_work():
    ...:         print('Start coroutine')
    ...:         time.sleep(0.1)
    ...:         print('This is a coroutine')
    ...:
    ...:     loop = asyncio.get_event_loop()
    ...:     coroutine = do_some_work()
    ...:     task = loop.create_task(coroutine)  # 事件循環的 create_task 方法可以建立任務,另外 asyncio.ensure_future 方法也可以建立任務,參數須為協程對象
    ...:     print('task 是不是 asyncio.Task 的執行個體?', isinstance(task, asyncio.Task))  # task 是 asyncio.Task 類的執行個體,為什麼要使用協程對象建立任務?因為在這個過程中 asyncio.Task 做了一些工作,包括預激協程、協程運作中遇到某些異常時的處理
    ...:     print('Task state:', task._state)   # task 對象的 _state 屬性儲存目前任務的運作狀态,任務的運作狀态有 PENDING 和 FINISHED 兩種
    ...:     loop.run_until_complete(task)       # 将任務注入事件循環,阻塞運作
    ...:     print('Task state:', task._state)
    ...:
    ...:     end = time.time()
    ...:     print('運作耗時:{:.4f}'.format(end - start))
    ...:

In [57]: two()
task 是不是 asyncio.Task 的執行個體? True
Task state: PENDING
Start coroutine
This is a coroutine
Task state: FINISHED
運作耗時:0.1052
           

5、

async

/

await

關鍵字

在 Python 3.5 中新增了

async

/

await

關鍵字用來定義協程函數。這兩個關鍵字是一個組合,其作用等同于

asyncio.coroutine

裝飾器和

yield from

語句。此後協程與生成器就徹底泾渭分明了。

6、綁定回調

假如協程包含一個 IO 操作(這幾乎是肯定的),等它處理完資料後,我們希望得到通知,以便下一步資料處理。這一需求可以通過向

future 對象

中添加回調來實作。那麼什麼是 future 對象?task 對象就是 future 對象,我們可以這樣認為,因為

asyncio.Task

asyncio.Future

的子類。也就是說,task 對象可以添加回調函數。回調函數的最後一個參數是

future

task 對象

,通過該對象可以擷取協程傳回值。如果回調需要多個參數,可以通過偏函數導入。

In [64]: def three():
    ...:     start = time.time()
    ...:
    ...:     # @asyncio.coroutine
    ...:     async def corowork():      # 使用 async 關鍵字替代 asyncio.coroutine 裝飾器建立協程函數
    ...:         print('[corowork] Start coroutine')
    ...:         time.sleep(0.1)
    ...:         print('[corowork] This is a coroutine')
    ...:
    ...:     def callback(name, task):  # 回調函數,協程終止後需要順便運作的代碼寫入這裡,回調函數的參數有要求,最後一個位置參數須為 task 對象
    ...:         print('[callback] Hello {}'.format(name))
    ...:         print('[callback] coroutine state: {}'.format(task._state))
    ...:
    ...:     loop = asyncio.get_event_loop()
    ...:     coroutine = corowork()
    ...:     task = loop.create_task(coroutine)
    ...:	 # task 對象的 add_done_callback 方法可以添加回調函數,注意參數必須是回調函數,這個方法不能傳入回調函數的參數,這一點需要通過 functools 子產品的 partial 方法解決,将回調函數和其參數 name 作為 partial 方法的參數,此方法的傳回值就是偏函數,偏函數可作為 task.add_done_callback 方法的參數
    ...:     task.add_done_callback(functools.partial(callback, 'Shiyanlou'))
    ...:     loop.run_until_complete(task)
    ...:
    ...:     end = time.time()
    ...:     print('運作耗時:{:.4f}'.format(end - start))
    ...:

In [65]: import functools

In [66]: three()
[corowork] Start coroutine
[corowork] This is a coroutine
[callback] Hello Shiyanlou
[callback] coroutine state: FINISHED
運作耗時:0.1051
           

7、多任務

實際項目中,往往有多個協程建立多個任務對象,同時在一個 loop 裡運作。為了把多個協程交給 loop,需要借助

asyncio.gather

方法。任務的 result 方法可以獲得對應的協程函數的 return 值。

In [67]: def four():
    ...:     start = time.time()
    ...:
    ...:     async def corowork(name, t):
    ...:         print('[corowork] Start coroutine', name)
    ...:         await asyncio.sleep(t)                  # 1
    ...:         print('[corowork] Stop coroutine', name)
    ...:         return 'Coroutine {} OK'.format(name)   # 2
    ...:
    ...:     loop = asyncio.get_event_loop()
    ...:     coroutine1 = corowork('ONE', 3)             # 3
    ...:     coroutine2 = corowork('TWO', 1)             # 3
    ...:     task1 = loop.create_task(coroutine1)        # 4
    ...:     task2 = loop.create_task(coroutine2)        # 4
    ...:     gather = asyncio.gather(task1, task2)       # 5
    ...:     loop.run_until_complete(gather)             # 6
    ...:     print('[task1] ', task1.result())           # 7
    ...:     print('[task2] ', task2.result())           # 7
    ...:
    ...:     end = time.time()
    ...:     print('運作耗時:{:.4f}'.format(end - start))
    ...:

In [68]: four()
[corowork] Start coroutine ONE
[corowork] Start coroutine TWO
[corowork] Stop coroutine TWO
[corowork] Stop coroutine ONE
[task1]  Coroutine ONE OK
[task2]  Coroutine TWO OK
運作耗時:3.0070
           

代碼說明:

  1. await 關鍵字等同于 Python 3.4 中的 yield from 語句,後面接協程對象。asyncio.sleep 方法的傳回值為協程對象,這一步為阻塞運作。asyncio.sleep 與 time.sleep 是不同的,前者阻塞目前協程,即 corowork 函數的運作,而 time.sleep 會阻塞整個線程,是以這裡必須用前者,阻塞目前協程,CPU 可以線上程内的其它協程中執行
  2. 協程函數的 return 值可以在協程運作結束後儲存到對應的 task 對象的 result 方法中
  3. 建立兩個協程對象,在協程内部分别阻塞 3 秒和 1 秒
  4. 建立兩個任務對象
  5. 将任務對象作為參數,asyncio.gather 方法建立任務收集器。注意,asyncio.gather 方法中參數的順序決定了協程的啟動順序
  6. 将任務收集器作為參數傳入事件循環的 run_until_complete 方法,阻塞運作,直到全部任務完成
  7. 任務結束後,事件循環停止,列印任務的 result 方法傳回值,即協程函數的 return 值

到這一步,大家應該可以看得出,上面的代碼已經是異步程式設計的結構了,在事件循環内部,兩個協程是交替運作完成的。簡單叙述一下程式協程部分的運作過程:

-> 首先運作 task1

-> 列印 [corowork] Start coroutine ONE

-> 遇到 asyncio.sleep 阻塞

-> 釋放 CPU 轉到 task2 中執行

-> 列印 [corowork] Start coroutine TWO

-> 再次遇到 asyncio.sleep 阻塞

-> 這次沒有其它協程可以運作了,隻能等阻塞結束

-> task2 的阻塞時間較短,阻塞 1 秒後先結束,列印 [corowork] Stop coroutine TWO

-> 又過了 2 秒,阻塞 3 秒的 task1 也結束了阻塞,列印 [corowork] Stop coroutine ONE

-> 至此兩個任務全部完成,事件循環停止

-> 列印兩個任務的 result

-> 列印程式運作時間

-> 程式全部結束

需要額外說明的幾點:

  • 1、多數情況下無需調用 task 的 add_done_callback 方法,可以直接把回調函數中的代碼寫入 await 語句後面,協程是可以暫停和恢複的
  • 2、多數情況下同樣無需調用 task 的 result 方法擷取協程函數的 return 值,因為事件循環的 run_until_complete 方法的傳回值就是協程函數的 return 值
  • 3、事件循環有一個 stop 方法用來停止循環和一個 close 方法用來關閉循環。以上示例中都沒有調用 loop.close 方法,似乎并沒有什麼問題。是以到底要不要調用 loop.close 呢?簡單來說,loop 隻要不關閉,就還可以再次運作 run_until_complete 方法,關閉後則不可運作。有人會建議調用 loop.close,徹底清理 loop 對象防止誤用,其實多數情況下根本沒有這個必要。
  • 4、asyncio 子產品提供了 asyncio.gather 和 asyncio.wait 兩個任務收集方法,它們的作用相同,都是将協程任務按順序排定,再将傳回值作為參數加入到事件循環中。前者在上文已經用到,後者與前者的差別是它可以擷取任務的執行狀态(PENING & FINISHED),當有一些特别的需求例如在某些情況下取消任務,可以使用 asyncio.wait 方法。

8、取消任務

在事件循環啟動之後停止之前,我們可以手動取消任務的執行,注意 PENDING 狀态的任務才能被取消,FINISHED 狀态的任務已經完成,不能取消。

# File Name: async_cancel.py

import asyncio

async def work(id, t):
    print('Working...')
    await asyncio.sleep(t)
    print('Work {} done'.format(id))

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]            # 建立一個清單,清單中有 3 個協程對象,協程内部分别阻塞 1 - 3 秒

    try:
        loop.run_until_complete(asyncio.gather(*coroutines))  # 程式運作過程中,快捷鍵 Ctrl + C 會觸發 KeyboardInterrupt 異常。捕獲這個異常,在程式終止前完成 # 3 和 # 4 代碼的執行
    except KeyboardInterrupt:
        loop.stop()    # 事件循環的 stop 方法取消所有未完成的任務,停止事件循環
    finally:
        loop.close()   # 關閉事件循環

if __name__ == '__main__':
    main()
           

運作結果:

$ python3 async_cancel.py
Working...
Working...
Working...
Work 1 done
^C%
           

任務的

cancel

方法也可以取消任務,而

asyncio.Task.all_tasks

方法可以獲得事件循環中的全部任務。修改上文代碼中的 main 函數如下:

def main():
    loop = asyncio.get_event_loop()
    coroutines = [work(i, i) for i in range(1, 4)]
    # 程式運作過程中,快捷鍵 Ctrl + C 會觸發 KeyboardInterrupt 異常
    try:
        loop.run_until_complete(asyncio.gather(*coroutines))
    except KeyboardInterrupt:
        print()
        # 每個線程裡隻能有一個事件循環
        # 此方法可以獲得事件循環中的所有任務的集合
        # 任務的狀态有 PENDING 和 FINISHED 兩種
        tasks = asyncio.Task.all_tasks()
        for i in tasks:
            print('取消任務:{}'.format(i))
            # 任務的 cancel 方法可以取消未完成的任務
            # 取消成功傳回 True ,已完成的任務取消失敗傳回 False
            print('取消狀态:{}'.format(i.cancel()))
    finally:
        loop.close()
           

運作結果:

$ python3 async_cancel.py
Working...
Working...
Working...
Work 1 done
^C
取消任務:<Task finished coro=<work() done, defined at a.py:5> result=None>
取消狀态:False
取消任務:<Task pending coro=<work() running at a.py:7> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102cd8a38>()]> cb=[gather.<locals>._done_callback() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:664]>
取消狀态:True
取消任務:<Task pending coro=<work() running at a.py:7> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x102cd8a98>()]> cb=[gather.<locals>._done_callback() at /usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:664]>
取消狀态:True
           

9、無限循環任務

事件循環的

run_until_complete

方法運作事件循環,當其中的全部任務完成後,自動停止事件循環;

run_forever

方法為無限運作事件循環,需要自定義

loop.stop

方法并執行之才會停止。

# File Name: run_forever.py

import asyncio

async def work(loop, t):
    print('start')
    await asyncio.sleep(t)  # 模拟 IO 操作
    print('after {}s stop'.format(t))
    loop.stop()             # 停止事件循環,stop 後仍可重新運作

loop = asyncio.get_event_loop()             # 建立事件循環
task = asyncio.ensure_future(work(loop, 1)) # 建立任務,該任務會自動加入事件循環
loop.run_forever()  # 無限運作事件循環,直至 loop.stop 停止
loop.close()        # 關閉事件循環,隻有 loop 處于停止狀态才會執行
           

運作程式:

$ python3 run_forever.py
start
after 1s stop
           

以上是單任務事件循環,将

loop

作為參數傳入協程函數建立協程,在協程内部執行

loop.stop

方法停止事件循環。下面是多任務事件循環,使用回調函數執行

loop.stop

停止事件循環,修改

run_forever.py

檔案如下:

# File Name: run_forever.py

import time
import asyncio
import functools

def loop_stop(loop, future):    # 函數的最後一個參數須為 future / task
    loop.stop()                 # 停止事件循環,stop 後仍可重新運作

async def work(t):              # 協程函數
    print('start')
    await asyncio.sleep(t)      # 模拟 IO 操作
    print('after {}s stop'.format(t))

def main():
    loop = asyncio.get_event_loop()
    # 建立任務收集器,參數為任意數量的協程,任務收集器本身也是 task / future 對象
    tasks = asyncio.gather(work(1), work(2))
    # 任務收集器的 add_done_callback 方法添加回調函數
    # 當所有任務完成後,自動運作此回調函數
    # 注意 add_done_callback 方法的參數是回調函數
    # 這裡使用 functools.partial 方法建立偏函數以便将 loop 作為參數加入
    tasks.add_done_callback(functools.partial(loop_stop, loop))
    loop.run_forever()  # 無限運作事件循環,直至 loop.stop 停止
    loop.close()        # 關閉事件循環

if __name__ == '__main__':
    start = time.time()
    main()
    end = time.time()
    print('耗時:{:.4f}s'.format(end - start))
           

運作結果:

$ python3 run_forever.py
start
start
after 1s stop
after 2s stop
耗時:2.0064s
           

loop.run_until_complete

方法本身也是調用

loop.run_forever

方法,然後通過回調函數調用

loop.stop

方法實作的。

10、加入普通函數

事件循環的

call_soon

方法可以将普通函數作為任務加入到事件循環并立即排定任務的執行順序。

# File Name: call_soon.py

import asyncio
import time

def hello(name):          # 普通函數
    print('[hello] Hello, {}'.format(name))

async def work(t, name):  # 協程函數
    print('[work ] start', name)
    await asyncio.sleep(t)
    print('[work ] {} after {}s stop'.format(name, t))

def main():
    loop = asyncio.get_event_loop() 
    # 向事件循環中添加任務
    asyncio.ensure_future(work(1, 'A'))     # 第 1 個執行
    # call_soon 将普通函數當作 task 加入到事件循環并排定執行順序
    # 該方法的第一個參數為普通函數名字,普通函數的參數寫在後面
    loop.call_soon(hello, 'Tom')            # 第 2 個執行
    # 向事件循環中添加任務
    loop.create_task(work(2, 'B'))          # 第 3 個執行
    # 阻塞啟動事件循環,順便再添加一個任務  
    loop.run_until_complete(work(3, 'C'))   # 第 4 個執行

if __name__ == '__main__':
    main()
           

運作結果:

$ python3 call_soon.py
[work ] start A
[hello] Hello, Tom
[work ] start B
[work ] start C
[work ] A after 1s stop
[work ] B after 2s stop
[work ] C after 3s stop
           

11、稍後執行普通函數

loop.call_later

此方法同

loop.call_soon

一樣,可将普通函數作為任務放到事件循環裡,不同之處在于此方法可延時執行,第一個參數為延時時間。

# File Name: call_later.py

import asyncio
import functools

def hello(name):            # 普通函數
    print('[hello]  Hello, {}'.format(name))

async def work(t, name):    # 協程函數
    print('[work{}]  start'.format(name))
    await asyncio.sleep(t)
    print('[work{}]  stop'.format(name))

def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(work(1, 'A'))         # 任務 1
    loop.call_later(1.2, hello, 'Tom')          # 任務 2
    loop.call_soon(hello, 'Kitty')              # 任務 3
    task4 = loop.create_task(work(2, 'B'))      # 任務 4
    loop.call_later(1, hello, 'Jerry')          # 任務 5
    loop.run_until_complete(task4)

if __name__ == '__main__':
    main()
           

運作結果:

$ python3 call_later.py
[workA]  start
[hello]  Hello, Kitty
[workB]  start
[hello]  Hello, Jerry
[workA]  stop
[hello]  Hello, Tom
[workB]  stop
           

12、其他常用方法

call_soon 立刻執行

call_later

延時執行,

call_at

在某時刻執行

loop.time

就是事件循環内部的一個計時方法,傳回值是時刻,資料類型是 float

def main():
    loop = asyncio.get_event_loop()
    start = loop.time()                         # 事件循環内部時刻
    asyncio.ensure_future(work(1, 'A'))         # 任務 1
    # loop.call_later(1.2, hello, 'Tom')
    # 上面注釋這行等同于下面這行
    loop.call_at(start+1.2, hello, 'Tom')       # 任務 2
    loop.call_soon(hello, 'Kitty')              # 任務 3
    task4 = loop.create_task(work(2, 'B'))      # 任務 4
    # loop.call_later(1, hello, 'Jerry')
    # 上面注釋這行等同于下面這行
    loop.call_at(start+1, hello, 'Jerry')       # 任務 5

    loop.run_until_complete(task4)
           

運作檔案結果與 call_later.py 一緻,不再展示。

這三個 call_xxx 方法的作用都是将普通函數作為任務排定到事件循環中,傳回值都是

asyncio.events.TimerHandle

執行個體,注意它們不是協程任務 ,不能作為

loop.run_until_complete

的參數。

13、協程鎖

按照字面意思來看,

asyncio.lock

應該叫做異步 IO 鎖,之是以叫協程鎖,是因為它通常使用在子協程中,其作用是将協程内部的一段代碼鎖住,直到這段代碼運作完畢解鎖。協程鎖的固定用法是使用

async with

建立協程鎖的上下文環境,将代碼塊寫入其中。

import asyncio

l = []
lock = asyncio.Lock()   # 協程鎖

async def work(name):
    print('lalalalalalalala')     # 列印此資訊是為了測試協程鎖的控制範圍
    # 這裡加個鎖,第一次調用該協程,運作到這個語句塊,上鎖
    # 當語句塊結束後解鎖,開鎖前該語句塊不可被運作第二次
    # 如果上鎖後有其它任務調用了這個協程函數,運作到這步會被阻塞,直至解鎖
    # with 是普通上下文管理器關鍵字,async with 是異步上下文管理器關鍵字
    # 能夠使用 with 關鍵字的對象須有 __enter__ 和 __exit__ 方法
    # 能夠使用 async with 關鍵字的對象須有 __aenter__ 和 __aexit__ 方法
    # async with 會自動運作 lock 的 __aenter__ 方法,該方法會調用 acquire 方法上鎖
    # 在語句塊結束時自動運作 __aexit__ 方法,該方法會調用 release 方法解鎖
    # 這和 with 一樣,都是簡化 try ... finally 語句
    async with lock:
        print('{} start'.format(name))  # 頭一次運作該協程時列印
        if 'x' in l:                    # 如果判斷成功
            return name                 # 直接傳回結束協程,不再向下執行
        await asyncio.sleep(0); print('----------')  # 阻塞 0 秒,切換協程
        l.append('x')
        print('{} end'.format(name))
        return name

async def one():
    name = await work('one')
    print('{} ok'.format(name))

async def two():
    name = await work('two')
    print('{} ok'.format(name))

def main():
    loop = asyncio.get_event_loop()
    tasks = asyncio.wait([one(), two()])
    loop.run_until_complete(tasks)

if __name__ == '__main__':
    main()
           

運作結果:

$ python3 async_lock.py
lalalalalalalala
one start
lalalalalalalala
----------
one end
one ok
two start
two ok
           

繼續閱讀