天天看點

c++ 協程_Python3 協程(coroutine)介紹

c++ 協程_Python3 協程(coroutine)介紹
本文首發于 at7h 的個人部落格。

目前 Python 語言的協程從實作來說可分為兩類:

  • 一種是基于傳統生成器的協程,叫做 generator-based coroutines ,通過包裝 generator 對象實作。
  • 另一種在 Python 3.5 版本 PEP 492 誕生,叫做 native coroutines ,即通過使用

    async

    文法來聲明的協程。

本文主要介紹第二種,第一種基于生成器的協程已在 Python 3.8 中棄用,并計劃在 Python 3.10 中移除。本文是「介紹」,就先不讨論太多實作原理的東西,感興趣的童鞋可以繼續關注後面的文章。

協程(coroutine)

首先,來看一個非常簡單的例子:

import asyncio

async def c():
    await asyncio.sleep(1)
    return 'Done  '
           

這個被

async

修飾的函數

c

就是一個

協程函數

,該函數會傳回一個

協程對象

In [1]: asyncio.iscoroutinefunction(c)
Out[1]: True

In [2]: c()
Out[2]: <coroutine object c at 0x107b0f748>

In [3]: asyncio.iscoroutine(c())
Out[3]: True
           

一般來說,協程函數

c

應具有以下特點:

  • 一定會傳回一個 協程對象 ,而不管其中是否有

    await

    表達式。
  • 函數中不能再使用

    yield from

  • 函數内部可通過

    await

    表達式來挂起自身協程,并等待另一個協程完成直到傳回結果。
  • await

    表達式後面可以跟的一定是一個 可等待對象 ,而不僅僅是協程對象。
  • 不可在

    async

    修飾的協程函數外使用

    await

    關鍵字,否則會引發一個

    SyntaxError

  • 當對協程對象進行垃圾收集時,如果從未等待過它,則會引發一個

    RuntimeWarning

    (如果你剛開始寫 async,那你一定遇得到,不經意間就會忘掉一個

    await

    )。

下面分别介紹下上面提到的兩個概念,

可等待對象

協程對象

可等待對象(awaitable)

我們來看

collections.abc

子產品中對

Awaitable

類的定義:

class Awaitable(metaclass=ABCMeta):

    __slots__ = ()

    @abstractmethod
    def __await__(self):
        yield

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Awaitable:
            return _check_methods(C, "__await__")
        return NotImplemented
           

可見,可等待對象

主要實作了

一個

__await__

方法。且該方法必須傳回一個疊代器(iterator) ①,否則将會引發一個

TypeError

注意: 主要實作 是因為

await

表達式需要跟老的基于生成器的協程相相容,即通過使用

types.coroutine()

asyncio.coroutine()

裝飾器傳回的生成器疊代器對象(generator iterator)也屬于可等待對象,但它們并未實作

__await__

方法。

協程對象(Coroutine)

同樣的,我們來看

collections.abc

子產品中對

Coroutine

類的定義:

class Coroutine(Awaitable):

    __slots__ = ()

    @abstractmethod
    def send(self, value):
        """Send a value into the coroutine.
        Return next yielded value or raise StopIteration.
        """
        raise StopIteration

    @abstractmethod
    def throw(self, typ, val=None, tb=None):
        """Raise an exception in the coroutine.
        Return next yielded value or raise StopIteration.
        """
        if val is None:
            if tb is None:
                raise typ
            val = typ()
        if tb is not None:
            val = val.with_traceback(tb)
        raise val

    def close(self):
        """Raise GeneratorExit inside coroutine.
        """
        try:
            self.throw(GeneratorExit)
        except (GeneratorExit, StopIteration):
            pass
        else:
            raise RuntimeError("coroutine ignored GeneratorExit")

    @classmethod
    def __subclasshook__(cls, C):
        if cls is Coroutine:
            return _check_methods(C, '__await__', 'send', 'throw', 'close')
        return NotImplemented
           

由上可知,由于繼承關系,

協程對象是屬于可等待對象的

除了協程

Coroutine

對象外,目前常見的可等待對象還有兩種:

