
A Brief Analysis of How the Python Standard Library asyncio Module Works


Environment for this article: Python 3.7.0
           

The design approach behind the asyncio module

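Most programming languages now simplify asynchronous programming at the language level, and Python is no exception: it introduced the await and async keywords specifically to simplify and encapsulate asynchronous code. This article takes a brief look at how the asyncio standard library wraps asynchronous programming by walking through the workflow of one asyncio code example. Underneath, Python's asynchronous programming is still built on coroutines driven by yield. The Task and Future concepts involved are covered in the earlier article "A Brief Analysis of an Asynchronous Crawler Framework and Coroutines", which shows how to build asynchronous code directly on coroutines using yield. That process is essentially the same as how the asyncio package works, except that asyncio adds support for the await and async keywords.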

A brief look at the await and async keywords through an example

First, a basic example of the await and async keywords:

class TestAsync(object):
	def __await__(self):
		yield self
		print("await return value")
		return 'TestAsync'

async def test():
	print("start")
	data = await TestAsync()
	print("over ", data)
	return 'test return'

if __name__ == '__main__':
	c = test()
	res = c.send(None)
	print('res ', res)
	try:
		c.send(None)
	except StopIteration as exc:
		print(" c last return ", exc.value)
           

The example produces the following output:

start
res  <__main__.TestAsync object at 0x7f29e8ebd240>
await return value
over  TestAsync
 c last return  test return
           

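As the output shows, calling test() first produces a coroutine object c. Sending None to c runs the coroutine up to the yield inside TestAsync.__await__, which yields the instance itself; that value becomes res and is printed. Sending None through c a second time resumes __await__ after the yield and runs it to the end. When __await__ returns, the StopIteration it raises is handled by the await expression, and its value (the return value of __await__) becomes data in test. test then continues with the code after await TestAsync(), and when it returns 'test return' the coroutine itself finishes by raising StopIteration out of c.send(None); the value carried by that exception is the return value of test, 'test return'. That completes this simple example of the await and async keywords. The same effect can be achieved without these keywords by using yield and yield from, which interested readers can try for themselves.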
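The yield and yield from variant mentioned above can be sketched as follows. This is only an illustration, not code from the original article: the class name TestIterable and the generator function test_gen are hypothetical, and __iter__ takes the place of __await__ because yield from delegates to an ordinary iterator.

```python
class TestIterable:
    """Hypothetical stand-in for TestAsync, driven by yield from."""

    def __iter__(self):
        yield self               # hand control (and this object) back to the caller
        return "TestAsync"       # becomes the value of the yield from expression


def test_gen():
    # yield from plays the role that await plays in the async version
    data = yield from TestIterable()
    return "test return, data=" + data


g = test_gen()
first = g.send(None)             # runs up to the yield inside __iter__
try:
    g.send(None)                 # resumes; the generator then finishes
except StopIteration as exc:
    final = exc.value            # return value of test_gen

print(type(first).__name__, "|", final)
```

The driving code is the same pair of send(None) calls as in the async example; only the delegation keyword differs.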

How the asyncio module works, with example code

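Asynchronous programming is, at its core, event driven: it relies on I/O multiplexing to perform non-blocking operations. A callback is registered for a read or write event, and when the event occurs the callback runs and the logic continues. Readers can consult other material for the details of I/O multiplexing. The asyncio module is likewise built around event registration on top of I/O multiplexing, but it combines this with coroutines, which preserve the execution state of the current function. This simplifies the programming model and makes business logic easier and more concise to write.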
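To make the idea of registering a callback for an I/O event concrete, here is a minimal sketch that uses the standard selectors module directly, without asyncio. The socket pair, the on_readable callback, and the received list are assumptions made for this illustration; asyncio's selector-based loops build on the same mechanism but add much more around it.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()       # two connected sockets inside one process
a.setblocking(False)
received = []


def on_readable(sock):
    # Callback that runs once the read event fires.
    received.append(sock.recv(1024))
    sel.unregister(sock)


# Register interest in "a is readable" and attach the callback as data.
sel.register(a, selectors.EVENT_READ, on_readable)
b.send(b"ping")                  # makes a readable

# A tiny event loop: wait for events, then dispatch to the stored callback.
while not received:
    for key, _mask in sel.select(timeout=1):
        key.data(key.fileobj)

sel.close()
a.close()
b.close()
print(received)
```

The asyncio example that follows relies on the same pattern, with coroutines resumed from the callbacks instead of plain functions.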

import asyncio
import urllib.parse

def test():
    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True
            )
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80
            )

        query = (
            f"HEAD {url.path or '/' } HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        )

        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break

            line = line.decode('latin1').rstrip()
            if line:
                print(f'HTTP header> {line}')

        writer.close()

    url = 'https://www.baidu.com'
    asyncio.run(print_http_headers(url))


if __name__ == '__main__':
    test()
           

The function produces the following output:

HTTP header> HTTP/1.0 200 OK
HTTP header> Accept-Ranges: bytes
HTTP header> Cache-Control: private, no-cache, no-store, proxy-revalidate, no-transform
HTTP header> Content-Length: 277
HTTP header> Content-Type: text/html
HTTP header> Date: Tue, 22 Jan 2019 00:40:02 GMT
HTTP header> Etag: "575e1f80-115"
HTTP header> Last-Modified: Mon, 13 Jun 2016 02:50:40 GMT
HTTP header> Pragma: no-cache
HTTP header> Server: bfe/1.0.8.18   
           

Next, this article briefly walks through how the example executes.

How the asyncio example code executes

First, look at the asyncio.run() function used in the example:

def run(main, *, debug=False):
    """Run a coroutine.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    if events._get_running_loop() is not None:                                  # check whether a loop is already running in the current thread
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")         # raise if one is running

    if not coroutines.iscoroutine(main):                                        # check that main is a coroutine
        raise ValueError("a coroutine was expected, got {!r}".format(main))     # raise if it is not

    loop = events.new_event_loop()                                              # create the event loop instance that drives I/O multiplexing
    try:
        events.set_event_loop(loop)                                             # set it as the current loop
        loop.set_debug(debug)                                                   # set whether the loop runs in debug mode
        return loop.run_until_complete(main)                                    # run until the coroutine passed as main finishes
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()
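The steps run() performs can be reproduced by hand with public APIs, which makes the sequence easier to see. The sketch below is a simplified approximation, not the library's implementation: it leaves out the private _cancel_all_tasks step, and the main coroutine is a placeholder.

```python
import asyncio


async def main():
    await asyncio.sleep(0)
    return "hello"


# Roughly what asyncio.run(main()) does, minus task cancellation.
loop = asyncio.new_event_loop()
try:
    asyncio.set_event_loop(loop)
    manual_result = loop.run_until_complete(main())
finally:
    try:
        # Finalize any asynchronous generators that are still open.
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        asyncio.set_event_loop(None)
        loop.close()

run_result = asyncio.run(main())
print(manual_result, run_result)
```

Both paths create a fresh loop, run the coroutine to completion, and close the loop afterwards.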
           

run() mainly performs a few checks, obtains a loop instance, and then calls the instance's run_until_complete method. Next, look at new_event_loop():

def new_event_loop():
    """Equivalent to calling get_event_loop_policy().new_event_loop()."""
    return get_event_loop_policy().new_event_loop()      # call the policy's new_event_loop method


def _init_event_loop_policy():
    global _event_loop_policy                           # module-level global _event_loop_policy
    with _lock:                                         # take the lock
        if _event_loop_policy is None:  # pragma: no branch
            from . import DefaultEventLoopPolicy
            _event_loop_policy = DefaultEventLoopPolicy()   # instantiate the default DefaultEventLoopPolicy


def get_event_loop_policy():
    """Get the current event loop policy."""
    if _event_loop_policy is None:                      # check whether the global is still unset
        _init_event_loop_policy()                       # initialize it if so
    return _event_loop_policy                           # return the policy instance
           

So loop is the instance returned by calling new_event_loop on a DefaultEventLoopPolicy instance. On Linux, DefaultEventLoopPolicy is _UnixDefaultEventLoopPolicy; its code is as follows:

class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""
    _loop_factory = _UnixSelectorEventLoop                                          # the class used to instantiate the loop

    ...

    def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if (self._watcher is not None and
                isinstance(threading.current_thread(), threading._MainThread)):
            self._watcher.attach_loop(loop)

    ...


class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    """Default policy implementation for accessing the event loop.

    In this policy, each thread has its own event loop.  However, we
    only automatically create an event loop by default for the main
    thread; other threads by default have no event loop.

    Other policies may have different rules (e.g. a single global
    event loop, or automatically creating an event loop per thread, or
    using some other notion of context to which an event loop is
    associated).
    """

    _loop_factory = None

    class _Local(threading.local):
        _loop = None
        _set_called = False

    def __init__(self):
        self._local = self._Local()                                                 # thread-local storage

    def get_event_loop(self):                                                       # get the loop
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
                not self._local._set_called and
                isinstance(threading.current_thread(), threading._MainThread)):     # no loop yet, none set explicitly, and this is the main thread
            self.set_event_loop(self.new_event_loop())                              # so create and set a new loop instance

        if self._local._loop is None:                                               # raise if there is still no loop
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)

        return self._local._loop                                                    # return the loop instance

    def set_event_loop(self, loop):
        """Set the event loop."""
        self._local._set_called = True                                              # mark that set_event_loop has been called
        assert loop is None or isinstance(loop, AbstractEventLoop)                  # loop must be None or an AbstractEventLoop instance
        self._local._loop = loop                                                    # store the loop instance

    def new_event_loop(self):
        """Create a new event loop.

        You must call set_event_loop() to make this the current event
        loop.
        """
        return self._loop_factory()                                                 # instantiate the loop class
           

So loop is an instance of _UnixSelectorEventLoop. To find its run_until_complete method, follow the inheritance chain:

_UnixSelectorEventLoop inherits from

selector_events.BaseSelectorEventLoop, which inherits from

base_events.BaseEventLoop, which inherits from

events.AbstractEventLoop
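The chain can be checked at runtime by inspecting the method resolution order of a freshly created loop. This is a small sketch; the concrete class at the head of the list depends on the platform and Python version, so no particular output is assumed here.

```python
import asyncio

loop = asyncio.new_event_loop()
try:
    # Class names from the concrete loop class up to object.
    chain = [cls.__name__ for cls in type(loop).__mro__]
    # Where run_until_complete is actually defined.
    owner = type(loop).run_until_complete.__qualname__
finally:
    loop.close()

print(" -> ".join(chain))
print(owner)
```

Whatever the concrete loop class is, BaseEventLoop and AbstractEventLoop should appear in the chain, in that order.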
           

The method actually called is therefore BaseEventLoop.run_until_complete:

def run_until_complete(self, future):
    """Run until the Future is done.

    If the argument is a coroutine, it is wrapped in a Task.

    WARNING: It would be disastrous to call run_until_complete()
    with the same coroutine twice -- it would wrap it in two
    different Tasks and that can't be good.

    Return the Future's result, or raise its exception.
    """
    self._check_closed()                                                        # check that the loop is not closed

    new_task = not futures.isfuture(future)                                     # True if the argument is not already a future
    future = tasks.ensure_future(future, loop=self)                             # wrap it in a Task if needed
    if new_task:                                                                # a new Task was created from a coroutine
        # An exception is raised if the future didn't complete, so there
        # is no need to log the "destroy pending task" message
        future._log_destroy_pending = False                             

    future.add_done_callback(_run_until_complete_cb)                            # add the done callback
    try:
        self.run_forever()                                                      # run the loop
    except:
        if new_task and future.done() and not future.cancelled():               # on error, check whether the task finished and was not cancelled
            # The coroutine raised a BaseException. Consume the exception
            # to not log a warning, the caller doesn't have access to the
            # local task.
            future.exception()                                                  # retrieve the exception so that it counts as consumed
        raise
    finally:
        future.remove_done_callback(_run_until_complete_cb)                     # remove the callback
    if not future.done():                                                       # raise if the future did not complete
        raise RuntimeError('Event loop stopped before Future completed.')

    return future.result()                                                      # return the future's result
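Two of the paths through run_until_complete can be exercised with a short experiment: the normal case where the coroutine finishes, and the case where the loop is stopped while the future is still pending. The sketch below assumes a throwaway coroutine named answer and a bare future created only for the demonstration.

```python
import asyncio


async def answer():
    await asyncio.sleep(0)
    return 42


loop = asyncio.new_event_loop()
try:
    # Normal path: the coroutine is wrapped in a Task and its result returned.
    result = loop.run_until_complete(answer())

    # Early-stop path: the future never completes because stop() runs first.
    pending = loop.create_future()
    loop.call_soon(loop.stop)
    try:
        loop.run_until_complete(pending)
    except RuntimeError as exc:
        message = str(exc)
finally:
    loop.close()

print(result, "|", message)
```

The second call shows why run_until_complete checks future.done() after run_forever returns: stopping the loop does not by itself complete the future.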
           

Start with tasks.ensure_future to see how the Task is created:

def ensure_future(coro_or_future, *, loop=None):
    """Wrap a coroutine or an awaitable in a future.

    If the argument is a Future, it is returned directly.
    """
    if coroutines.iscoroutine(coro_or_future):                                      # is it a coroutine? In this example it is
        if loop is None:                                                            # check whether loop is None
            loop = events.get_event_loop()                                          # if so, get the current event loop
        task = loop.create_task(coro_or_future)                                     # create a task
        if task._source_traceback:
            del task._source_traceback[-1]
        return task                                                                 # return the task instance
    elif futures.isfuture(coro_or_future):                                          # is it a future?
        if loop is not None and loop is not futures._get_loop(coro_or_future):
            raise ValueError('loop argument must agree with Future')
        return coro_or_future
    elif inspect.isawaitable(coro_or_future):                                       # is it an awaitable?
        return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)            # if so, wrap it and create a task from the wrapper
    else:
        raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
                        'required')                                                 # otherwise raise
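The first two branches of ensure_future can be observed from user code. The following sketch assumes a trivial coroutine named work and a future created just for the test; it shows that a coroutine comes back wrapped in a Task, while an existing future is returned unchanged.

```python
import asyncio


async def work():
    return 42


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    same = asyncio.ensure_future(fut)      # future branch: returned as is
    task = asyncio.ensure_future(work())   # coroutine branch: wrapped in a Task

    fut.set_result("done")
    return same is fut, isinstance(task, asyncio.Task), await task, await fut


outcome = asyncio.run(main())
print(outcome)
```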
           

In this example main is a coroutine, so loop.create_task(coro_or_future) is called directly to create a task instance, which resolves to the create_task method of base_events.BaseEventLoop:

def create_task(self, coro):
    """Schedule a coroutine object.

    Return a task object.
    """
    self._check_closed()                                # check that the loop is not closed
    if self._task_factory is None:                      # no task factory configured: use tasks.Task
        task = tasks.Task(coro, loop=self)              # create the task instance
        if task._source_traceback:
            del task._source_traceback[-1]
    else:
        task = self._task_factory(self, coro)           # otherwise use the configured _task_factory
    return task                                         # return the task instance
           

Now look at how tasks.Task is initialized:

class Task(futures._PyFuture):  # Inherit Python Task implementation
                                # from a Python Future implementation.

    """A coroutine wrapped in a Future."""

    # An important invariant maintained while a Task not done:
    #
    # - Either _fut_waiter is None, and _step() is scheduled;
    # - or _fut_waiter is some Future, and _step() is *not* scheduled.
    #
    # The only transition from the latter to the former is through
    # _wakeup().  When _fut_waiter is not None, one of its callbacks
    # must be _wakeup().

    # If False, don't log a message if the task is destroyed whereas its
    # status is still pending
    _log_destroy_pending = True
    ...


    def __init__(self, coro, *, loop=None):
        super().__init__(loop=loop)
        if self._source_traceback:
            del self._source_traceback[-1]
        if not coroutines.iscoroutine(coro):
            # raise after Future.__init__(), attrs are required for __del__
            # prevent logging for pending task in __del__
            self._log_destroy_pending = False
            raise TypeError(f"a coroutine was expected, got {coro!r}")

        self._must_cancel = False
        self._fut_waiter = None
        self._coro = coro
        self._context = contextvars.copy_context()                              # copy the current execution context

        self._loop.call_soon(self.__step, context=self._context)                # schedule __step via the loop's call_soon
        _register_task(self)

    ...

    def __step(self, exc=None):
        if self.done():                                                         # check whether the task is already done
            raise futures.InvalidStateError(
                f'_step(): already done: {self!r}, {exc!r}')
        if self._must_cancel:
            if not isinstance(exc, futures.CancelledError):
                exc = futures.CancelledError()
            self._must_cancel = False
        coro = self._coro                                                       # get the coroutine
        self._fut_waiter = None 

        _enter_task(self._loop, self)
        # Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)                                        # resume the coroutine with send; result is whatever it yields (normally a future)
            else:
                result = coro.throw(exc)
        except StopIteration as exc:
            if self._must_cancel:
                # Task is cancelled right before coro stops.
                self._must_cancel = False
                super().set_exception(futures.CancelledError())
            else:
                super().set_result(exc.value)                                   # on StopIteration, store the coroutine's return value as the result
        except futures.CancelledError:
            super().cancel()  # I.e., Future.cancel(self).
        except Exception as exc:
            super().set_exception(exc)                                          # on error, store the exception on the task
        except BaseException as exc:
            super().set_exception(exc)
            raise
        else:
            blocking = getattr(result, '_asyncio_future_blocking', None)        # the coroutine yielded: read result's _asyncio_future_blocking attribute
            if blocking is not None:                                            # if the attribute exists
                # Yielded Future must come from Future.__iter__().
                if futures._get_loop(result) is not self._loop:                 # the future must belong to the same loop, otherwise raise
                    new_exc = RuntimeError(
                        f'Task {self!r} got Future '
                        f'{result!r} attached to a different loop')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)
                elif blocking:                                                  # if it is True
                    if result is self:                                          # a task cannot await itself: raise
                        new_exc = RuntimeError(
                            f'Task cannot await on itself: {self!r}')
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    else:
                        result._asyncio_future_blocking = False                 # reset to False
                        result.add_done_callback(
                            self.__wakeup, context=self._context)               # add self.__wakeup to the future's callback list
                        self._fut_waiter = result                               # record result as _fut_waiter
                        if self._must_cancel:                                   # check whether a cancel is pending
                            if self._fut_waiter.cancel():
                                self._must_cancel = False
                else:
                    new_exc = RuntimeError(
                        f'yield was used instead of yield from '
                        f'in task {self!r} with {result!r}')
                    self._loop.call_soon(
                        self.__step, new_exc, context=self._context)            # every other case is an error

            elif result is None:                                                # if result is None
                # Bare yield relinquishes control for one event loop iteration.
                self._loop.call_soon(self.__step, context=self._context)        # schedule __step again
            elif inspect.isgenerator(result):                                   # check whether it is a generator
                # Yielding a generator is just wrong.
                new_exc = RuntimeError(
                    f'yield was used instead of yield from for '
                    f'generator in task {self!r} with {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)                # yielding a generator is an error
            else:
                # Yielding something else is an error.
                new_exc = RuntimeError(f'Task got bad yield: {result!r}')
                self._loop.call_soon(
                    self.__step, new_exc, context=self._context)                # otherwise report a bad yield
        finally:
            _leave_task(self._loop, self)                                       # unregister the task as the current task
            self = None  # Needed to break cycles when an exception occurs.

    def __wakeup(self, future):
        try:
            future.result()                                                     # fetch the future's result first
        except Exception as exc:
            # This may also be a cancellation.
            self.__step(exc)                                                    # on error, pass the exception into __step
        else:
            # Don't pass the value of `future.result()` explicitly,
            # as `Future.__iter__` and `Future.__await__` don't need it.
            # If we call `_step(value, None)` instead of `_step()`,
            # Python eval loop would use `.send(value)` method call,
            # instead of `__next__()`, which is slower for futures
            # that return non-generator iterators from their `__iter__`.
            self.__step()                                                       # continue with the next step
        self = None  # Needed to break cycles when an exception occurs.
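The interplay between __step, __wakeup, and the future's callbacks is easier to follow in a stripped-down model. The sketch below is a toy, not asyncio's code: ToyFuture, ToyTask, the ready deque, and the child coroutine are all hypothetical names, and error handling, cancellation, and contexts are left out entirely.

```python
from collections import deque

ready = deque()                      # plays the role of the loop's ready queue


class ToyFuture:
    """Minimal future: a result plus callbacks to schedule when it is set."""

    def __init__(self):
        self.done = False
        self.value = None
        self.callbacks = []

    def set_result(self, value):
        self.done = True
        self.value = value
        ready.extend(self.callbacks)  # schedule callbacks, like call_soon

    def __await__(self):
        if not self.done:
            yield self                # hand the pending future to the task
        return self.value


class ToyTask(ToyFuture):
    def __init__(self, coro):
        super().__init__()
        self.coro = coro
        ready.append(self.step)       # like call_soon(self.__step) in __init__

    def step(self):
        try:
            pending = self.coro.send(None)
        except StopIteration as exc:
            self.set_result(exc.value)            # coroutine finished
        else:
            pending.callbacks.append(self.step)   # wake up when pending is done


async def child(fut):
    return (await fut) * 2


fut = ToyFuture()
task = ToyTask(child(fut))
ready.append(lambda: fut.set_result(21))   # something completes the future later

while ready:                         # the "event loop": run queued callbacks in order
    ready.popleft()()

print(task.value)
```

The first step runs the coroutine until it yields the pending future, the queued lambda completes that future, and the second step resumes the coroutine to its return value.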


Task.__init__ calls the call_soon method of base_events.BaseEventLoop:


    def call_soon(self, callback, *args, context=None):
        """Arrange for a callback to be called as soon as possible.

        This operates as a FIFO queue: callbacks are called in the
        order in which they are registered.  Each callback will be
        called exactly once.

        Any positional arguments after the callback will be passed to
        the callback when it is called.
        """
        self._check_closed()                                                # check that the loop is not closed
        if self._debug:                                                     # in debug mode
            self._check_thread()
            self._check_callback(callback, 'call_soon')
        handle = self._call_soon(callback, args, context)                   # call _call_soon to wrap the callback
        if handle._source_traceback:
            del handle._source_traceback[-1]
        return handle                                                       # return the handle


    def _call_soon(self, callback, args, context):
        handle = events.Handle(callback, args, self, context)               # create a Handle instance
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)                                          # append it to the _ready queue
        return handle                                                       # return the handle
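Because call_soon only appends a handle to the ready queue, nothing runs until the loop itself runs. A short experiment with the public API illustrates both the deferral and the FIFO order described in the docstring; the order list and the snapshot variable are assumptions of this sketch.

```python
import asyncio

order = []
loop = asyncio.new_event_loop()
try:
    loop.call_soon(order.append, "first")
    loop.call_soon(order.append, "second")
    loop.call_soon(loop.stop)        # stop after the queued callbacks have run

    before_run = list(order)         # snapshot: nothing has executed yet
    loop.run_forever()
finally:
    loop.close()

print(before_run, order)
```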
           

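To recap: a Task instance is created first, and during initialization it calls the loop's call_soon method, which queues the task's __step method on _ready. When the loop later runs that handle, __step starts the coroutine passed in as main by sending it None. Execution then reaches asyncio.open_connection, whose code is as follows: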

async def open_connection(host=None, port=None, *,
                          loop=None, limit=_DEFAULT_LIMIT, **kwds):
    """A wrapper for create_connection() returning a (reader, writer) pair.

    The reader returned is a StreamReader instance; the writer is a
    StreamWriter instance.

    The arguments are all the usual arguments to create_connection()
    except protocol_factory; most common are positional host and port,
    with various optional keyword arguments following.

    Additional optional keyword arguments are loop (to set the event loop
    instance to use) and limit (to set the buffer limit passed to the
    StreamReader).

    (If you want to customize the StreamReader and/or
    StreamReaderProtocol classes, just copy the code -- there's
    really nothing special here except some convenience.)
    """
    if loop is None:                                                    # check whether loop is None
        loop = events.get_event_loop()                                  # if so, get the current loop
    reader = StreamReader(limit=limit, loop=loop)                       # create the reader instance
    protocol = StreamReaderProtocol(reader, loop=loop)                  # create the protocol that wraps the reader
    transport, _ = await loop.create_connection(
        lambda: protocol, host, port, **kwds)                           # establish the connection
    writer = StreamWriter(transport, protocol, reader, loop)            # create the writer instance
    return reader, writer                                               # return the reader and writer
           

Here reader and protocol are created first, and then loop.create_connection is called to establish the connection:

async def create_connection(
        self, protocol_factory, host=None, port=None,
        *, ssl=None, family=0,
        proto=0, flags=0, sock=None,
        local_addr=None, server_hostname=None,
        ssl_handshake_timeout=None):
    """Connect to a TCP server.

    Create a streaming transport connection to a given Internet host and
    port: socket family AF_INET or socket.AF_INET6 depending on host (or
    family if specified), socket type SOCK_STREAM. protocol_factory must be
    a callable returning a protocol instance.

    This method is a coroutine which will try to establish the connection
    in the background.  When successful, the coroutine returns a
    (transport, protocol) pair.
    """
    if server_hostname is not None and not ssl:                                 # server_hostname is only valid with ssl
        raise ValueError('server_hostname is only meaningful with ssl')

    if server_hostname is None and ssl:                                         # with ssl but no server_hostname, fall back to host
        # Use host as default for server_hostname.  It is an error
        # if host is empty or not set, e.g. when an
        # already-connected socket was passed or when only a port
        # is given.  To avoid this error, you can pass
        # server_hostname='' -- this will bypass the hostname
        # check.  (This also means that if host is a numeric
        # IP/IPv6 address, we will attempt to verify that exact
        # address; this will probably fail, but it is possible to
        # create a certificate for a specific IP address, so we
        # don't judge it here.)
        if not host:
            raise ValueError('You must set server_hostname '
                             'when using ssl without a host')
        server_hostname = host

    if ssl_handshake_timeout is not None and not ssl:                           # ssl_handshake_timeout is only valid with ssl
        raise ValueError(
            'ssl_handshake_timeout is only meaningful with ssl')

    if host is not None or port is not None:                                    # host or port was given
        if sock is not None:                                                    
            raise ValueError(
                'host/port and sock can not be specified at the same time')

        infos = await self._ensure_resolved(
            (host, port), family=family,
            type=socket.SOCK_STREAM, proto=proto, flags=flags, loop=self)       # resolve the host
        if not infos:
            raise OSError('getaddrinfo() returned empty list')

        if local_addr is not None:
            laddr_infos = await self._ensure_resolved(
                local_addr, family=family,
                type=socket.SOCK_STREAM, proto=proto,
                flags=flags, loop=self)
            if not laddr_infos:
                raise OSError('getaddrinfo() returned empty list')

        exceptions = []
        for family, type, proto, cname, address in infos:
            try:
                sock = socket.socket(family=family, type=type, proto=proto)      # create the socket
                sock.setblocking(False)                                          # make the socket non-blocking
                if local_addr is not None:
                    for _, _, _, _, laddr in laddr_infos:
                        try:
                            sock.bind(laddr)                                     # bind to the local address
                            break
                        except OSError as exc:
                            msg = (
                                f'error while attempting to bind on '
                                f'address {laddr!r}: '
                                f'{exc.strerror.lower()}'
                            )
                            exc = OSError(exc.errno, msg)
                            exceptions.append(exc)
                    else:
                        sock.close()                                             # no local address could be bound: close the socket
                        sock = None
                        continue
                if self._debug:
                    logger.debug("connect %r to %r", sock, address)
                await self.sock_connect(sock, address)                           # connect to the remote address
            except OSError as exc:
                if sock is not None:
                    sock.close()
                exceptions.append(exc)
            except:
                if sock is not None:
                    sock.close()
                raise
            else:
                break
        else:
            if len(exceptions) == 1:
                raise exceptions[0]
            else:
                # If they all have the same str(), raise one.
                model = str(exceptions[0])
                if all(str(exc) == model for exc in exceptions):
                    raise exceptions[0]
                # Raise a combined exception so the user can see all
                # the various error messages.
                raise OSError('Multiple exceptions: {}'.format(
                    ', '.join(str(exc) for exc in exceptions)))

    else:
        if sock is None:
            raise ValueError(
                'host and port was not specified and no sock specified')
        if sock.type != socket.SOCK_STREAM:
            # We allow AF_INET, AF_INET6, AF_UNIX as long as they
            # are SOCK_STREAM.
            # We support passing AF_UNIX sockets even though we have
            # a dedicated API for that: create_unix_connection.
            # Disallowing AF_UNIX in this method, breaks backwards
            # compatibility.
            raise ValueError(
                f'A Stream Socket was expected, got {sock!r}')

    transport, protocol = await self._create_connection_transport(
        sock, protocol_factory, ssl, server_hostname,
        ssl_handshake_timeout=ssl_handshake_timeout)                                # create the transport for the connection
    if self._debug:
        # Get the socket from the transport because SSL transport closes
        # the old socket and creates a new SSL socket
        sock = transport.get_extra_info('socket')
        logger.debug("%r connected to %s:%r: (%r, %r)",
                     sock, host, port, transport, protocol)
    return transport, protocol                                                      # return the transport and protocol
           

Next, look at self.sock_connect and its helpers:

async def sock_connect(self, sock, address):
    """Connect to a remote socket at address.

    This method is a coroutine.
    """
    if self._debug and sock.gettimeout() != 0:                                  # in debug mode, reject a blocking socket
        raise ValueError("the socket must be non-blocking")

    if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
        resolved = await self._ensure_resolved(
            address, family=sock.family, proto=sock.proto, loop=self)
        _, _, _, _, address = resolved[0]                                       # use the resolved address

    fut = self.create_future()                                                  # create a future
    self._sock_connect(fut, sock, address)                                      # start the connection
    return await fut                                                            # wait for the future and return its result

def _sock_connect(self, fut, sock, address):
    fd = sock.fileno()                                                          # get the file descriptor
    try:
        sock.connect(address)                                                   # connect to the address
    except (BlockingIOError, InterruptedError):
        # Issue #23618: When the C function connect() fails with EINTR, the
        # connection runs in background. We have to wait until the socket
        # becomes writable to be notified when the connection succeed or
        # fails.
        fut.add_done_callback(
            functools.partial(self._sock_connect_done, fd))                     # connection in progress: add _sock_connect_done to the future's done callbacks
        self.add_writer(fd, self._sock_connect_cb, fut, sock, address)          # register a write event on the loop with _sock_connect_cb as the callback
    except Exception as exc:
        fut.set_exception(exc)                                                  # any other error is stored on the future
    else:
        fut.set_result(None)                                                    # connected immediately: let the task proceed

def _sock_connect_done(self, fd, fut):
    self.remove_writer(fd)                                                      # once the future is done, stop watching the file descriptor

def _sock_connect_cb(self, fut, sock, address):
    if fut.cancelled():  # nothing to do if fut was cancelled
        return

    try:
        err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)  # fetch the pending socket error, if any
        if err != 0:
            # Jump to any except clause below.
            raise OSError(err, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
        # socket is still registered, the callback will be retried later
        # (fut stays pending, so the task is not resumed yet)
        pass
    except Exception as exc:
        fut.set_exception(exc)  # any other error is stored on fut
    else:
        fut.set_result(None)  # connected: completing fut lets the task resume
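
The connect logic above can be reproduced outside asyncio with only the standard socket and selectors modules. The following is a minimal sketch, not asyncio's own implementation: the helper names nonblocking_connect and demo, the timeout parameter, and the local listening socket on 127.0.0.1 are all assumptions made for illustration.

```python
import selectors
import socket


def nonblocking_connect(address, timeout=5.0):
    """Connect a non-blocking socket and return it once it is connected."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)
    try:
        try:
            sock.connect(address)
        except (BlockingIOError, InterruptedError):
            # The connection is in progress: wait until the socket is writable.
            with selectors.DefaultSelector() as sel:
                sel.register(sock, selectors.EVENT_WRITE)
                if not sel.select(timeout):
                    raise TimeoutError("connect timed out")
            # Writable does not mean success: SO_ERROR holds the real outcome.
            err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
            if err != 0:
                raise OSError(err, f"Connect call failed {address}")
    except Exception:
        sock.close()
        raise
    return sock


def demo():
    # Hypothetical local server so the sketch needs no external network.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen(1)
    try:
        client = nonblocking_connect(server.getsockname())
        try:
            return client.getpeername() == server.getsockname()
        finally:
            client.close()
    finally:
        server.close()


if __name__ == "__main__":
    print("connected:", demo())
```

The sketch blocks inside select() itself, whereas asyncio only registers the writer and lets the event loop do the waiting for every socket at once.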
           

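If the connection cannot be completed immediately (the non-blocking connect() raises BlockingIOError), a write event is registered with the loop. Now consider what happens after send: control returns to run_until_complete, which proceeds to self.run_forever(). The code executed there is as follows: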

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()  # make sure the loop is not closed
        if self.is_running():  # make sure the loop is not already running
            raise RuntimeError('This event loop is already running')
        if events._get_running_loop() is not None:  # make sure no other loop is running in this thread
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')
        self._set_coroutine_origin_tracking(self._debug)  # track coroutine origins only in debug mode
        self._thread_id = threading.get_ident()  # record the current thread id

        old_agen_hooks = sys.get_asyncgen_hooks()
        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
                               finalizer=self._asyncgen_finalizer_hook)
        try:
            events._set_running_loop(self)  # mark this loop as the running loop
            while True:
                self._run_once()  # run one iteration of the loop
                if self._stopping:
                    break
        finally:
            self._stopping = False
            self._thread_id = None
            events._set_running_loop(None)
            self._set_coroutine_origin_tracking(False)
            sys.set_asyncgen_hooks(*old_agen_hooks)
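
The while loop in run_forever only checks _stopping after _run_once returns, so a call to stop() does not cut the current batch of callbacks short. A small sketch using the public asyncio API shows this; the function name demo and the list seen are illustrative assumptions.

```python
import asyncio


def demo():
    loop = asyncio.new_event_loop()
    seen = []
    loop.call_soon(seen.append, "first")
    loop.call_soon(loop.stop)              # sets the stopping flag
    loop.call_soon(seen.append, "second")  # same batch, so it still runs
    try:
        loop.run_forever()                 # returns after one iteration
    finally:
        loop.close()
    return seen


if __name__ == "__main__":
    print(demo())
```

Both callbacks run even though stop() was scheduled between them, because all three handles were already in the ready queue when the iteration started.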



    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """

        sched_count = len(self._scheduled)  # number of scheduled timer handles
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):  # too many cancelled timers: rebuild the heap without them
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:  # pop cancelled timers off the head of the heap
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
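        if self._ready or self._stopping:  # callbacks are ready to run, or the loop is stopping
            timeout = 0  # so poll without blocking
        elif self._scheduled:  # otherwise, if timers are pending
            # Compute the desired timeout.
            when = self._scheduled[0]._when  # due time of the earliest timer
            timeout = max(0, when - self.time())  # block at most until that timer is due

        if self._debug and timeout != 0:  # in debug mode, measure how long a blocking poll takes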
            t0 = self.time()
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
            event_list = self._selector.select(timeout)  # poll via I/O multiplexing
        self._process_events(event_list)  # queue the callbacks for read/write events

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution  # current time plus the clock resolution
        while self._scheduled:  # walk the timer heap
            handle = self._scheduled[0]  # earliest timer
            if handle._when >= end_time:  # not due yet, so stop
                break
            handle = heapq.heappop(self._scheduled)  # the timer is due: pop it
            handle._scheduled = False
            self._ready.append(handle)  # and append it to the _ready queue

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)  # number of callbacks ready right now
        for i in range(ntodo):
            handle = self._ready.popleft()  # pop the next handle
            if handle._cancelled:  # skip cancelled handles
                continue
            if self._debug:  # in debug mode, time each callback
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning('Executing %s took %.3f seconds',
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()  # invoke the callback
        handle = None  # Needed to break cycles when an exception occurs.
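
The structure of _run_once can be modelled with a deque for ready callbacks and a heap for timers. The sketch below is a simplified stand-in that leaves out I/O polling and cancellation; ToyLoop, run_until_idle and demo are hypothetical names, not part of asyncio.

```python
import collections
import heapq
import itertools
import time


class ToyLoop:
    """A stripped-down model of the ready queue and the timer heap."""

    def __init__(self):
        self._ready = collections.deque()
        self._scheduled = []           # heap of (when, seq, callback, args)
        self._seq = itertools.count()  # tie-breaker so callbacks are never compared

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        when = time.monotonic() + delay
        heapq.heappush(self._scheduled, (when, next(self._seq), callback, args))

    def run_once(self):
        # 1. Sleep until the next timer is due, unless callbacks are ready.
        #    (The real loop blocks in selector.select(timeout) instead.)
        if not self._ready and self._scheduled:
            time.sleep(max(0, self._scheduled[0][0] - time.monotonic()))
        # 2. Move every timer that is due into the ready queue.
        now = time.monotonic()
        while self._scheduled and self._scheduled[0][0] <= now:
            _, _, callback, args = heapq.heappop(self._scheduled)
            self._ready.append((callback, args))
        # 3. Run only the callbacks that were ready at this point.
        for _ in range(len(self._ready)):
            callback, args = self._ready.popleft()
            callback(*args)

    def run_until_idle(self):
        while self._ready or self._scheduled:
            self.run_once()


def demo():
    loop = ToyLoop()
    order = []

    def first():
        order.append("first")
        loop.call_soon(order.append, "scheduled by first")

    loop.call_later(0.05, order.append, "timer")
    loop.call_soon(first)
    loop.call_soon(order.append, "second")
    loop.run_until_idle()
    return order


if __name__ == "__main__":
    print(demo())
```

Because step 3 fixes the batch size before running anything, the callback scheduled by first runs only after second, which matches the note in the source comment that newly scheduled callbacks wait for the next iteration.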


The call to self._process_events resolves to the _process_events method of BaseSelectorEventLoop:
    def _process_events(self, event_list):
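        for key, mask in event_list:  # iterate over the events reported by the selector
            fileobj, (reader, writer) = key.fileobj, key.data  # the registered reader/writer handles
            if mask & selectors.EVENT_READ and reader is not None:  # readable
                if reader._cancelled:  # the read handle was cancelled
                    self._remove_reader(fileobj)  # stop watching the file for reads
                else:
                    self._add_callback(reader)  # otherwise queue the read callback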
            if mask & selectors.EVENT_WRITE and writer is not None:
                if writer._cancelled:
                    self._remove_writer(fileobj)
                else:
                    self._add_callback(writer)


This in turn calls the _add_callback method of BaseEventLoop:

    def _add_callback(self, handle):
        """Add a Handle to _scheduled (TimerHandle) or _ready."""
        assert isinstance(handle, events.Handle), 'A Handle is required here'
        if handle._cancelled:  # ignore cancelled handles
            return
        assert not isinstance(handle, events.TimerHandle)
        self._ready.append(handle)  # append to the _ready queue
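
The same chain (register a writer, let the selector report the event, run the queued callback, complete a future) can be exercised through the public API. This is a rough sketch that assumes a Unix-like platform whose event loop is selector-based and supports add_writer; wait_writable and demo are illustrative names, and the socket pair stands in for a real connection.

```python
import asyncio
import socket


async def wait_writable(sock):
    """Suspend until the loop reports that sock is writable."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    fd = sock.fileno()

    def on_writable():
        # Runs from the ready queue after the selector reported EVENT_WRITE.
        loop.remove_writer(fd)
        if not fut.done():
            fut.set_result("writable")

    loop.add_writer(fd, on_writable)
    return await fut


def demo():
    a, b = socket.socketpair()
    a.setblocking(False)
    try:
        return asyncio.run(wait_writable(a))
    finally:
        a.close()
        b.close()


if __name__ == "__main__":
    print(demo())
```

Removing the writer inside the callback mirrors what _sock_connect_done does once the future is finished.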
           

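Since the write event has already been registered, the loop simply waits for the I/O multiplexing notification. When it arrives, _process_events handles the event and _add_callback appends the corresponding handle to the _ready queue. When that handle runs, _sock_connect_cb finds that the connection has been established and calls fut.set_result(None). The relevant parts of the Future class are as follows: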

def set_result(self, result):
    """Mark the future done and set its result.

    If the future is already done when this method is called, raises
    InvalidStateError.
    """
    if self._state != _PENDING:  # the future must still be pending
        raise InvalidStateError('{}: {!r}'.format(self._state, self))
    self._result = result  # store the result
    self._state = _FINISHED  # mark the future as finished
    self.__schedule_callbacks()  # schedule the done callbacks


def __schedule_callbacks(self):
    """Internal: Ask the event loop to call all callbacks.

    The callbacks are scheduled to be called as soon as possible. Also
    clears the callback list.
    """
    callbacks = self._callbacks[:]  # copy the list of callbacks
    if not callbacks:
        return

    self._callbacks[:] = []
    for callback, ctx in callbacks:  # schedule each callback on the loop via call_soon
        self._loop.call_soon(callback, self, context=ctx)


def result(self):
    """Return the result this future represents.

    If the future has been cancelled, raises CancelledError.  If the
    future's result isn't yet available, raises InvalidStateError.  If
    the future is done and has an exception set, this exception is raised.
    """
    if self._state == _CANCELLED:
        raise CancelledError
    if self._state != _FINISHED:
        raise InvalidStateError('Result is not ready.')
    self.__log_traceback = False
    if self._exception is not None:
        raise self._exception
    return self._result  # return the stored result


def __await__(self):
    if not self.done():  # not finished yet
        self._asyncio_future_blocking = True  # flag that tells Task this is a future to wait on
        yield self  # This tells Task to wait for completion.
    if not self.done():  # resumed before completion: raise
        raise RuntimeError("await wasn't used with future")
    return self.result()  # May raise too. Otherwise returns the result set on fut.

__iter__ = __await__  # make compatible with 'yield from'.
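
One consequence of __schedule_callbacks is easy to miss: set_result does not run the done callbacks itself, it only hands them to call_soon. The short sketch below shows this with the public API; main, demo and the events list are names chosen for illustration.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []
    fut.add_done_callback(lambda f: events.append("callback"))

    fut.set_result(42)                 # only schedules the callback
    events.append("after set_result")  # so this is recorded first

    await asyncio.sleep(0)             # yield to the loop for one iteration
    events.append("after yield")
    return events, fut.result()


def demo():
    return asyncio.run(main())


if __name__ == "__main__":
    print(demo())
```

The callback is recorded between the two other entries, because it only runs once the coroutine has yielded control back to the loop.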
           

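When a Task is initialized, it schedules its __step method on the loop with call_soon. When __step calls send(None) and the coroutine yields a fut, the Task adds its __wakeup method to that fut as a done callback. Once set_result is called, __schedule_callbacks hands __wakeup to call_soon, so it runs on the next loop iteration. __wakeup then calls __step, which sends into the coroutine again and advances it to the next step. In this way the callback registered for I/O multiplexing calls the fut's set_result method, which in turn drives the Task's coroutine forward. This is the core idea behind Python's asynchronous programming support. The subsequent read and write methods follow the same flow, so for reasons of length they are not analyzed further here.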
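
The interplay between Future, Task and the loop can be condensed into a toy model. The sketch below is a simplified illustration under stated assumptions, not asyncio's code: ToyFuture, ToyTask, run, fetch and demo are hypothetical names, the loop is just a deque of (callback, argument) pairs, and error handling and cancellation are omitted.

```python
import collections


class ToyFuture:
    def __init__(self, loop):
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def add_done_callback(self, callback):
        if self._done:
            self._loop.append((callback, self))
        else:
            self._callbacks.append(callback)

    def set_result(self, result):
        self._result, self._done = result, True
        for callback in self._callbacks:
            self._loop.append((callback, self))  # plays the role of call_soon
        self._callbacks.clear()

    def result(self):
        return self._result

    def __await__(self):
        if not self._done:
            yield self  # hand the future to the task that drives us
        return self._result


class ToyTask(ToyFuture):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        loop.append((self._step, None))  # schedule the first step

    def _step(self, _fut=None):
        try:
            awaited = self._coro.send(None)  # run up to the next await
        except StopIteration as exc:
            self.set_result(exc.value)       # the coroutine has returned
        else:
            awaited.add_done_callback(self._step)  # wake up when it is done


def run(loop):
    while loop:
        callback, arg = loop.popleft()
        callback(arg)


async def fetch(fut, log):
    log.append("waiting")
    value = await fut
    log.append(f"got {value}")
    return value * 2


def demo():
    loop = collections.deque()
    log = []
    fut = ToyFuture(loop)
    task = ToyTask(fetch(fut, log), loop)

    def io_ready(_):
        # Stand-in for the I/O callback that the selector would trigger.
        fut.set_result(21)

    loop.append((io_ready, None))
    run(loop)
    return log, task.result()


if __name__ == "__main__":
    print(demo())
```

Here _step plays both the __step and __wakeup roles: it is scheduled once at construction and again whenever the awaited future completes.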

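Summary

The asyncio module provided by Python is a higher-level wrapper around I/O multiplexing and coroutines. At its core it still relies on the generator-based yield mechanism to run coroutines and drive them forward, which makes writing asynchronous programs much more convenient. The module's basic principle rests on this mechanism, and the other I/O operations it provides are built as extensions of it. Interested readers can study the source code to learn more. My knowledge is limited, so corrections for any omissions are welcome.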