
本文首發于 at7h 的個人部落格。
目前 Python 語言的協程從實作來說可分為兩類:
- 一種是基于傳統生成器的協程,叫做 generator-based coroutines ,通過包裝 generator 對象實作。
- 另一種在 Python 3.5 版本 PEP 492 誕生,叫做 native coroutines ,即通過使用
文法來聲明的協程。async
本文主要介紹第二種,第一種基于生成器的協程已在 Python 3.8 中棄用,并計劃在 Python 3.10 中移除。本文是「介紹」,就先不讨論太多實作原理的東西,感興趣的童鞋可以繼續關注後面的文章。
協程(coroutine)
首先,來看一個非常簡單的例子:
import asyncio
async def c():
await asyncio.sleep(1)
return 'Done '
這個被
async
修飾的函數
c
就是一個
協程函數,該函數會傳回一個
協程對象。
In [1]: asyncio.iscoroutinefunction(c)
Out[1]: True
In [2]: c()
Out[2]: <coroutine object c at 0x107b0f748>
In [3]: asyncio.iscoroutine(c())
Out[3]: True
一般來說,協程函數
c
應具有以下特點:
- 一定會傳回一個 協程對象 ,而不管其中是否有
表達式。await
- 函數中不能再使用
。yield from
- 函數内部可通過
表達式來挂起自身協程,并等待另一個協程完成直到傳回結果。await
-
表達式後面可以跟的一定是一個 可等待對象 ,而不僅僅是協程對象。await
- 不可在
修飾的協程函數外使用async
關鍵字,否則會引發一個await
。SyntaxError
- 當對協程對象進行垃圾收集時,如果從未等待過它,則會引發一個
(如果你剛開始寫 async,那你一定遇得到,不經意間就會忘掉一個RuntimeWarning
)。await
下面分别介紹下上面提到的兩個概念,
可等待對象和
協程對象。
可等待對象(awaitable)
我們來看
collections.abc
子產品中對
Awaitable
類的定義:
class Awaitable(metaclass=ABCMeta):
__slots__ = ()
@abstractmethod
def __await__(self):
yield
@classmethod
def __subclasshook__(cls, C):
if cls is Awaitable:
return _check_methods(C, "__await__")
return NotImplemented
可見,可等待對象
主要實作了一個
__await__
方法。且該方法必須傳回一個疊代器(iterator) ①,否則将會引發一個
TypeError
。
注意: 主要實作 是因為表達式需要跟老的基于生成器的協程相相容,即通過使用
await
或
types.coroutine()
裝飾器傳回的生成器疊代器對象(generator iterator)也屬于可等待對象,但它們并未實作
asyncio.coroutine()
方法。
__await__
協程對象(Coroutine)
同樣的,我們來看
collections.abc
子產品中對
Coroutine
類的定義:
class Coroutine(Awaitable):
__slots__ = ()
@abstractmethod
def send(self, value):
"""Send a value into the coroutine.
Return next yielded value or raise StopIteration.
"""
raise StopIteration
@abstractmethod
def throw(self, typ, val=None, tb=None):
"""Raise an exception in the coroutine.
Return next yielded value or raise StopIteration.
"""
if val is None:
if tb is None:
raise typ
val = typ()
if tb is not None:
val = val.with_traceback(tb)
raise val
def close(self):
"""Raise GeneratorExit inside coroutine.
"""
try:
self.throw(GeneratorExit)
except (GeneratorExit, StopIteration):
pass
else:
raise RuntimeError("coroutine ignored GeneratorExit")
@classmethod
def __subclasshook__(cls, C):
if cls is Coroutine:
return _check_methods(C, '__await__', 'send', 'throw', 'close')
return NotImplemented
由上可知,由于繼承關系,
協程對象是屬于可等待對象的。
除了協程
Coroutine
對象外,目前常見的可等待對象還有兩種:
asyncio.Task
和
asyncio.Future
,下文中介紹。
協程的執行可通過調用
__await__()
并疊代其結果進行控制。當協程結束執行并傳回時,疊代器會引發
StopIteration
異常,并通過該異常的
value
屬性來傳播協程的傳回值。下面看一個簡單的例子:
In [4]: c().send(None)
Out[4]: <Future pending>
In [5]: async def c1():
...: return "Done "
...:
In [6]: c1().send(None)
Out[6]: StopIteration: Done
運作
協程的運作需要在一個 EventLoop 中進行,由它來控制異步任務的注冊、執行、取消等。其大緻原理是:
把傳入的所有異步對象(準确的說是可等待對象,如,
Coroutine
等,見下文)都注冊到這個 EventLoop 上,EventLoop 會循環執行這些異步對象,但同時隻執行一個,當執行到某個對象時,如果它正在等待其他對象(I/O 處理) 傳回,事件循環會暫停它的執行去執行其他的對象。當某個對象完成 I/O 處理後,下次循環到它的時候會擷取其傳回值然後繼續向下執行。這樣以來,所有的異步任務就可以協同運作。
Task
EventLoop 接受的對象必須為可等待對象,目前主要有三種類型即
Coroutine
,
Task
和
Future。
下面簡單的介紹下
Task
和
Future
:
-
是一種特殊的低級的可等待對象,用來支援底層回調式代碼與高層Future
式的代碼互動,是對協程底層實作的封裝,其表示一個異步操作的最終結果。它提供了設定和擷取async/await
執行狀态或結果等操作的接口。Future
實作了Future
協定,并通過__await__
來相容老式協程。一般來說,我們不需要關心這玩意兒,日常的開發也是不需要要用到它的。如有需要,就用其子類 Task。__iter__ = __await__
- Task 用來協同的排程協程以實作并發,并提供了相應的接口供我們使用。
建立一個
Task
非常簡單:
In [6]: loop = asyncio.get_event_loop()
In [7]: task = loop.create_task(c())
In [8]: task
Out[8]: <Task pending coro=<c() running at <ipython-input-1-3afd2bbb1944>:3>>
In [9]: task.done()
Out[9]: False
In [10]: task.cancelled()
Out[10]: False
In [11]: task.result()
Out[11]: InvalidStateError: Result is not set.
In [12]: await task
Out[12]: 'Done '
In [13]: task
Out[13]: <Task finished coro=<c() done, defined at <ipython-input-1-3afd2bbb1944>:3> result='Done '>
In [14]: task.done()
Out[14]: True
In [15]: task.result()
Out[15]: 'Done '
In [16]: task = loop.create_task(c())
In [17]: task.cancel()
Out[17]: True
In [18]: await task
Out[18]: CancelledError:
上面說到,協程的運作需要在一個 EventLoop 中進行,在 Python 3.7 之前,你隻能這麼寫 :
In [19]: loop = asyncio.get_event_loop()
In [20]: loop.run_until_complete(c())
Out[20]: 'Done '
In [21]: loop.close()
Python 3.7 及以上版本可以直接使用
asyncio.run()
:
In [22]: asyncio.run(c())
Out[22]: 'Done '
并發
有些童鞋可能有疑問了,我寫好了一個個協程函數,怎樣才能并發的運作 ?
asyncio
提供了相應的兩個接口:
asyncio.gather
和
asyncio.wait
來支援:
async def c1():
await asyncio.sleep(1)
print('c1 done')
return True
async def c2():
await asyncio.sleep(2)
print('c2 done')
return True
async def c12_by_gather():
await asyncio.gather(c1(), c2())
async def c12_by_awit():
await asyncio.wait([c1(), c2()])
In [23]: asyncio.run(c12_by_gather())
c1 done
c2 done
In [24]: asyncio.run(c12_by_awit())
c1 done
c2 done
其它
上面我們介紹了 PEP 492 coroutine 的基礎使用,同時 PEP 492 也相應提出了基于
async with
和
async for
表達式的異步上下文管理器(asynchronous context manager)和異步疊代器(asynchronous iterator)。
下面的介紹的示例将基于 Python 可疊代對象, 疊代器和生成器 裡的示例展開,建議感興趣的同學可以先看下這篇文章。
異步上下文管理器
在 Python 中,我們常會通過實作
__enter__()
和
__exit__()
方法來實作一個上下文管理器:
In [1]: class ContextManager:
...:
...: def __enter__(self):
...: print('enter...')
...:
...: def __exit__(slef, exc_type, exc_val, exc_tb):
...: print('exit...')
...:
In [2]: with ContextManager():
...: print('Do something...')
...:
enter...
Do something...
exit...
同樣的,在異步程式設計時我們可以通過實作
__aenter__()
和
__aexit__()
方法來實作一個上下文管理器,并通過
async with
表達式來使用。
In [1]: class AsyncContextManager:
...:
...: async def __aenter__(self):
...: print('async enter...')
...:
...: async def __aexit__(slef, exc_type, exc_val, exc_tb):
...: print('async exit...')
...:
In [2]: async with AsyncContextManager():
...: print('Do something...')
...:
async enter...
Do something...
async exit...
異步疊代
在之前的文章 Python 可疊代對象, 疊代器和生成器 中,我們介紹了通過實作
__iter__()
方法來實作一個可疊代對象,通過實作疊代器協定
__iter__()
和
__next__()
方法來實作一個疊代器對象。下面我們改造下之前的例子,實作一個異步的版本。
class AsyncLinkFinder:
PATTERN = "(?<=href=").+?(?=" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" )|(?<=href=').+?(?=')"
def __init__(self, text):
self.links = re.findall(self.PATTERN, text)
def __aiter__(self):
return AsyncLinkIiterator(self.links)
class AsyncLinkIiterator:
def __init__(self, links):
self.links = links
self.index = 0
async def _gen_link(self):
try:
link = self.links[self.index]
self.index += 1
except IndexError:
link = None
return link
def __aiter__(self):
return self
async def __anext__(self):
link = await self._gen_link()
if link is None:
raise StopAsyncIteration
return link
In [7]: async for s in AsyncLinkFinder(TEXT):
...: print(s)
https://blog.python.org
http://feedproxy.google.com/~r/PythonSoftwareFoundationNew/~3/T3r7qZxo-xg/python-software-foundation-fellow.html
http://feedproxy.google.com/~r/PythonSoftwareFoundationNews/~3/lE0u-5MIUQc/why-sponsor-pycon-2020.html
http://feedproxy.google.com/~r/PythonSoftwareFoundationNews/~3/jAMRqiPhWSs/seeking-developers-for-paid-contract.html
例子中實作了
__aiter__()
方法的
AsyncLinkFinder
就是一個
異步可疊代對象,
__aiter__()
方法傳回的必須是一個
異步疊代器,如
AsyncLinkIiterator
。異步疊代器必須同時實作
__aiter__()
和
__anext__()
方法。一個不同的點是,異步疊代中,當疊代器耗盡時,需要引發一個
StopAsyncIteration
而不是
StopIteration
。
同樣的,我們也實作一個異步生成器版本的:
class LinkFinder:
PATTERN = "(?<=href=").+?(?=" target="_blank" rel="external nofollow" target="_blank" rel="external nofollow" )|(?<=href=').+?(?=')"
def __init__(self, text):
self.links = re.finditer(self.PATTERN, text)
async def __aiter__(self):
return (link.group() for link in self.links)
注
- ① 關于疊代器的介紹可閱讀 Python 可疊代對象, 疊代器和生成器。
- ② 關于示例中 __slots__ 的介紹請檢視 了解 Python 類屬性之 __slots__。
參考
- PEP 492: Coroutines with async and await syntax
- PEP 342: Coroutines via Enhanced Generators