
Concurrent crawler patterns with aiohttp and asyncio

I recently needed to write a crawler for a site, so I tried building it with aiohttp combined with asyncio, drawing on related material found online.

Round 1: the "concurrent" async version behaves exactly like synchronous code

The code is as follows:

import asyncio
import random

import aiohttp
from lxml import etree

async def fetch_get(session, url):
    # random pause so requests are not fired at the server back to back
    await asyncio.sleep(random.randint(3, 6))
    async with session.get(url) as response:
        return await response.text(encoding='utf-8')

async def result_get(session, url):
    # fetch and process one result page (omitted here)
    pass

async def fetch_main():
    async with aiohttp.ClientSession() as session:
        shelf_text = await fetch_get(session, FBASE_URL)
        shelf_html = etree.HTML(shelf_text)
        urls = parse(shelf_html)
        for url in urls:
            await result_get(session, url)

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_main())
           

Running this, you find the URLs are still fetched one at a time; there is no concurrency at all, because each call to result_get is awaited to completion before the next one starts.
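The difference is easy to see in isolation. A minimal, self-contained sketch (the work coroutine and the timings are purely illustrative): awaiting inside a loop serializes the coroutines, while scheduling them first and awaiting them together runs them concurrently.

import asyncio

async def work(i):
    await asyncio.sleep(1)
    return i

async def serial():
    # awaits one coroutine at a time: roughly 3 seconds in total
    for i in range(3):
        await work(i)

async def concurrent():
    # schedules all coroutines, then waits for them together: roughly 1 second in total
    await asyncio.gather(*(work(i) for i in range(3)))

loop = asyncio.get_event_loop()
loop.run_until_complete(serial())
loop.run_until_complete(concurrent())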

Round 2: add all tasks first, then run them concurrently

async def fetch_get(session, url):
    await asyncio.sleep(random.randint(3, 6))
    async with session.get(url) as response:
        return await response.text(encoding='utf-8')

async def result_get(session, url):
    # fetch and process one result page (omitted here)
    pass

async def fetch_main():
    async with aiohttp.ClientSession() as session:
        shelf_text = await fetch_get(session, FBASE_URL)
        shelf_html = etree.HTML(shelf_text)
        urls = parse(shelf_html)
        tasks = []
        for url in urls:
            # schedule each result_get as a task; all of them run concurrently
            task = asyncio.ensure_future(result_get(session, url))
            tasks.append(task)
        await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_main())
           

In this pattern, if urls contains a large number of entries, async tasks keep being added one after another without any limit.
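As a side note (separate from the semaphore approach used below), aiohttp can also cap the number of simultaneous connections at the session level through the connector's limit parameter; a minimal sketch, assuming a cap of 30 is wanted:

import aiohttp

async def fetch_main():
    # at most 30 connections are open at once; extra requests wait for a free slot
    connector = aiohttp.TCPConnector(limit=30)
    async with aiohttp.ClientSession(connector=connector) as session:
        ...  # same crawling logic as above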

Round 3: add tasks in batches and run them concurrently

sem = asyncio.Semaphore(30)  # cap total concurrent requests at 30

async def fetch_get(session, url):
    await asyncio.sleep(random.randint(3, 6))
    async with sem:
        async with session.get(url) as response:
            return await response.text(encoding='utf-8')

async def result_get(session, url):
    # fetch and process one result page (omitted here)
    pass

async def fetch_main():
    async with sem:
        async with aiohttp.ClientSession() as session:
            shelf_text = await fetch_get(session, FBASE_URL)
            shelf_html = etree.HTML(shelf_text)
            urls = parse(shelf_html)
            tasks = []
            part_tasks = []
            for index, url in enumerate(urls):
                if index % 15 == 0 and part_tasks:
                    # every 15 URLs: wait for the current batch to finish,
                    # then pause 240 seconds before starting the next batch
                    await asyncio.wait(part_tasks)
                    await asyncio.sleep(240)
                    part_tasks = []
                task = asyncio.ensure_future(result_get(session, url))
                tasks.append(task)
                part_tasks.append(task)
            await asyncio.wait(tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(fetch_main())
           

This is the pattern I currently use: after every 15 URLs are added, that batch runs asynchronously and the loop then pauses for 240 seconds before continuing, with the total number of concurrent connections capped at 30 by the semaphore.

If one async task collects URLs and stores them in a database, and a second async task reads URLs from the database to fetch the results, you can put a while loop inside the result-fetching task: each iteration pulls a fixed-size batch of URLs from the database, adds them as async tasks, and repeats until every URL in the database has been processed.
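A minimal sketch of that arrangement, reusing fetch_get, result_get, parse and FBASE_URL from above; save_urls_to_db, load_unprocessed_urls and mark_done are hypothetical stand-ins for the real database layer:

import asyncio
import aiohttp
from lxml import etree

BATCH_SIZE = 15

async def produce_urls(session):
    # crawl the index page and store the discovered URLs in the database
    shelf_text = await fetch_get(session, FBASE_URL)
    urls = parse(etree.HTML(shelf_text))
    await save_urls_to_db(urls)                           # hypothetical helper

async def consume_urls(session):
    # keep pulling batches of unprocessed URLs until the database is drained
    while True:
        batch = await load_unprocessed_urls(BATCH_SIZE)   # hypothetical helper
        if not batch:
            break
        tasks = [asyncio.ensure_future(result_get(session, url)) for url in batch]
        await asyncio.wait(tasks)
        await mark_done(batch)                            # hypothetical helper

async def main():
    async with aiohttp.ClientSession() as session:
        await produce_urls(session)
        await consume_urls(session)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

In practice the producer and consumer could also run concurrently (for example via asyncio.gather), as long as the consumer has some way to tell that the producer has finished.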