
Python coroutines under the hood: generators, async/await, the event loop, and Task

Generators

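The core of a coroutine is context switching, and the simplest way to implement it in Python is with a generator.

A generator has a send() method that lets the caller pass data into the generator function. Inside the generator, yield future then means "wait for the result of future" and switches context back to the caller. Once the result is ready, the caller calls send(future.result()) to deliver it and switch context back into the generator function.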
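
Before bringing futures into it, it may help to see send() on its own. The following is a minimal sketch with a hypothetical running_total generator (not part of the original article): each send() resumes the generator at its yield, and the value sent becomes the result of the yield expression.

```python
def running_total():
    """Generator that keeps a running total of the values sent to it."""
    total = 0
    while True:
        # Execution pauses here; the argument of send() becomes `value`
        value = yield total
        total += value


gen = running_total()
first = gen.send(None)   # prime the generator: runs up to the first yield
second = gen.send(10)    # resumes with value=10, pauses at the next yield
third = gen.send(5)      # resumes with value=5
print(first, second, third)
```

The first send(None) is needed because a freshly created generator has not reached a yield yet, so there is nothing to receive a value.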

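import asyncio

# An explicit loop is used only to create futures; calling asyncio.Future()
# with no current event loop may fail on recent Python versions.
loop = asyncio.new_event_loop()


def generator_function():
    result = yield loop.create_future()
    print('future result:', result)
    return 2  # illustrative return value


def main():
    generator = generator_function()
    try:
        future = generator.send(None)
        # Suppose some callback called future.set_result (1 is illustrative)
        future.set_result(1)
        future = generator.send(future.result())
    except StopIteration as e:
        print('generator_function result:', e.value)


Output:

future result: 1
generator_function result: 2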
           

However, calling a sub-generator from inside a generator function gets cumbersome:

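import asyncio

# Explicit loop used only to create futures outside a running loop
loop = asyncio.new_event_loop()


def generator_function2():
    result = yield loop.create_future()
    print('future result:', result)
    return 2  # illustrative return value


def generator_function():
    generator2 = generator_function2()
    try:
        # Forward the sub-generator's future to our own caller by hand
        future = generator2.send(None)
        result = yield future
        # Forward the caller's result back into the sub-generator
        future = generator2.send(result)
    except StopIteration as e:
        print('generator_function2 result:', e.value)

    return 3  # illustrative return value


def main():
    generator = generator_function()
    try:
        future = generator.send(None)
        # Suppose some callback called future.set_result (1 is illustrative)
        future.set_result(1)
        future = generator.send(future.result())
    except StopIteration as e:
        print('generator_function result:', e.value)


Output:

future result: 1
generator_function2 result: 2
generator_function result: 3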
           

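This is why the yield from syntactic sugar was introduced. It hands flow control to the sub-generator: a yield in the sub-generator switches context directly to the caller, and the caller's send() delivers data directly to the sub-generator. The example above can then be written as: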

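import asyncio

# Explicit loop used only to create futures outside a running loop
loop = asyncio.new_event_loop()


def generator_function2():
    result = yield loop.create_future()
    print('future result:', result)
    return 2  # illustrative return value


def generator_function():
    # yield from forwards send() and yield between the caller and the sub-generator
    result = yield from generator_function2()
    print('generator_function2 result:', result)
    return 3  # illustrative return value


def main():
    generator = generator_function()
    try:
        future = generator.send(None)
        # Suppose some callback called future.set_result (1 is illustrative)
        future.set_result(1)
        future = generator.send(future.result())
    except StopIteration as e:
        print('generator_function result:', e.value)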
           

The output is the same as above.
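
The delegation can also be seen without any futures. In this small sketch (child and parent are hypothetical names), the value yielded by the sub-generator reaches the outer caller directly, the value sent by the outer caller reaches the sub-generator directly, and the sub-generator's return value becomes the value of the yield from expression.

```python
def child():
    # The yielded string goes straight to whoever drives parent()
    received = yield 'child is waiting'
    return received * 2


def parent():
    # parent never sees the intermediate yield/send traffic
    result = yield from child()
    return result + 1


gen = parent()
request = gen.send(None)   # value yielded by child, passed through parent
try:
    gen.send(20)           # delivered directly to child
except StopIteration as stop:
    final = stop.value     # parent's return value
print(request, final)
```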

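However, implementing coroutines with generators is semantically ambiguous, and it rules out asynchronous generators (functions that are both a coroutine and a generator). PEP 492 therefore proposed async and await as dedicated coroutine syntax.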

async, await

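A function defined with async def is called a coroutine function. It always returns a coroutine object, even if the function body never uses await.

await is followed by an awaitable object, and the value of the await expression is the result of that awaitable. A coroutine object, or any object that implements an __await__() method, is awaitable. __await__() must return an iterator, and it is usually written as a generator function. Its implementation works like the generator coroutines above: yield future means waiting for the future's result. When a running coroutine reaches await, flow control passes to the awaitable that follows, and the context only switches back to the caller when the innermost level executes yield future.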
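
As a rough illustration of the __await__() protocol, here is a sketch with a hypothetical Request class. Note one simplification: it yields itself rather than an asyncio future, and it returns whatever the driver sends in, which is enough to show where the context switches happen.

```python
class Request:
    """Hypothetical awaitable that suspends once and returns what is sent in."""

    def __init__(self, name):
        self.name = name

    def __await__(self):
        # This yield is the point where control goes back to the driver
        reply = yield self
        return reply


async def fetch():
    reply = await Request('user')
    return 'got ' + reply


coro = fetch()
pending = coro.send(None)      # runs until the yield inside __await__()
try:
    coro.send('alice')         # resumes the yield with the reply
except StopIteration as stop:
    outcome = stop.value
print(pending.name, outcome)
```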

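To make await compatible with generator-based coroutines, a coroutine implemented with yield from can be decorated with @asyncio.coroutine (deprecated since Python 3.8 and removed in Python 3.11). In effect the decorator just sets the CO_ITERABLE_COROUTINE flag on the generator function. The object returned by a generator-based coroutine (a generator) has no __await__() method, but it is still an awaitable object.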
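
Since @asyncio.coroutine is gone from current Python versions, the sketch below swaps in types.coroutine from the standard library, which sets the same CO_ITERABLE_COROUTINE flag on a generator function. The function names legacy_wait and modern are hypothetical.

```python
import inspect
import types


@types.coroutine
def legacy_wait():
    # A plain generator function, marked as awaitable by the decorator
    received = yield 'suspended'
    return received


async def modern():
    # await accepts the flagged generator even though it has no __await__()
    return await legacy_wait()


flag_set = bool(legacy_wait.__code__.co_flags & inspect.CO_ITERABLE_COROUTINE)
gen = legacy_wait()
has_await = hasattr(gen, '__await__')
awaitable = inspect.isawaitable(gen)
gen.close()

coro = modern()
marker = coro.send(None)       # value yielded by legacy_wait
try:
    coro.send('done')
except StopIteration as stop:
    value = stop.value
print(flag_set, has_await, awaitable, marker, value)
```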

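Like generators, coroutine objects implement send(), throw() and close(), but they cannot be iterated directly (the iterator returned by __await__() can). Knowing this is enough to run a coroutine by hand: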

import asyncio

# Explicit loop used only to create futures outside a running loop
loop = asyncio.new_event_loop()


# Alternative: a coroutine implemented with yield from
# (@asyncio.coroutine is only available before Python 3.11)
# @asyncio.coroutine
# def coroutine_function2():
#     result = yield from loop.create_future()
#     print('future result:', result)
#     return 2


# Coroutines implemented with async and await
async def coroutine_function2():
    result = await loop.create_future()
    print('future result:', result)
    return 2


async def coroutine_function():
    result = await coroutine_function2()
    print('coroutine_function2 result:', result)
    return 3  # illustrative return value


def main():
    coroutine = coroutine_function()
    # Normally a coroutine is run with asyncio.ensure_future()
    try:
        future = coroutine.send(None)
        # Suppose some callback called future.set_result (1 is illustrative)
        future.set_result(1)
        future = coroutine.send(future.result())
    except StopIteration as e:
        print('coroutine_function result:', e.value)
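
The point about iteration can be checked directly. A small sketch (the answer coroutine is hypothetical): iter() on a coroutine object raises TypeError, while the iterator returned by __await__() drives the same coroutine with next().

```python
async def answer():
    return 42


coro = answer()
try:
    iter(coro)
    directly_iterable = True
except TypeError:
    # Coroutine objects deliberately do not implement __iter__
    directly_iterable = False

# __await__() returns an iterator over the same coroutine
iterator = coro.__await__()
try:
    next(iterator)
except StopIteration as stop:
    value = stop.value
print(directly_iterable, value)
```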
           

Event loop

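The event loop itself has little to do with coroutines. It only registers callbacks (call_soon, call_later), maintains the scheduled and ready queues, and invokes the callbacks when events occur. It is not examined further here; if you are interested, most of the implementation is in asyncio.base_events.BaseEventLoop.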
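
To give a feel for the two queues, here is a toy loop. It is only a sketch under simplifying assumptions and not how BaseEventLoop is written: the ToyLoop class is hypothetical, it has no IO selector, and it waits with time.sleep() where the real loop would block on the selector.

```python
import heapq
import itertools
import time
from collections import deque


class ToyLoop:
    def __init__(self):
        self._ready = deque()               # callbacks to run on this pass
        self._scheduled = []                # heap of (when, seq, callback, args)
        self._counter = itertools.count()   # tie-breaker for equal deadlines

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._counter), callback, args))

    def run(self):
        while self._ready or self._scheduled:
            # Move timers whose deadline has passed to the ready queue
            now = time.monotonic()
            while self._scheduled and self._scheduled[0][0] <= now:
                _, _, callback, args = heapq.heappop(self._scheduled)
                self._ready.append((callback, args))
            if not self._ready:
                # Nothing runnable yet: wait for the nearest timer
                time.sleep(max(0.0, self._scheduled[0][0] - time.monotonic()))
                continue
            # Run only the callbacks that were ready at the start of this pass
            for _ in range(len(self._ready)):
                callback, args = self._ready.popleft()
                callback(*args)


events = []
loop = ToyLoop()
loop.call_later(0.02, events.append, 'later')
loop.call_soon(events.append, 'soon')
loop.run()
print(events)
```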

Task

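What actually runs a coroutine is Task. Normally a coroutine is run with asyncio.ensure_future(), which takes a coroutine object and internally calls loop.create_task() to create a Task.

Task is implemented in asyncio.tasks. It inherits from Future, and its result is the coroutine's return value.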
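
These relationships can be observed from user code. The sketch below assumes Python 3.7+ for asyncio.run(); the compute coroutine and the value 42 are made up for illustration.

```python
import asyncio


async def compute():
    await asyncio.sleep(0)   # give control back to the loop once
    return 42


async def main():
    # ensure_future() wraps the coroutine object in a Task
    task = asyncio.ensure_future(compute())
    is_task = isinstance(task, asyncio.Task)
    is_future = isinstance(task, asyncio.Future)
    value = await task
    return task, is_task, is_future, value


task, is_task, is_future, value = asyncio.run(main())
print(is_task, is_future, value, task.result())
```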

When a Task is created, it adds a callback to the event loop to start running the coroutine:

def __init__(self, coro, *, loop=None):
        assert coroutines.iscoroutine(coro), repr(coro)
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        self._coro = coro
        self._fut_waiter = None
        self._must_cancel = False
        # Add a callback that starts running the coroutine
        self._loop.call_soon(self._step)
        self.__class__._all_tasks.add(self)
           

_step performs one iteration of the coroutine:

def _step(self, exc=None):
        assert not self.done(), \
            '_step(): already done: {!r}, {!r}'.format(self, exc)
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro
        self._fut_waiter = None

        self.__class__._current_tasks[self._loop] = self
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                # Advance the coroutine one step. await always evaluates to
                # future.result(), so no value needs to be sent here.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            # The coroutine has finished; set the result or exception here
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                self.set_exception(futures.CancelledError())
            else:
                self.set_result(exc.value)
        except futures.CancelledError:
            # The coroutine was cancelled
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            # Any other exception
            self.set_exception(exc)
        except BaseException as exc:
            self.set_exception(exc)
            raise
        else:
            # No exception; result should be a future
            blocking = getattr(result, '_asyncio_future_blocking', None)
            if blocking is not None:
                # Yielded Future must come from Future.__iter__().
                if result._loop is not self._loop:
                    # Error: result is a future that belongs to another event loop
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'Task {!r} got Future {!r} attached to a '
                            'different loop'.format(self, result)))
                elif blocking:
                    if result is self:
                        # Error: the task awaits itself
                        self._loop.call_soon(
                            self._step,
                            RuntimeError(
                                'Task cannot await on itself: {!r}'.format(
                                    self)))
                    else:
                        # Normal case: register self._wakeup as a done callback on
                        # result; the coroutine sleeps until result completes
                        result._asyncio_future_blocking = False
                        result.add_done_callback(self._wakeup)
                        self._fut_waiter = result
                        if self._must_cancel:
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    # Error: yield was used instead of yield from in a generator-based coroutine
                    self._loop.call_soon(
                        self._step,
                        RuntimeError(
                            'yield was used instead of yield from '
                            'in task {!r} with {!r}'.format(self, result)))
            elif result is None:
                # Normal case: a bare yield in a generator-based coroutine gives up control
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self._step)
            elif inspect.isgenerator(result):
                # Error: a generator was yielded
                # Yielding a generator is just wrong.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'yield was used instead of yield from for '
                        'generator in task {!r} with {}'.format(
                            self, result)))
            else:
                # Error: something else was yielded
                # Yielding something else is an error.
                self._loop.call_soon(
                    self._step,
                    RuntimeError(
                        'Task got bad yield: {!r}'.format(result)))
        finally:
            self.__class__._current_tasks.pop(self._loop)
            self = None  # Needed to break cycles when an exception occurs.
           

Once the awaited future completes, the event loop calls _wakeup to wake the coroutine:

def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            # This may also be a cancellation.
            self._step(exc)
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self._step()
        self = None  # Needed to break cycles when an exception occurs.
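
Stripped of cancellation and error checking, the _step/_wakeup cycle can be sketched in a few lines. MiniTask below is a hypothetical class, not the asyncio implementation, and it assumes the coroutine only ever awaits asyncio futures (no bare yield). It stores the outcome in a separate done_future instead of inheriting from Future.

```python
import asyncio


class MiniTask:
    def __init__(self, coro, loop):
        self._coro = coro
        self._loop = loop
        self.done_future = loop.create_future()
        # Like Task.__init__: schedule the first step on the loop
        loop.call_soon(self._step)

    def _step(self, exc=None):
        try:
            if exc is None:
                awaited = self._coro.send(None)
            else:
                awaited = self._coro.throw(exc)
        except StopIteration as stop:
            # Coroutine finished: its return value becomes the result
            self.done_future.set_result(stop.value)
        except Exception as err:
            self.done_future.set_exception(err)
        else:
            # Coroutine is waiting on a future: resume when it completes
            awaited.add_done_callback(self._wakeup)

    def _wakeup(self, future):
        try:
            future.result()
        except Exception as exc:
            self._step(exc)
        else:
            self._step()


async def wait_for_io(loop):
    future = loop.create_future()
    # Pretend an IO operation completes shortly afterwards
    loop.call_later(0.01, future.set_result, 'io done')
    return await future


loop = asyncio.new_event_loop()
try:
    task = MiniTask(wait_for_io(loop), loop)
    result = loop.run_until_complete(task.done_future)
finally:
    loop.close()
print(result)
```

As in the real _wakeup, the future's result is not passed to _step: the await inside the coroutine reads it from the future itself when it is resumed.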
           

Here is a coroutine run by the event loop and Task:

import asyncio


async def coroutine_function2():
    loop = asyncio.get_event_loop()
    future = loop.create_future()

    # Suppose some IO operation completes after 3 seconds (1 is illustrative)
    loop.call_later(3, lambda: future.set_result(1))

    result = await future
    print('future result:', result)
    return 2  # illustrative return value


async def coroutine_function():
    result = await coroutine_function2()
    print('coroutine_function2 result:', result)
    return 3  # illustrative return value


def main():
    # new_event_loop() avoids relying on an implicit current event loop
    loop = asyncio.new_event_loop()
    # run_until_complete calls ensure_future() internally
    result = loop.run_until_complete(coroutine_function())
    print('coroutine_function result:', result)
    loop.close()