天天看点

初识 asyncio 异步爬虫

# coding=utf-8
import asyncio
import functools
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from aiohttp import ClientSession

# Configure logging once at import time: INFO threshold, and every record is
# prefixed with its timestamp, process name and thread name.
logging.basicConfig(
    format="%(asctime)s [*] %(processName)s %(threadName)s %(message)s",
    level=logging.INFO,
)


async def hello(url):
    """Fetch *url* over HTTP and return the tail of the response body.

    A fresh ``ClientSession`` is opened for this single GET request and is
    closed again when the function exits.

    Args:
        url: Address handed to ``session.get``.

    Returns:
        The last 20 bytes of the body read from the response.
    """
    # No explicit CancelledError handler is needed: an exception raised while
    # awaiting propagates to the caller unchanged.
    async with ClientSession() as session:
        async with session.get(url) as reply:
            body = await reply.read()
            # logging.info(body[-20:])
            return body[-20:]


def test(delay=4):
    """Simulate a blocking call by sleeping, then return a greeting.

    Args:
        delay: Number of seconds to block the calling thread with
            ``time.sleep``. Defaults to 4, the value that used to be
            hard-coded, so existing zero-argument callers behave the same.

    Returns:
        The string ``"Hello World!"``.
    """
    time.sleep(delay)  # stand-in for blocking work
    return "Hello World!"


async def fetch_async(event_loop_, executor_, func_):
    """Run the callable *func_* through *executor_* and return its result.

    Args:
        event_loop_: Object whose ``run_in_executor`` method schedules the call.
        executor_: Executor passed straight through to ``run_in_executor``.
        func_: Callable invoked without arguments.

    Returns:
        The value obtained by awaiting ``run_in_executor``.
    """
    return await event_loop_.run_in_executor(executor_, func_)


async def main(event_loop_):
    """Download every URL in the built-in list concurrently and print the outcome.

    One task per URL is created from ``hello``; all tasks are awaited through
    ``asyncio.wait``; each task object is printed; finally the elapsed time
    is printed.

    Args:
        event_loop_: Not used by the active code path. Only the commented-out
            thread-pool variant below refers to it.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way a time.time() difference can.
    started = time.perf_counter()
    # -----------------------------------------------------------
    # Variant 1: run blocking callables in a thread pool ---------
    # executor = ThreadPoolExecutor(3)
    # tasks = [
    #     fetch_async(event_loop_, executor, test),
    #     fetch_async(event_loop_, executor, test)
    # ]
    # Variant 2: wrap a coroutine object into a Future -----------
    # tasks = []
    # for num in range(10):
    #     task = asyncio.ensure_future(hello(num))
    #     tasks.append(task)
    # Variant 3: one task per URL --------------------------------
    urls = [
        "https://docs.python.org/3/library/asyncio-task.html",
        "https://www.cnblogs.com/yzh2857/p/10376598.html",
        "https://www.cnblogs.com/yzh2857/p/10390808.html",
        "https://matplotlib.org/api/pyplot_api.html",
        "https://www.programcreek.com/python/index/module/list",
        "https://selenium-python.readthedocs.io/api.html",
        "https://www.w3resource.com/python/python-tutorial.php",
        "https://docs.scipy.org/doc/numpy/reference/generated/numpy.ones.html",
        "https://www.w3resource.com/python/module/calendar/",
    ]
    tasks = [asyncio.ensure_future(hello(url)) for url in urls]
    # Way 1 of running the tasks ---------------------------------
    dones, pendings = await asyncio.wait(tasks)
    # Tasks that have finished
    for done in dones:
        print("已完成的协程:", done)
    # Tasks still unfinished when wait() returned. No timeout is passed to
    # asyncio.wait above, so this set only gets members if one is added.
    for pending in pendings:
        print("超时未完成的协程  :", pending)
        pending.cancel()
    # Way 2 of running the tasks ---------------------------------
    # result = await asyncio.gather(*tasks)
    # print("结果:", result)
    # Way 3 of running the tasks ---------------------------------
    # for task in asyncio.as_completed(tasks):
    #     result = await task
    #     print("结果:", result)
    # ------------------------------------------------------------
    print("总用时", time.perf_counter() - started)


if __name__ == "__main__":
    # Create and register the loop explicitly: asyncio.get_event_loop() is
    # deprecated when no loop is running and fails on recent Python versions.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    # ------------------------------------------------
    try:
        # Start the loop with this coroutine; the loop stops when it returns.
        event_loop.run_until_complete(main(event_loop))
    except KeyboardInterrupt:
        # asyncio.Task.all_tasks() was removed in Python 3.9. The module-level
        # asyncio.all_tasks() (Python 3.7+) must be given the loop here
        # because no loop is running at this point.
        leftover = asyncio.all_tasks(event_loop)
        for task in leftover:
            print(task.cancel())
        # cancel() only requests cancellation; run the loop once more so the
        # tasks can unwind instead of being destroyed while still pending.
        if leftover:
            event_loop.run_until_complete(
                asyncio.gather(*leftover, return_exceptions=True)
            )
    finally:
        event_loop.close()
           

执行结果如下:

已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'pt>\n</body>\n</html>\n'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'v>\n  </body>\n</html>'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'div>\n</body></html>\n'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'>\n\n</footer>\n</html>'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'>\n\n  </body>\n</html>'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'iv>\n</body>\n</html>\n'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'pt>\n</body>\n</html>\n'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'  \n  </body>\n</html>'>
已完成的协程: <Task finished coro=<hello() done, defined at F:/python/3.py:15> result=b'iv>\n</body>\n</html>\n'>
总用时 13.480730533599854