天天看點

Python基礎之協程(Coroutine)

引言

之前我們學習了線程、程序的概念,了解了在作業系統中程序是資源配置設定的最小機關,線程是CPU排程的最小機關。按道理來說我們已經算是把cpu的使用率提高很多了。但是我們知道無論是建立多程序還是建立多線程來解決問題,都要消耗一定的時間來建立程序、建立線程、以及管理他們之間的切換。

随着我們對于效率的追求不斷提高,基于單線程來實作并發又成為一個新的課題,即隻用一個主線程(很明顯可利用的cpu隻有一個)情況下實作并發。這樣就可以節省建立線程序所消耗的時間。

為此我們需要先回顧下并發的本質:切換+儲存狀态

cpu正在運作一個任務,會在兩種情況下切走去執行其他的任務(切換由作業系統強制控制),一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長。

Python基礎之協程(Coroutine)

程序排程

在介紹程序理論時,提及程序的三種執行狀态,而線程才是執行機關,是以也可以将上圖了解為線程的三種狀态。

排程程式選擇另一程序并不能提升效率,隻是為了讓cpu能夠雨露均沾,實作看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。

為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以儲存任務運作狀态的方法,我們來簡單複習一下。

執行個體1:

import time


def consumer(res):
    pass


def producer():
    res = []
    for i in range(100000000):  # 一億
        res.append(i)
    return res


start_time = time.time()
res = producer()
consumer(res)
end_time = time.time()
print("Time:", end_time - start_time)
           

結果:

Time: 11.075389862060547

Process finished with exit code 0

執行個體2:

import time


def consumer():
    while 1:
        x = yield


def producer():
    g = consumer()
    next(g)
    for i in range(100000000):  # 一億
        g.send(i)


start_time = time.time()
producer()
end_time = time.time()
print("Time:", end_time - start_time)
           

結果:

Time: 9.08711290359497

Process finished with exit code 0

協程介紹

指的是單線程下的并發,又稱微線程,協程是一種使用者态的輕量級線程,即協程是由使用者程式自己控制排程的。

注意:

  • python的線程屬于核心級别的,即由作業系統控制排程(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運作)。
  • 單線程内開啟協程,一旦遇到io,就會從應用程式級别(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)。

對比作業系統控制線程的切換,使用者在單線程内控制協程的切換。

優點:

  • 協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級。
  • 單線程内就可以實作并發的效果,最大限度地利用cpu。

協程特點:

  • 必須在隻有一個單線程裡實作并發。
  • 修改共享資料不需加鎖。
  • 使用者程式裡自己儲存多個控制流的上下文棧。
  • 附加:一個協程遇到IO操作自動切換到其它協程(如何實作檢測IO,yield、greenlet都無法實作,就用到了gevent子產品(select機制))。

greenlet子產品

這個子產品不是内置子產品,需要安額外安裝。

執行個體1:greenlet子產品的第一次切換

from greenlet import greenlet


def eat(name):
    print("%s正在吃牛排" % name)
    g_two.switch('張三')
    print("%s正在吃狗糧" % name)
    g_two.switch()


def play(name):
    print("%s正在打籃球" % name)
    g_one.switch()
    print("%s正在踢足球" % name)


if __name__ == '__main__':
    g_one = greenlet(eat)
    g_two = greenlet(play)
    g_one.switch("李四")
           

結果:

李四正在吃牛排

張三正在打籃球

李四正在吃狗糧

張三正在踢足球

Process finished with exit code 0

上述完全模拟了單純的切換,但是......在沒有遇到IO操作或者開辟重複空間的情況下,反而會降低程式的執行效率。

執行個體2:串行程式

import time


def func_one():
    s = 0
    for i in range(100000000):
        s += 1


def func_two():
    s = 1
    for i in range(100000000):
        s += 1


if __name__ == '__main__':
    start_time = time.time()
    func_one()
    func_two()
    print("串行程式,耗時%s秒" % (time.time() - start_time))
           