asyncio.Task

asyncio.Future

,下文中介紹。

協程的執行可通過調用

__await__()

并疊代其結果進行控制。當協程結束執行并傳回時,疊代器會引發

StopIteration

異常,并通過該異常的

value

屬性來傳播協程的傳回值。下面看一個簡單的例子:

In [4]: c().send(None)
Out[4]: <Future pending>

In [5]: async def c1():
   ...:     return "Done  "
   ...:
In [6]: c1().send(None)
Out[6]: StopIteration: Done  
           

運作

協程的運作需要在一個 EventLoop 中進行,由它來控制異步任務的注冊、執行、取消等。其大緻原理是:

把傳入的所有異步對象(準确的說是可等待對象,如

Coroutine

Task

等,見下文)都注冊到這個 EventLoop 上,EventLoop 會循環執行這些異步對象,但同時隻執行一個,當執行到某個對象時,如果它正在等待其他對象(I/O 處理) 傳回,事件循環會暫停它的執行去執行其他的對象。當某個對象完成 I/O 處理後,下次循環到它的時候會擷取其傳回值然後繼續向下執行。這樣以來,所有的異步任務就可以協同運作。

EventLoop 接受的對象必須為可等待對象,目前主要有三種類型即

Coroutine

,

Task

Future。

下面簡單的介紹下

Task

Future

  • Future

    是一種特殊的低級的可等待對象,用來支援底層回調式代碼與高層

    async/await

    式的代碼互動,是對協程底層實作的封裝,其表示一個異步操作的最終結果。它提供了設定和擷取

    Future

    執行狀态或結果等操作的接口。

    Future

    實作了

    __await__

    協定,并通過

    __iter__ = __await__

    來相容老式協程。一般來說,我們不需要關心這玩意兒,日常的開發也是不需要要用到它的。如有需要,就用其子類 Task。
  • Task 用來協同的排程協程以實作并發,并提供了相應的接口供我們使用。

建立一個

Task

非常簡單:

In [6]: loop = asyncio.get_event_loop()
In [7]: task = loop.create_task(c())

In [8]: task
Out[8]: <Task pending coro=<c() running at <ipython-input-1-3afd2bbb1944>:3>>

In [9]: task.done()
Out[9]: False

In [10]: task.cancelled()
Out[10]: False

In [11]: task.result()
Out[11]: InvalidStateError: Result is not set.

In [12]: await task
Out[12]: 'Done  '

In [13]: task
Out[13]: <Task finished coro=<c() done, defined at <ipython-input-1-3afd2bbb1944>:3> result='Done  '>
In [14]: task.done()
Out[14]: True
In [15]: task.result()
Out[15]: 'Done  '

In [16]: task = loop.create_task(c())
In [17]: task.cancel()
Out[17]: True
In [18]: await task
Out[18]: CancelledError:
           

上面說到,協程的運作需要在一個 EventLoop 中進行,在 Python 3.7 之前,你隻能這麼寫 :

In [19]: loop = asyncio.get_event_loop()
In [20]: loop.run_until_complete(c())
Out[20]: 'Done  '
In [21]: loop.close()
           

Python 3.7 及以上版本可以直接使用

asyncio.run()

:

In [22]: asyncio.run(c())
Out[22]: 'Done  '
           

并發

有些童鞋可能有疑問了,我寫好了一個個協程函數,怎樣才能并發的運作 ?

asyncio

提供了相應的兩個接口:

asyncio.gather

asyncio.wait

來支援:

async def c1():
    await asyncio.sleep(1)
    print('c1 done')
    return True

async def c2():
    await asyncio.sleep(2)
    print('c2 done')
    return True

async def c12_by_gather():
    await asyncio.gather(c1(), c2())

async def c12_by_awit():
    await asyncio.wait([c1(), c2()])
In [23]: asyncio.run(c12_by_gather())
c1 done
c2 done

In [24]: asyncio.run(c12_by_awit())
c1 done
c2 done
           

其它

上面我們介紹了 PEP 492 coroutine 的基礎使用,同時 PEP 492 也相應提出了基于

