天天看點

Python 徹底解讀協程與異步【看完包會】同步異步概念協程協程 2.7代碼實作協程 3.5

title: Python 協程與異步

copyright: true

top: 0

date: 2018-08-11 10:15:50

tags:

categories: Python進階筆記

permalink:

password:

keywords: 協程

description: Python2.7中用代碼實作協程,同時區分同步與異步,以及異步的表現形式,回調與協程。

像煙花也是過一生,像櫻花也是過一生,隻要亮過和盛開過不就好了嗎?

同步異步概念

當提到同步與異步,大家不免會想到另一組詞語:阻塞與非阻塞。通常,同時提到這個這幾個詞語一般實在讨論network io的時候,在《unix network programming》中有詳盡的解釋,網絡中也有許多講解生動的文章。

關于異步 同步的一些了解:

同步和異步的差別就在于是否等待IO執行的結果。好比你去麥當勞點餐,你說“來個漢堡”,服務員告訴你,對不起,漢堡要現做,需要等5分鐘,于是你站在收銀台前面等了5分鐘,拿到漢堡再去逛商場,這是同步IO。

你說“來個漢堡”,服務員告訴你,漢堡需要等5分鐘,你可以先去逛商場,等做好了,我們再通知你,這樣你可以立刻去幹别的事情(逛商場),這是異步IO。

老張愛喝茶,廢話不說,煮開水。出場人物:老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。1 老張把水壺放到火上,立等水開。(同步阻塞)老張覺得自己有點傻2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)老張還是覺得自己有點傻,于是變高端了,買了把會響笛的那種水壺。水開之後,能大聲發出嘀~~~~的噪音。3 老張把響水壺放到火上,立等水開。(異步阻塞)老張覺得這樣傻等意義不大4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(異步非阻塞)老張覺得自己聰明了。所謂同步異步,隻是對于水壺而言。普通水壺,同步;響水壺,異步。雖然都能幹活,但響水壺可以在自己完工之後,提示老張水開了。這是普通水壺所不能及的。同步隻能讓調用者去輪詢自己(情況2中),造成老張效率的低下。所謂阻塞非阻塞,僅僅對于老張而言。立等的老張,阻塞;看電視的老張,非阻塞。情況1和情況3中老張就是阻塞的,媳婦喊他都不知道。雖然3中響水壺是異步的,可對于立等的老張沒有太大的意義。是以一般異步是配合非阻塞使用的,這樣才能發揮異步的效用。

并發通常指有多個任務需要同時進行,并行則是同一時刻有多個任務執行。用多線程、多程序、協程來說,協程實作并發,多線程與多程序實作并行。

關于異步 同步堵塞的一些了解:

同步阻塞,就好比火車站過安檢,需要你耗費幾分鐘時間,都檢查完了再進站,每個人都要耽誤幾分鐘。

同步非阻塞,我們假設火車站提供了一種服務名叫“回報”,你交10塊錢就可以加一個微信号,然後你把車票、身份證、行李一次性放到一個地方,同時人家還儲存了一下你的美照(這一系列操作後面統稱“打包”),這樣你可以直接進站買點東西呀上個廁所呀(後面統稱“閑逛”),再通過微信不斷詢問 我的票檢查好了嗎? 查好了嗎? 直到那頭回複你說“好了”,你到指定地點去把你剛才打的包取回(後面統稱“取包”),結束。

異步阻塞,你交20塊錢買了“回報2.0”—檢查完畢人家會主動發微信告訴你,不需要你在不斷詢問了,而你“打包”完畢,還在檢票口傻等,直到人家說“好了”,你在“取包”。這其實沒有任何意義,因為你還是在等待,真正有意義的是異步非阻塞。

異步非阻塞,你交20塊錢買了“回報2.0”,“打包”完畢,“閑逛”,直到人家說“好了”,然後你“取包”。這才是真正把你解放了,既不用等着,也不用不斷詢問。而本文的asyncio用的就是異步非阻塞的協程。

協程

優點

  1. 無需線程上下文切換的開銷
  2. 無需原子操作鎖定及同步的開銷
  3. 友善切換控制流,簡化程式設計模型
  4. 高并發+高擴充性+低成本:一個CPU支援上萬的協程都不是問題。是以很适合用于高并發處理

缺點

  1. 無法利用多核資源:協程的本質是個單線程,它不能同時将 單個CPU 的多個核用上,協程需要和程序配合才能運作在多CPU上.當然我們日常所編寫的絕大部分應用都沒有這個必要,除非是cpu密集型應用。
  2. 進行阻塞(Blocking)操作(如IO時)會阻塞掉整個程式

協程 2.7