結果:

串行程式,耗時10.838690042495728秒:

Process finished with exit code 0

執行個體2:切換執行

from greenlet import greenlet
import time


def func_one():
    res = 0
    for i in range(100000000):
        res += 1
        g_two.switch()


def func_two():
    res = 1
    for i in range(100000000):
        res += 1
        g_one.switch()


if __name__ == '__main__':
    start_time = time.time()
    g_one = greenlet(func_one)
    g_two = greenlet(func_two)
    g_one.switch()
    print("切換執行,耗時%s秒" % (time.time() - start_time))
           

結果:

切換執行,耗時54.60098838806152秒

Process finished with exit code 0

greenlet隻是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到IO,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

若是想要在計算和阻塞之間交替運作的時候提高效率,我們需要用到gevent子產品。

gevent子產品

這個子產品不是内置子產品,需要安額外安裝。

gevent 是一個第三方庫,可以輕松通過gevent實作并發同步或異步程式設計,在gevent中用到的主要模式是greenlet,它是以C擴充子產品形式接入Python的輕量級協程。 greenlet全部運作在主程式作業系統程序的内部,但它們被協作式地排程。

用法介紹

g_one = gevnet.spawn(func_one, args[...]):建立一個協程對象g_one,spawn括号内第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的。

g_two = gevent.spawn(func_two):執行個體化另外一個對象。

g_one.join():等待g_one結束。

g_two.join():等待g_two結束。

以上兩步也可以合起來:gevent.joinall([g_one, g_two])

g_one.value:拿到func_one的傳回值。

執行個體:

import gevent


def eat(name):
    print("%s在吃牛排" % name)
    gevent.sleep(2)
    print("%s在吃狗糧" % name)


def play(name):
    print("%s在打籃球" % name)
    gevent.sleep(1)
    print("%s在踢足球" % name)


if __name__ == '__main__':
    g_one = gevent.spawn(eat, '張三')
    g_two = gevent.spawn(play, '李四')
    g_one.join()
    g_two.join()
    print("主線程執行完畢")
           

結果:

張三在吃牛排

李四在打籃球

李四在踢足球

張三在吃狗糧

主線程執行完畢

Process finished with exit code 0

上例gevent.sleep(2)模拟的是gevent可以識别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識别的需要用下面一行代碼,打更新檔,就可以識别了:

from gevent import monkey

monkey.patch_all()

該語句必須放到被打更新檔者的前面,如time,socket子產品之前。或者将它放到檔案的開頭。如:

from gevent import monkey

monkey.patch_all()
import gevent
import time


def eat(name):
    print("%s在吃牛排" % name)
    time.sleep(2)
    print("%s在吃狗糧" % name)


def play(name):
    print("%s在打籃球" % name)
    time.sleep(1)
    print("%s在踢足球" % name)


if __name__ == '__main__':
    g_one = gevent.spawn(eat, '張三')
    g_two = gevent.spawn(play, '李四')
    gevent.joinall([g_one, g_two])
    print("主線程執行完畢")
           

結果:

張三在吃牛排

李四在打籃球

李四在踢足球

張三在吃狗糧

主線程執行完畢

Process finished with exit code 0

當我注釋更新檔代碼後将會出現以下結果:

張三在吃牛排

張三在吃狗糧

李四在打籃球

李四在踢足球

主線程執行完畢

Process finished with exit code 0

程式等待時間和執行順序都不一樣。

我們可以用threading.current_thread().getName()來檢視每個g_one和g_two,檢視的結果為DummyThread-n,即虛線程。如:

from gevent import monkey

monkey.patch_all()
import threading
import gevent
import time


def eat(name):
    print(threading.current_thread().getName())
    print("%s在吃牛排" % name)
    time.sleep(2)
    print("%s在吃狗糧" % name)


def play(name):
    print(threading.current_thread().getName())
    print("%s在打籃球" % name)
    time.sleep(1)
    print("%s在踢足球" % name)