async with

async for

表達式的異步上下文管理器(asynchronous context manager)和異步疊代器(asynchronous iterator)。

下面的介紹的示例将基于 Python 可疊代對象, 疊代器和生成器 裡的示例展開,建議感興趣的同學可以先看下這篇文章。

異步上下文管理器

在 Python 中,我們常會通過實作

__enter__()

__exit__()

方法來實作一個上下文管理器:

In [1]: class ContextManager:
   ...:
   ...:     def __enter__(self):
   ...:         print('enter...')
   ...:
   ...:     def __exit__(slef, exc_type, exc_val, exc_tb):
   ...:         print('exit...')
   ...:

In [2]: with ContextManager():
   ...:     print('Do something...')
   ...:
enter...
Do something...
exit...
           

同樣的,在異步程式設計時我們可以通過實作

__aenter__()

__aexit__()

方法來實作一個上下文管理器,并通過

async with

表達式來使用。

In [1]: class AsyncContextManager:
   ...:
   ...:     async def __aenter__(self):
   ...:         print('async enter...')
   ...:
   ...:     async def __aexit__(slef, exc_type, exc_val, exc_tb):
   ...:         print('async exit...')
   ...:

In [2]: async with AsyncContextManager():
   ...:     print('Do something...')
   ...:
async enter...
Do something...
async exit...
           

異步疊代

在之前的文章 Python 可疊代對象, 疊代器和生成器 中,我們介紹了通過實作

__iter__()

方法來實作一個可疊代對象,通過實作疊代器協定

__iter__()

__next__()

方法來實作一個疊代器對象。下面我們改造下之前的例子,實作一個異步的版本。

class AsyncLinkFinder:

    PATTERN = "(?<=href=").+?(?=" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" )|(?<=href=').+?(?=')"

    def __init__(self, text):
        self.links = re.findall(self.PATTERN, text)

    def __aiter__(self):
        return AsyncLinkIiterator(self.links)

class AsyncLinkIiterator:

    def __init__(self, links):
        self.links = links
        self.index = 0

    async def _gen_link(self):
        try:
            link = self.links[self.index]
            self.index += 1
        except IndexError:
            link = None
        return link

    def __aiter__(self):
        return self

    async def __anext__(self):
        link = await self._gen_link()
        if link is None:
            raise StopAsyncIteration
        return link
In [7]: async for s in AsyncLinkFinder(TEXT):
    ...:     print(s)
https://blog.python.org
http://feedproxy.google.com/~r/PythonSoftwareFoundationNew/~3/T3r7qZxo-xg/python-software-foundation-fellow.html
http://feedproxy.google.com/~r/PythonSoftwareFoundationNews/~3/lE0u-5MIUQc/why-sponsor-pycon-2020.html
http://feedproxy.google.com/~r/PythonSoftwareFoundationNews/~3/jAMRqiPhWSs/seeking-developers-for-paid-contract.html
           

例子中實作了

__aiter__()

方法的

AsyncLinkFinder

就是一個

異步可疊代對象

__aiter__()

方法傳回的必須是一個

異步疊代器

,如

AsyncLinkIiterator

。異步疊代器必須同時實作

__aiter__()

__anext__()

方法。一個不同的點是,異步疊代中,當疊代器耗盡時,需要引發一個

StopAsyncIteration

而不是

StopIteration

同樣的,我們也實作一個異步生成器版本的:

class LinkFinder:

    PATTERN = "(?<=href=").+?(?=" target="_blank" rel="external nofollow"  target="_blank" rel="external nofollow" )|(?<=href=').+?(?=')"

    def __init__(self, text):
        self.links = re.finditer(self.PATTERN, text)

    async def __aiter__(self):
        return (link.group() for link in self.links)
           

  • ① 關于疊代器的介紹可閱讀 Python 可疊代對象, 疊代器和生成器。
  • ② 關于示例中 __slots__ 的介紹請檢視 了解 Python 類屬性之 __slots__。

參考

  • PEP 492: Coroutines with async and await syntax
  • PEP 342: Coroutines via Enhanced Generators