協程:單線程裡面不斷切換這個單線程中的微程序,即通過代碼來實作讓一個線程中的更小程序來回切換,相對于多線程多程序可以節省線程切換的時間。

代碼實作

協程在Python中使用yield生成器實作,每次執行到yield位置代碼停止,傳回一個資料,随後在别的地方可以接手這個資料後,代碼恢複繼續執行

# -*- coding: utf-8 -*-
# @Time    : 2018/6/23 0023 10:19
# @Author  : Langzi
# @Blog    : www.langzi.fun
# @File    : 協程.py
# @Software: PyCharm
import sys
import time
reload(sys)
sys.setdefaultencoding('utf-8')

def fun_1():
    while 1:
        n = yield 'FUN_1 執行完畢,切換到FUN_2'
        # 函數運作到yield會暫停函數執行,存儲這個值。并且有next():調用這個值,與send():外部傳入一個值
        if not n:
            return
        time.sleep(1)
        print 'FUN_1 函數開始執行'

def fun_2(t):
    t.next()
    while 1:
        print '-'*20
        print 'FUN_2 函數開始執行'
        time.sleep(1)
        ret = t.send('over')
        print ret
    t.close()

if __name__ == '__main__':
    n = fun_1()
    fun_2(n)
           

可以看到,沒有使用多線程處理,依然在兩個函數中不斷切換循環。

總結一下:

1. 第一個生産者函數中,使用yield,後面的代碼暫時不會執行
2. 第一個函數執行到yield後,程式執行第二個函數,首先接受參數t,調用yield的下一個值,t.next()
3. 然後第二個函數繼續執行,執行完後給第一個函數發送一些資料,ret=t.send(None),其中ret就是第一個函數中yield的值
4. 最後關閉,t.close()
5. 把第一個函數的運作結果(其實就是當執行到yield的值)傳遞給第二個函數,第二個函數繼續執行,然後把傳回值繼續傳遞給第一個函數。
           

協程 3.5

在Python3中新增asyncio庫,在 3.5+ 版本中, asyncio 有兩樣文法非常重要, async, await. 弄懂了它們是如何協同工作的, 我們就完全能發揮出這個庫的功能了。

基本用法

我們要時刻記住,asyncio 不是多程序, 也不是多線程, 單單是一個線程, 但是是在 Python 的功能間切換着執行. 切換的點用 await 來标記, 使用async關鍵詞将其變成協程方法, 比如 async def function():。其中,async 定義一個協程,await 用來挂起阻塞方法的執行。

概念

  1. event_loop事件循環:程式開啟一個無限的循環,當把一些函數注冊到事件循環上時,滿足事件發生條件即調用相應的函數。
  2. coroutine協程對象:指一個使用async關鍵字定義的函數,它的調用不會立即執行函數,而是會傳回一個協程對象,協程對象需要注冊到事件循環,由事件循環調用。
  3. task任務:一個協程對象就是一個原生可以挂起的函數,任務則是對協程進一步封裝,其中包含任務的各種狀态。
  4. future:代表将來執行或沒有執行的任務的結果,它和task上沒有本質的差別
  5. async/await關鍵字:python3.5用于定義協程的關鍵字,async定義一個協程,await用于挂起阻塞的異步調用接口。

代碼示範

先看看不是異步的

# 不是異步的
import time
def job(t):
    print('Start job ', t)
    time.sleep(t)               # wait for "t" seconds
    print('Job ', t, ' takes ', t, ' s')
def main():
    [job(t) for t in range(1, 3)]
t1 = time.time()
main()
print("NO async total time : ", time.time() - t1)

