生成器
協程的核心就是上下文切換,在Python中最簡單的實作是用生成器
生成器有個方法
send()
可以從調用者向生成器函數發送資料,這樣就可以在生成器中
yield future
表示要等待 future 的結果,然後把上下文切換到調用者,等 future 結果準備好後調用者再
send(future.result())
給生成器發送結果,并把上下文切換到生成器函數
def generator_function():
# 正常情況應用loop.create_future()
result = yield asyncio.Future()
print('future結果:', result)
return
def main():
generator = generator_function()
try:
future = generator.send(None)
# 假設某個回調調用了future.set_result
future.set_result()
future = generator.send(future.result())
except StopIteration as e:
print('generator_function結果:', e.value)
輸出:
future結果:
generator_function結果:
但是在生成器函數中調用子生成器會很麻煩:
def generator_function2():
# 正常情況應用loop.create_future()
result = yield asyncio.Future()
print('future結果:', result)
return
def generator_function():
generator2 = generator_function2()
try:
future = generator2.send(None)
result = yield future
future = generator2.send(result)
except StopIteration as e:
print('generator_function2結果:', e.value)
return
def main():
generator = generator_function()
try:
future = generator.send(None)
# 假設某個回調調用了future.set_result
future.set_result()
future = generator.send(future.result())
except StopIteration as e:
print('generator_function結果:', e.value)
輸出:
future結果:
generator_function2結果:
generator_function結果:
于是有了
yield from
的文法糖,可以把流程控制交給子生成器,即子生成器
yield
直接切換上下文到調用者,調用者
send()
直接給子生成器發送資料。這樣上面的例子可以寫成:
def generator_function2():
# 正常情況應用loop.create_future()
result = yield asyncio.Future()
print('future結果:', result)
return
def generator_function():
result = yield from generator_function2()
print('generator_function2結果:', result)
return
def main():
generator = generator_function()
try:
future = generator.send(None)
# 假設某個回調調用了future.set_result
future.set_result()
future = generator.send(future.result())
except StopIteration as e:
print('generator_function結果:', e.value)
輸出同上
但是用生成器實作協程語義上不明确,而且不能實作異步生成器(既是協程又是生成器),于是 PEP 492 提出了用
async
await
作為協程文法
async, await
async def
定義的函數稱為協程函數,它永遠傳回一個協程對象,即使函數裡沒有用到
await
await
後面可以跟一個 awaitable 對象,它的傳回值是 awaitable 對象的結果。一個實作了
__await__()
方法的對象或者協程對象都是 awaitable 對象。
__await__()
方法傳回一個生成器(即這個方法是一個生成器函數),它的實作和上面的生成器協程一樣,
yield future
表示要等待future的結果。當執行協程遇到
await
時,流程控制交給後面的 awaitable 對象,直到最底層用
yield future
上下文才切換到調用者
為了使
await
相容生成器實作的協程,可以用
@asyncio.coroutine
裝飾器裝飾
yield from
實作的協程(其實它就是給生成器函數加了個 flag
CO_ITERABLE_COROUTINE
)。生成器實作的協程傳回的對象(生成器)沒有
__await__()
方法,但它也是 awaitable 對象
協程對象和生成器一樣實作了
send(), throw(), close()
方法,但是不可以直接疊代(
__await__()
方法傳回的生成器可以疊代),知道這個就可以實作手動執行協程了:
# 另一種寫法,用 yield from 實作的協程
# @asyncio.coroutine
# def coroutine_function2():
# # 正常情況應用loop.create_future()
# result = yield from asyncio.Future()
# print('future結果:', result)
# return 2
# 用 async, await 實作的協程
async def coroutine_function2():
# 正常情況應用loop.create_future()
result = await asyncio.Future()
print('future結果:', result)
return
async def coroutine_function():
result = await coroutine_function2()
print('coroutine_function2結果:', result)
return
def main():
coroutine = coroutine_function()
# 正常情況應用asyncio.ensure_future()執行協程
try:
future = coroutine.send(None)
# 假設某個回調調用了future.set_result
future.set_result()
future = coroutine.send(future.result())
except StopIteration as e:
print('coroutine_function結果:', e.value)
事件循環
其實事件循環本身跟協程沒有什麼關系,它隻負責添加回調(
call_soon, call_later
),維護
scheduled, ready
隊列,在有事件時調用回調而已。這裡不研究了,感興趣的可以看它的實作,大部分在
asyncio.base_events.BaseEventLoop
Task
真正實作執行協程的是
Task
。正常情況執行一個協程用
asyncio.ensure_future()
,參數為協程對象,它的内部又調用了
loop.create_task()
建立一個
Task
,
Task
的實作在
asyncio.tasks
Task
繼承自
Future
,它的結果就是協程的傳回值。
Task
建立時就在事件循環裡添加回調開始執行協程:
def __init__(self, coro, *, loop=None):
assert coroutines.iscoroutine(coro), repr(coro)
super().__init__(loop=loop)
if self._source_traceback:
del self._source_traceback[-]
self._coro = coro
self._fut_waiter = None
self._must_cancel = False
# 添加回調
self._loop.call_soon(self._step)
self.__class__._all_tasks.add(self)
_step
負責協程的一次疊代
def _step(self, exc=None):
assert not self.done(), \
'_step(): already done: {!r}, {!r}'.format(self, exc)
if self._must_cancel:
if not isinstance(exc, futures.CancelledError):
exc = futures.CancelledError()
self._must_cancel = False
coro = self._coro
self._fut_waiter = None
self.__class__._current_tasks[self._loop] = self
# Call either coro.throw(exc) or coro.send(None).
try:
if exc is None:
# We use the `send` method directly, because coroutines
# don't have `__iter__` and `__next__` methods.
# 疊代一次協程,await的傳回值總是future.result(),是以這裡不指定發送資料也可以
result = coro.send(None)
else:
result = coro.throw(exc)
except StopIteration as exc:
# 協程已經執行完畢,這裡設定result或者exception
if self._must_cancel:
# Task is cancelled right before coro stops.
self._must_cancel = False
self.set_exception(futures.CancelledError())
else:
self.set_result(exc.value)
except futures.CancelledError:
# 協程被取消
super().cancel() # I.e., Future.cancel(self).
except Exception as exc:
# 其他異常
self.set_exception(exc)
except BaseException as exc:
self.set_exception(exc)
raise
else:
# 沒有異常,result應該是一個future
blocking = getattr(result, '_asyncio_future_blocking', None)
if blocking is not None:
# Yielded Future must come from Future.__iter__().
if result._loop is not self._loop:
# 錯誤,result是另一個事件循環的future
self._loop.call_soon(
self._step,
RuntimeError(
'Task {!r} got Future {!r} attached to a '
'different loop'.format(self, result)))
elif blocking:
if result is self:
# 錯誤,await自己
self._loop.call_soon(
self._step,
RuntimeError(
'Task cannot await on itself: {!r}'.format(
self)))
else:
# 正常情況,這裡給result添加一個完成時的回調self._wakeup,此協程在result完成前進入睡眠
result._asyncio_future_blocking = False
result.add_done_callback(self._wakeup)
self._fut_waiter = result
if self._must_cancel:
if self._fut_waiter.cancel():
self._must_cancel = False
else:
# 錯誤,在生成器實作的協程中使用了yield而不是yield from
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from '
'in task {!r} with {!r}'.format(self, result)))
elif result is None:
# 正常情況,在生成器實作的協程中使用裸yield交出控制權
# Bare yield relinquishes control for one event loop iteration.
self._loop.call_soon(self._step)
elif inspect.isgenerator(result):
# 錯誤,yield一個生成器
# Yielding a generator is just wrong.
self._loop.call_soon(
self._step,
RuntimeError(
'yield was used instead of yield from for '
'generator in task {!r} with {}'.format(
self, result)))
else:
# 錯誤,yield了其他東西
# Yielding something else is an error.
self._loop.call_soon(
self._step,
RuntimeError(
'Task got bad yield: {!r}'.format(result)))
finally:
self.__class__._current_tasks.pop(self._loop)
self = None # Needed to break cycles when an exception occurs.
等待的 future 完成後事件循環調用
_wakeup
喚醒協程
def _wakeup(self, future):
try:
future.result()
except Exception as exc:
# This may also be a cancellation.
self._step(exc)
else:
# Don't pass the value of `future.result()` explicitly,
# as `Future.__iter__` and `Future.__await__` don't need it.
# If we call `_step(value, None)` instead of `_step()`,
# Python eval loop would use `.send(value)` method call,
# instead of `__next__()`, which is slower for futures
# that return non-generator iterators from their `__iter__`.
self._step()
self = None # Needed to break cycles when an exception occurs.
這是用事件循環和
Task
執行的協程:
async def coroutine_function2():
loop = asyncio.get_event_loop()
future = loop.create_future()
# 假設某個IO操作3秒後完成
loop.call_later(, lambda: future.set_result())
result = await future
print('future結果:', result)
return
async def coroutine_function():
result = await coroutine_function2()
print('coroutine_function2結果:', result)
return
def main():
loop = asyncio.get_event_loop()
# run_until_complete内部調用了ensure_future()
result = loop.run_until_complete(coroutine_function())
print('coroutine_function結果:', result)
loop.close()