if __name__ == '__main__':
    g_one = gevent.spawn(eat, '張三')
    g_two = gevent.spawn(play, '李四')
    gevent.joinall([g_one, g_two])
    print("主線程執行完畢")
           

結果:

DummyThread-1

張三在吃牛排

DummyThread-2

李四在打籃球

李四在踢足球

張三在吃狗糧

主線程執行完畢

Process finished with exit code 0

gevent之同步與異步

執行個體:

from gevent import spawn, joinall, monkey

monkey.patch_all()
import time


def task(pid):
    time.sleep(1)
    print("任務%s完成" % pid)


def synchronous():  # 同步
    for i in range(10):
        task(i)


def asynchronous():  # 異步
    g_one = [spawn(task, i) for i in range(10)]
    joinall(g_one)
    print('完成')


if __name__ == '__main__':
    print('同步:')
    synchronous()
    print('異步:')
    asynchronous()
           

結果:

同步:

任務0完成

任務1完成

任務2完成

任務3完成

任務4完成

任務5完成

任務6完成

任務7完成

任務8完成

任務9完成

異步:

任務0完成

任務1完成

任務2完成

任務3完成

任務4完成

任務5完成

任務6完成

任務7完成

任務8完成

任務9完成

完成

Process finished with exit code 0

分析:上面程式的重要部分是将task函數封裝到Greenlet内部線程的gevent.spawn。初始化的greenlet清單存放在數組threads中,此數組被傳給gevent.joinall函數,後者阻塞目前流程,并執行所有給定的greenlet任務。執行流程隻會在 所有greenlet執行完後才會繼續向下走。

執行個體:利用gevent爬蟲

from gevent import monkey

monkey.patch_all()
import gevent
import requests
import time


def get_page(url):
    print('GET: %s' % url)
    response = requests.get(url)
    if response.status_code == 200:
        print('%s:%d bytes' % (url, len(response.text)))


start_time = time.time()
gevent.joinall([
    gevent.spawn(get_page, 'https://www.python.org/'),
    gevent.spawn(get_page, 'https://www.yahoo.com/'),
    gevent.spawn(get_page, 'https://github.com/'),
])
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
           

結果:

GET: https://www.python.org/

GET: https://www.yahoo.com/

GET: https://github.com/

https://github.com/:61353 bytes

https://www.yahoo.com/:483185 bytes

https://www.python.org/:48823 bytes

run time is 2.1101250648498535

Process finished with exit code 0

執行個體:通過gevent實作單線程下socket開發

服務端:

from gevent import monkey

monkey.patch_all()
import socket
import gevent


# 如果不想用money.patch_all()打更新檔,可以用gevent自帶的socket
# from gevent import socket
# s = socket.socket()


def server(server_ip, port):
    ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    ss.bind((server_ip, port))
    ss.listen(5)

    while 1:
        conn, add = ss.accept()
        gevent.spawn(talk, conn, add)


def talk(conn, add):
    try:
        while 1:
            res = conn.recv(1024)
            print("CLIENT %s: %s message:%s" % (add[0], add[1], res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()


if __name__ == '__main__':
    server('127.0.0.1', 8080)
           

服務端:

from socket import *

sc = socket(AF_INET, SOCK_STREAM)
sc.connect(('127.0.0.1', 8080))

while 1:
    msg = input(">>>").strip()
    if not msg:
        continue
    sc.send(msg.encode('utf-8'))
    msg = sc.recv(1024)
    print(msg.decode('utf-8'))
           

多線程并發用戶端:

from threading import Thread
from socket import *
import threading


def client(server_ip, port):
    sc = socket(AF_INET, SOCK_STREAM)
    sc.connect((server_ip, port))

    count = 0
    while 1:
        sc.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
        msg = sc.recv(1024)
        print(msg.decode('utf-8'))
        count += 1


if __name__ == '__main__':
    for i in range(5):
        t = Thread(target=client, args=('127.0.0.1', 8080))
        t.start()
           

結果:(略)