"""
Start job  1
Job  1  takes  1  s
Start job  2
Job  2  takes  2  s
NO async total time :  3.008603096008301
           

從上面可以看出, 我們的 job 是按順序執行的, 必須執行完 job 1 才能開始執行 job 2, 而且 job 1 需要1秒的執行時間, 而 job 2 需要2秒. 是以總時間是 3 秒多. 而如果我們使用 asyncio 的形式, job 1 在等待 time.sleep(t) 結束的時候, 比如是等待一個網頁的下載下傳成功, 在這個地方是可以切換給 job 2, 讓它開始執行.

然後是異步的

import asyncio
async def job(t):                   # async 形式的功能
    print('Start job ', t)
    await asyncio.sleep(t)          # 等待 "t" 秒, 期間切換其他任務
    print('Job ', t, ' takes ', t, ' s')
async def main(loop):                       # async 形式的功能
    tasks = [
    loop.create_task(job(t)) for t in range(1, 3)
    ]                                       # 建立任務, 但是不執行
    await asyncio.wait(tasks)               # 執行并等待所有任務完成
t1 = time.time()
loop = asyncio.get_event_loop()             # 建立 loop
loop.run_until_complete(main(loop))         # 執行 loop,并且等待所有任務結束
loop.close()                                # 關閉 loop
print("Async total time : ", time.time() - t1)
"""
Start job  1
Start job  2
Job  1  takes  1  s
Job  2  takes  2  s
Async total time :  2.001495838165283
"""
           

從結果可以看出, 我們沒有等待 job 1 的結束才開始 job 2, 而是 job 1 觸發了 await 的時候就切換到了 job 2 了. 這時, job 1 和 job 2 同時在等待 await asyncio.sleep(t), 是以最終的程式完成時間, 取決于等待最長的 t, 也就是 2秒. 這和上面用普通形式的代碼相比(3秒), 的确快了很多.由于協程對象不能直接運作,在注冊事件循環的時候,其實是run_until_complete方法将協程包裝成為了一個任務(task)對象。所謂task對象是Future類的子類,儲存了協程運作後的狀态,用于未來擷取協程的結果。

簡單的例子:

import asyncio
import requests
async def scan(url):
    r = requests.get(url).status_code
    return r

task = asyncio.ensure_future(scan('http://www.langzi.fun'))
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print(task.result())
           

調用協程有好幾種方法,這裡就隻看我這種即可,主要是後面三行。把任務指派給task,然後loop為申請排程(這麼了解),然後執行。因為requests這個庫是同步堵塞的,是以沒辦法變成異步執行,這個時候學學aiohttp,一個唯一有可能在異步中取代requests的庫。

綁定回調

就是讓第一個函數執行後,執行的結果傳遞給第二個函數繼續執行

例子:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

def callback(task):
    print('Status:', task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
           

在這裡我們定義了一個 request() 方法,請求了百度,傳回狀态碼,但是這個方法裡面我們沒有任何 print() 語句。随後我們定義了一個 callback() 方法,這個方法接收一個參數,是 task 對象,然後調用 print() 方法列印了 task 對象的結果。這樣我們就定義好了一個 coroutine 對象和一個回調方法,我們現在希望的效果是,當 coroutine 對象執行完畢之後,就去執行聲明的 callback() 方法。

那麼它們二者怎樣關聯起來呢?很簡單,隻需要調用 add_done_callback() 方法即可,我們将 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢之後就可以調用 callback() 方法了,同時 task 對象還會作為參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就可以擷取傳回結果了。

多任務協程

就是把所有的任務加載到一個清單中,然後依次執行

上面的例子我們隻執行了一次請求,如果我們想執行多次請求應該怎麼辦呢?我們可以定義一個 task 清單,然後使用 asyncio 的 wait() 方法即可執行,看下面的例子:

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())
           

這裡我們使用一個 for 循環建立了五個 task,組成了一個清單,然後把這個清單首先傳遞給了 asyncio 的 wait() 方法,然後再将其注冊到時間循環中,就可以發起五個任務了。最後我們再将任務的運作結果輸出出來,運作結果如下:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
Task Result: <Response [200]>
           

多任務協程實作

上面的多任務協程執行了,但是是依次執行的

舉例子測試,通路部落格測試速度

import asyncio
import requests
import time

start = time.time()

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)
           

這個和上面一樣,隻是把所有的任務量加載一個tasks清單中罷了,并沒有異步執行,但是不要慌,繼續看代碼

import asyncio
import aiohttp
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://www.langzi.fun'
    print('Waiting for', url)
    result = await get(url)
    print('Get response from', url, 'Result:', result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)
           

為什麼使用aiohttp呢?在之前就說過requests這個庫是堵塞的,并不支援異步,而aiohttp是支援異步的網絡請求的庫。

協程嵌套

使用async可以定義協程,協程用于耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實作了嵌套的協程,即一個協程中await了另外一個協程,如此連接配接起來。

import asyncio

async def myfun(i):
    print('start {}th'.format(i))
    await asyncio.sleep(1)
    print('finish {}th'.format(i))

loop = asyncio.get_event_loop()
myfun_list = [asyncio.ensure_future(myfun(i)) for i in range(10)]
loop.run_until_complete(asyncio.wait(myfun_list))
           

這種用法和上面一種的不同在于後面調用的是asyncio.gather還是asyncio.wait,目前看成完全等價即可,是以平時使用用上面哪種都可以。

上面是最常看到的兩種使用方式,這裡列出來保證讀者在看其他文章時不會發蒙。

另外,二者其實是有細微差别的

  • gather更擅長于将函數聚合在一起
  • wait更擅長篩選運作狀況

    import asyncio

    import time

    now = lambda: time.time()

    async def do_some_work(x):

    print('Waiting: ', x)

    await asyncio.sleep(x)

    return ‘Done after {}s’.format(x)

    async def main():

    coroutine1 = do_some_work(1)

    coroutine2 = do_some_work(2)

    coroutine3 = do_some_work(4)

    tasks = [

    asyncio.ensure_future(coroutine1),

    asyncio.ensure_future(coroutine2),

    asyncio.ensure_future(coroutine3)

    ]

    dones, pendings = await asyncio.wait(tasks)

    for task in dones:
          print('Task ret: ', task.result())
               

    start = now()

    loop = asyncio.get_event_loop()

    loop.run_until_complete(main())

    print('TIME: ', now() - start)

如果使用的是 asyncio.gather建立協程對象,那麼await的傳回值就是協程運作的結果。

results = await asyncio.gather(*tasks)

    for result in results:
        print('Task ret: ', result)
           

不在main協程函數裡處理結果,直接傳回await的内容,那麼最外層的run_until_complete将會傳回main協程的結果。

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.gather(*tasks)

start = now()

loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
    print('Task ret: ', result)
           

或者傳回使用asyncio.wait方式挂起協程。

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]

    return await asyncio.wait(tasks)

start = now()

loop = asyncio.get_event_loop()
done, pending = loop.run_until_complete(main())

for task in done:
    print('Task ret: ', task.result())
           

也可以使用asyncio的as_completed方法

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)
           

由此可見,協程的調用群組合十分靈活,尤其是對于結果的處理,如何傳回,如何挂起,需要逐漸積累經驗和前瞻的設計。

協程停止

上面見識了協程的幾種常用的用法,都是協程圍繞着事件循環進行的操作。future對象有幾個狀态:

  • Pending
  • Running
  • Done
  • Cancelled

建立future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task擷取事件循環的task

import asyncio

import time

now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)

    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

tasks = [
    asyncio.ensure_future(coroutine1),
    asyncio.ensure_future(coroutine2),
    asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    for task in asyncio.Task.all_tasks():
        print(task.cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print('TIME: ', now() - start)
           

啟動事件循環之後,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然後通過循環asyncio.Task取消future。可以看到輸出如下:

Waiting:  1
Waiting:  2
Waiting:  2
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME:  0.8858370780944824
           

True表示cannel成功,loop stop之後還需要再次開啟事件循環,最後在close,不然還會抛出異常:

Task was destroyed but it is pending!
task: <Task pending coro=<do_some_work() done,
           

循環task,逐個cancel是一種方案,可是正如上面我們把task的清單封裝在main函數中,main函數外進行事件循環的調用。這個時候,main相當于最外出的一個task,那麼處理包裝的main函數即可。

import asyncio
import time
now = lambda: time.time()

async def do_some_work(x):
    print('Waiting: ', x)
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)
async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(2)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print('Task ret: ', task.result())

start = now()

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
    loop.run_until_complete(task)
except KeyboardInterrupt as e:
    print(asyncio.Task.all_tasks())
    print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
    loop.stop()
    loop.run_forever()
finally:
    loop.close()
           

不同線程的事件循環

很多時候,我們的事件循環用于注冊協程,而有的協程需要動态的添加到事件循環中。一個簡單的方式就是使用多線程。目前線程建立一個事件循環,然後在建立一個線程,在新線程中啟動事件循環。目前線程不會被block。

from threading import Thread

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

new_loop.call_soon_threadsafe(more_work, 6)
new_loop.call_soon_threadsafe(more_work, 3)
           

啟動上述代碼之後,目前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法注冊的more_work方法,後者因為time.sleep操作是同步阻塞的,是以運作完畢more_work需要大緻6 + 3

新線程協程

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)
           

上述的例子,主線程中建立一個new_loop,然後在另外的子線程中開啟一個無限事件循環。主線程通過run_coroutine_threadsafe新注冊協程對象。這樣就能在子線程中進行事件循環的并發操作,同時主線程又不會被block。一共執行的時間大概在6s左右。

騷方法

requests實作異步爬蟲一

如同前面介紹如何在asyncio中使用requests子產品一樣,如果想在asyncio中使用其他阻塞函數,該怎麼實作呢?雖然目前有異步函數支援asyncio,但實際問題是大部分IO子產品還不支援asyncio。 阻塞函數(例如io讀寫,requests網絡請求)阻塞了客戶代碼與asycio事件循環的唯一線程,是以在執行調用時,整個應用程式都會當機。

解決方案:

這個問題的解決方法是使用事件循環對象的run_in_executor方法。asyncio的事件循環在背後維護着一個ThreadPoolExecutor對象,我們可以調用run_in_executor方法,把可調用對象發給它執行,即可以通過run_in_executor方法來建立一個線程來執行耗時函數。

run_in_executor方法

AbstractEventLoop.run_in_executor(executor, func, *args)
           
  • executor 參數應該是一個 Executor 執行個體。如果為 None,則使用預設 executor。
  • func 就是要執行的函數
  • args 就是傳遞給 func 的參數

實際例子(使用time.sleep()):

import asyncio
import time
async def run(url):
    print("start ",url)
    loop = asyncio.get_event_loop()
    try:
        await loop.run_in_executor(None,time.sleep,1)
    except Exception as e:
        print(e)
    print("stop ",url)
url_list = ["https://thief.one","https://home.nmask.cn","https://movie.nmask.cn","https://tool.nmask.cn"]
tasks = [asyncio.ensure_future(run(url)) for url in url_list]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
           

說明:有了run_in_executor方法,我們就可以使用之前熟悉的子產品建立協程并發了,而不需要使用特定的子產品進行IO異步開發。

requests實作異步爬蟲二

與之前學過的多線程、多程序相比,asyncio子產品有一個非常大的不同:傳入的函數不是随心所欲

  • 比如我們把上面myfun函數中的sleep換成time.sleep(1),運作時則不是異步的,而是同步,共等待了10秒
  • 如果我換一個myfun,比如換成下面這個使用request抓取網頁的函數

    import asyncio

    import requests

    from bs4 import BeautifulSoup

    async def get_title(a):

    url = ‘https://movie.douban.com/top250?start={}&filter=’.format(a*25)

    r = requests.get(url)

    soup = BeautifulSoup(r.content, ‘html.parser’)

    lis = soup.find(‘ol’, class_=‘grid_view’).find_all(‘li’)

    for li in lis:

    title = li.find(‘span’, class_=“title”).text

    print(title)

    loop = asyncio.get_event_loop()

    fun_list = (get_title(i) for i in range(10))

    loop.run_until_complete(asyncio.gather(*fun_list))

依然不會異步執行。

到這裡我們就會想,是不是異步隻對它自己定義的sleep(await asyncio.sleep(1))才能觸發異步?

對于上述函數,asyncio庫隻能通過添加線程的方式實作異步,下面我們實作time.sleep時的異步

import asyncio
import time

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    loop = asyncio.get_event_loop()
    futures = (
        loop.run_in_executor(
            None,
            myfun, 
            i)
        for i in range(10)
        )
    for result in await asyncio.gather(*futures):
        pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
           

上面run_in_executor其實開啟了新的線程,再協調各個線程。調用過程比較複雜,隻要當模闆一樣套用即可。

上面10次循環仍然不是一次性列印出來的,而是像分批次一樣列印出來的。這是因為開啟的線程不夠多,如果想要實作一次列印,可以開啟10個線程,代碼如下

import concurrent.futures as cf # 多加一個子產品
import asyncio
import time

def myfun(i):
    print('start {}th'.format(i))
    time.sleep(1)
    print('finish {}th'.format(i))

async def main():
    with cf.ThreadPoolExecutor(max_workers = 10) as executor: # 設定10個線程
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor, # 按照10個線程來執行
                myfun, 
                i)
            for i in range(10)
            )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
           

用這種方法實作requests異步爬蟲代碼如下

import concurrent.futures as cf
import asyncio
import requests
from bs4 import BeautifulSoup

def get_title(i):
    url = 'https://movie.douban.com/top250?start={}&filter='.format(i*25)
    r = requests.get(url)
    soup = BeautifulSoup(r.content, 'html.parser')
    lis = soup.find('ol', class_='grid_view').find_all('li')
    for li in lis:
        title = li.find('span', class_="title").text
        print(title)

async def main():
    with cf.ThreadPoolExecutor(max_workers = 10) as executor:
        loop = asyncio.get_event_loop()
        futures = (
            loop.run_in_executor(
                executor,
                get_title, 
                i)
            for i in range(10)
            )
        for result in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
           

uvloop,這個使用庫可以有效的加速asyncio,本庫基于libuv,也就是nodejs用的那個庫。使用它也非常友善

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
           

沒錯就是2行代碼,就可以提速asyncio。

與多程序的結合

既然異步協程和多程序對網絡請求都有提升,那麼為什麼不把二者結合起來呢?在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特點,并開發了一個新的庫,叫做 aiomultiprocess

這個庫的安裝方式是:

pip3 install aiomultiprocess
           

需要 Python 3.6 及更高版本才可使用。

使用這個庫,我們可以将上面的例子改寫如下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

end = time.time()
print('Cost time:', end - start)
           

這樣就會同時使用多程序和異步協程進行請求,但在真實情況下,我們在做爬取的時候遇到的情況千變萬化,一方面我們使用異步協程來防止阻塞,另一方面我們使用 multiprocessing 來利用多核成倍加速,節省時間其實還是非常可觀的。

aiohttp基礎用法

aiohttp分為伺服器端和用戶端,本文隻介紹用戶端。

案例:

import aiohttp
async def job(session):
    response = await session.get(URL)       # 等待并切換
    return str(response.url)


async def main(loop):
    async with aiohttp.ClientSession() as session:      # 官網推薦建立 Session 的形式
        tasks = [loop.create_task(job(session)) for _ in range(2)]
        finished, unfinished = await asyncio.wait(tasks)
        all_results = [r.result() for r in finished]    # 擷取所有結果
        print(all_results)

t1 = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
print("Async total time:", time.time() - t1)

"""
['https://morvanzhou.github.io/', 'https://morvanzhou.github.io/']
Async total time: 0.11447715759277344
"""
           

我們剛剛建立了一個 Session, 這是官網推薦的方式, 但是我覺得也可以直接用 request 形式, 細節請參考官方說明. 如果要擷取網頁傳回的結果, 我們可以在 job() 中 return 個結果出來, 然後再在 finished, unfinished = await asyncio.wait(tasks) 收集完成的結果, 這裡它會傳回完成的和沒完成的, 我們關心的都是完成的, 而且 await 也确實是等待都完成了才傳回. 真正的結果被存放在了 result() 裡面.

aiohttp安裝

pip3 install aiohttp
           

基本請求用法

async with aiohttp.get('https://github.com') as r:
        await r.text()
           

其中r.text(), 可以在括号中指定解碼方式,編碼方式,例如

await resp.text(encoding='windows-1251')
           

或者也可以選擇不編碼,适合讀取圖像等,是無法編碼的

await resp.read()
           

發起一個session請求

首先是導入aiohttp子產品:

import aiohttp
           

然後我們試着擷取一個web源碼,這裡以GitHub的公共Time-line頁面為例:

async with aiohttp.ClientSession() as session:
    async with session.get('https://api.github.com/events') as resp:
        print(resp.status)
        print(await resp.text())
           

上面的代碼中,我們建立了一個 ClientSession 對象命名為session,然後通過session的get方法得到一個 ClientResponse 對象,命名為resp,get方法中傳入了一個必須的參數url,就是要獲得源碼的http url。至此便通過協程完成了一個異步IO的get請求。

有get請求當然有post請求,并且post請求也是一個協程:

session.post('http://httpbin.org/post', data=b'data')
           

用法和get是一樣的,差別是post需要一個額外的參數data,即是需要post的資料。

除了get和post請求外,其他http的操作方法也是一樣的:

session.put('http://httpbin.org/put', data=b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=b'data')
           

小記:

不要為每次的連接配接都建立一次session,一般情況下隻需要建立一個session,然後使用這個session執行所有的請求。

每個session對象,内部包含了一個連接配接池,并且将會保持連接配接和連接配接複用(預設開啟)可以加快整體的性能。

在URL中傳遞參數

我們經常需要通過 get 在url中傳遞一些參數,參數将會作為url問号後面的一部分發給伺服器。在aiohttp的請求中,允許以dict的形式來表示問号後的參數。舉個例子,如果你想傳遞 key1=value1 key2=value2 到 httpbin.org/get 你可以使用下面的代碼:

params = {'key1': 'value1', 'key2': 'value2'}
async with session.get('http://httpbin.org/get',
                       params=params) as resp:
                       assert resp.url == 'http://httpbin.org/get?key2=value2&key1=value1'
           

可以看到,代碼正确的執行了,說明參數被正确的傳遞了進去。不管是一個參數兩個參數,還是更多的參數,都可以通過這種方式來傳遞。除了這種方式之外,還有另外一個,使用一個 list 來傳遞(這種方式可以傳遞一些特殊的參數,例如下面兩個key是相等的也可以正确傳遞):

params = [('key', 'value1'), ('key', 'value2')]
async with session.get('http://httpbin.org/get',
                       params=params) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'
           

除了上面兩種,我們也可以直接通過傳遞字元串作為參數來傳遞,但是需要注意,通過字元串傳遞的特殊字元不會被編碼:

async with session.get('http://httpbin.org/get',
                       params='key=value+1') as r:
        assert r.url == 'http://httpbin.org/get?key=value+1'
           

響應的内容

還是以GitHub的公共Time-line頁面為例,我們可以獲得頁面響應的内容:

async with session.get('https://api.github.com/events') as resp:
	print(await resp.text())
           

運作之後,會列印出類似于如下的内容:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
           

resp的text方法,會自動将伺服器端傳回的内容進行解碼–decode,當然我們也可以自定義編碼方式:

await resp.text(encoding='gb2312')
           

除了text方法可以傳回解碼後的内容外,我們也可以得到類型是位元組的内容:

print(await resp.read())
           

運作的結果是:

b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{...
           

gzip和deflate轉換編碼已經為你自動解碼。

小記:

text(),read()方法是把整個響應體讀入記憶體,如果你是擷取大量的資料,請考慮使用”位元組流“(streaming response)

特殊響應内容:json

如果我們擷取的頁面的響應内容是json,aiohttp内置了更好的方法來處理json:

async with session.get('https://api.github.com/events') as resp:
    print(await resp.json())
           

如果因為某種原因而導緻resp.json()解析json失敗,例如傳回不是json字元串等等,那麼resp.json()将抛出一個錯誤,也可以給json()方法指定一個解碼方式:

print(await resp.json(
encoding='gb2312')) 
           

或者傳遞一個函數進去:

print(await resp.json( lambda(x:x.replace('a','b')) ))
           

以位元組流的方式讀取響應内容

雖然json(),text(),read()很友善的能把響應的資料讀入到記憶體,但是我們仍然應該謹慎的使用它們,因為它們是把整個的響應體全部讀入了記憶體。即使你隻是想下載下傳幾個位元組大小的檔案,但這些方法卻将在記憶體中加載所有的資料。是以我們可以通過控制位元組數來控制讀入記憶體的響應内容:

async with session.get('https://api.github.com/events') as resp:
    await resp.content.read(10) #讀取前10個位元組
           

一般地,我們應該使用以下的模式來把讀取的位元組流儲存到檔案中:

with open(filename, 'wb') as fd:
    while True:
        chunk = await resp.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)
           

自定義請求頭

如果你想添加請求頭,可以像get添加參數那樣以dict的形式,作為get或者post的參數進行請求:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}

await session.post(url,
                   data=json.dumps(payload),
                   headers=headers)
           

自定義Cookie

給伺服器發送cookie,可以通過給 ClientSession 傳遞一個cookie參數:

url = 'http://httpbin.org/cookies'
cookies = {'cookies_are': 'working'}
async with ClientSession(cookies=cookies) as session:
    async with session.get(url) as resp:
        assert await resp.json() == {
           "cookies": {"cookies_are": "working"}}
           

可直接通路連結 “httpbin.org/cookies”檢視目前cookie,通路session中的cookie請見第10節。

post資料的幾種方式

(1)模拟表單post資料

payload = {'key1': 'value1', 'key2': 'value2'}
async with session.post('http://httpbin.org/post',data=payload) as resp:
	print(await resp.text())
           

注意:data=dict的方式post的資料将被轉碼,和form送出資料是一樣的作用,如果你不想被轉碼,可以直接以字元串的形式 data=str 送出,這樣就不會被轉碼。

(2)post json

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

async with session.post(url, data=json.dumps(payload)) as resp:
	...
           

其實json.dumps(payload)傳回的也是一個字元串,隻不過這個字元串可以被識别為json格式

(3)post 小檔案

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await session.post(url, data=files)
可以設定好檔案名和content-type:
url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await session.post(url, data=data)
           

如果将檔案對象設定為資料參數,aiohttp将自動以位元組流的形式發送給伺服器。

(4)post 大檔案

aiohttp支援多種類型的檔案以流媒體的形式上傳,是以我們可以在檔案未讀入記憶體的情況下發送大檔案。

@aiohttp.streamer
def file_sender(writer, file_name=None):
    with open(file_name, 'rb') as f:
        chunk = f.read(2**16)
        while chunk:
            yield from writer.write(chunk)
            chunk = f.read(2**16)

# Then you can use `file_sender` as a data provider:

async with session.post('http://httpbin.org/post',data=file_sender(file_name='huge_file')) as resp:
    print(await resp.text())
           

同時我們可以從一個url擷取檔案後,直接post給另一個url,并計算hash值:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        stream.feed_data(chunk)

    return h.hexdigest()

resp = session.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(session.post('http://httpbin.org/post', data=stream))
file_hash = await feed_stream(resp, stream)
           

因為響應内容類型是StreamReader,是以可以把get和post連接配接起來,同時進行post和get:

r = await session.get('http://python.org')
await session.post('http://httpbin.org/post',data=r.content)
           

(5)post預壓縮資料

在通過aiohttp發送前就已經壓縮的資料, 調用壓縮函數的函數名(通常是deflate 或 zlib)作為content-encoding的值:

async def my_coroutine(session, headers, my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    async with session.post('http://httpbin.org/post',
                            data=data,
                            headers=headers)
        pass
           

keep-alive, 連接配接池,共享cookie

ClientSession 用于在多個連接配接之間共享cookie:

async with aiohttp.ClientSession() as session:
    await session.get(
        'http://httpbin.org/cookies/set?my_cookie=my_value')
    filtered = session.cookie_jar.filter_cookies('http://httpbin.org')
    assert filtered['my_cookie'].value == 'my_value'
    async with session.get('http://httpbin.org/cookies') as r:
        json_body = await r.json()
        assert json_body['cookies']['my_cookie'] == 'my_value'
           

也可以為所有的連接配接設定共同的請求頭:

async with aiohttp.ClientSession(
    headers={"Authorization": "Basic bG9naW46cGFzcw=="}) as session:
    async with session.get("http://httpbin.org/headers") as r:
        json_body = await r.json()
        assert json_body['headers']['Authorization'] == \
            'Basic bG9naW46cGFzcw=='
           

ClientSession 還支援 keep-alive連接配接和連接配接池(connection pooling)

cookie安全性

預設ClientSession使用的是嚴格模式的 aiohttp.CookieJar. RFC 2109,明确的禁止接受url和ip位址産生的cookie,隻能接受 DNS 解析IP産生的cookie。可以通過設定aiohttp.CookieJar 的 unsafe=True 來配置:

jar = aiohttp.CookieJar(unsafe=True)
session = aiohttp.ClientSession(cookie_jar=jar)
           

控制同時連接配接的數量(連接配接池)

也可以了解為同時請求的數量,為了限制同時打開的連接配接數量,我們可以将限制參數傳遞給連接配接器:

conn = aiohttp.TCPConnector(limit=30)#同時最大進行連接配接的連接配接數為30,預設是100,limit=0的時候是無限制
           

限制同時打開限制同時打開連接配接到同一端點的數量((host, port, is_ssl) 三的倍數),可以通過設定 limit_per_host 參數:

conn = aiohttp.TCPConnector(limit_per_host=30)#預設是0
           

自定義域名解析

我們可以指定域名伺服器的 IP 對我們提供的get或post的url進行解析:

from aiohttp.resolver import AsyncResolver
resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
conn = aiohttp.TCPConnector(resolver=resolver)
           

設定代理

aiohttp支援使用代理來通路網頁:

async with aiohttp.ClientSession() as session:
    async with session.get("http://python.org",
                           proxy="http://some.proxy.com") as resp:
        print(resp.status)
           

當然也支援需要授權的頁面:

async with aiohttp.ClientSession() as session:
    proxy_auth = aiohttp.BasicAuth('user', 'pass')
    async with session.get("http://python.org",proxy="http://some.proxy.com",proxy_auth=proxy_auth) as resp:
        print(resp.status)
           

或者通過這種方式來驗證授權:

session.get("http://python.org",proxy="http://user:[email protected]")
           

響應狀态碼 response status code

可以通過 resp.status來檢查狀态碼是不是200:

async with session.get('http://httpbin.org/get') as resp:
    assert resp.status == 200
           

響應頭

我們可以直接使用 resp.headers 來檢視響應頭,得到的值類型是一個dict:

>>> resp.headers
{'ACCESS-CONTROL-ALLOW-ORIGIN': '*',
 'CONTENT-TYPE': 'application/json',
 'DATE': 'Tue, 15 Jul 2014 16:49:51 GMT',
 'SERVER': 'gunicorn/18.0',
 'CONTENT-LENGTH': '331',
 'CONNECTION': 'keep-alive'}
           

或者我們可以檢視原生的響應頭:

>>> resp.raw_headers
((b'SERVER', b'nginx'),
 (b'DATE', b'Sat, 09 Jan 2016 20:28:40 GMT'),
 (b'CONTENT-TYPE', b'text/html; charset=utf-8'),
 (b'CONTENT-LENGTH', b'12150'),
 (b'CONNECTION', b'keep-alive'))
           

重定向的響應頭

如果一個請求被重定向了,我們依然可以檢視被重定向之前的響應頭資訊:

>>> resp = await session.get('http://example.com/some/redirect/')
>>> resp
<ClientResponse(http://example.com/some/other/url/) [200]>
>>> resp.history
(<ClientResponse(http://example.com/some/redirect/) [301]>,)
           

逾時處理

預設的IO操作都有5分鐘的響應時間 我們可以通過 timeout 進行重寫:

async with session.get('https://github.com', timeout=60) as r:
    ...
           

Python協程

Python3.5協程學習研究

asyncio:高性能異步子產品使用介紹

爬蟲速度太慢?來試試用異步協程提速吧!

對Python并發程式設計的思考

aiohttp 簡易使用教程

aiohttp 中文文檔

python異步asyncio子產品的使用

Asyncio 使用經驗

Python-aiohttp百萬并發

歡迎關注公衆号:【安全研發】擷取更多相關工具,課程,資料分享哦~

Python 徹底解讀協程與異步【看完包會】同步異步概念協程協程 2.7代碼實作協程 3.5