引言
之前我們學習了線程、程序的概念,了解了在作業系統中程序是資源配置設定的最小機關,線程是CPU排程的最小機關。按道理來說我們已經算是把cpu的使用率提高很多了。但是我們知道無論是建立多程序還是建立多線程來解決問題,都要消耗一定的時間來建立程序、建立線程、以及管理他們之間的切換。
随着我們對于效率的追求不斷提高,基于單線程來實作并發又成為一個新的課題,即隻用一個主線程(很明顯可利用的cpu隻有一個)情況下實作并發。這樣就可以節省建立線程序所消耗的時間。
為此我們需要先回顧下并發的本質:切換+儲存狀态
cpu正在運作一個任務,會在兩種情況下切走去執行其他的任務(切換由作業系統強制控制),一種情況是該任務發生了阻塞,另外一種情況是該任務計算的時間過長。

程序排程
在介紹程序理論時,提及程序的三種執行狀态,而線程才是執行機關,是以也可以将上圖了解為線程的三種狀态。
排程程式選擇另一程序并不能提升效率,隻是為了讓cpu能夠雨露均沾,實作看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。
為此我們可以基于yield來驗證。yield本身就是一種在單線程下可以儲存任務運作狀态的方法,我們來簡單複習一下。
執行個體1:
import time
def consumer(res):
pass
def producer():
res = []
for i in range(100000000): # 一億
res.append(i)
return res
start_time = time.time()
res = producer()
consumer(res)
end_time = time.time()
print("Time:", end_time - start_time)
結果:
Time: 11.075389862060547
Process finished with exit code 0
執行個體2:
import time
def consumer():
while 1:
x = yield
def producer():
g = consumer()
next(g)
for i in range(100000000): # 一億
g.send(i)
start_time = time.time()
producer()
end_time = time.time()
print("Time:", end_time - start_time)
結果:
Time: 9.08711290359497
Process finished with exit code 0
協程介紹
指的是單線程下的并發,又稱微線程,協程是一種使用者态的輕量級線程,即協程是由使用者程式自己控制排程的。
注意:
- python的線程屬于核心級别的,即由作業系統控制排程(如單線程遇到io或執行時間過長就會被迫交出cpu執行權限,切換其他線程運作)。
- 單線程内開啟協程,一旦遇到io,就會從應用程式級别(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)。
對比作業系統控制線程的切換,使用者在單線程内控制協程的切換。
優點:
- 協程的切換開銷更小,屬于程式級别的切換,作業系統完全感覺不到,因而更加輕量級。
- 單線程内就可以實作并發的效果,最大限度地利用cpu。
協程特點:
- 必須在隻有一個單線程裡實作并發。
- 修改共享資料不需加鎖。
- 使用者程式裡自己儲存多個控制流的上下文棧。
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實作檢測IO,yield、greenlet都無法實作,就用到了gevent子產品(select機制))。
greenlet子產品
這個子產品不是内置子產品,需要安額外安裝。
執行個體1:greenlet子產品的第一次切換
from greenlet import greenlet
def eat(name):
print("%s正在吃牛排" % name)
g_two.switch('張三')
print("%s正在吃狗糧" % name)
g_two.switch()
def play(name):
print("%s正在打籃球" % name)
g_one.switch()
print("%s正在踢足球" % name)
if __name__ == '__main__':
g_one = greenlet(eat)
g_two = greenlet(play)
g_one.switch("李四")
結果:
李四正在吃牛排
張三正在打籃球
李四正在吃狗糧
張三正在踢足球
Process finished with exit code 0
上述完全模拟了單純的切換,但是......在沒有遇到IO操作或者開辟重複空間的情況下,反而會降低程式的執行效率。
執行個體2:串行程式
import time
def func_one():
s = 0
for i in range(100000000):
s += 1
def func_two():
s = 1
for i in range(100000000):
s += 1
if __name__ == '__main__':
start_time = time.time()
func_one()
func_two()
print("串行程式,耗時%s秒" % (time.time() - start_time))
結果:
串行程式,耗時10.838690042495728秒:
Process finished with exit code 0
執行個體2:切換執行
from greenlet import greenlet
import time
def func_one():
res = 0
for i in range(100000000):
res += 1
g_two.switch()
def func_two():
res = 1
for i in range(100000000):
res += 1
g_one.switch()
if __name__ == '__main__':
start_time = time.time()
g_one = greenlet(func_one)
g_two = greenlet(func_two)
g_one.switch()
print("切換執行,耗時%s秒" % (time.time() - start_time))
結果:
切換執行,耗時54.60098838806152秒
Process finished with exit code 0
greenlet隻是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到IO,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
若是想要在計算和阻塞之間交替運作的時候提高效率,我們需要用到gevent子產品。
gevent子產品
這個子產品不是内置子產品,需要安額外安裝。
gevent 是一個第三方庫,可以輕松通過gevent實作并發同步或異步程式設計,在gevent中用到的主要模式是greenlet,它是以C擴充子產品形式接入Python的輕量級協程。 greenlet全部運作在主程式作業系統程序的内部,但它們被協作式地排程。
用法介紹
g_one = gevnet.spawn(func_one, args[...]):建立一個協程對象g_one,spawn括号内第一個參數是函數名,如eat,後面可以有多個參數,可以是位置實參或關鍵字實參,都是傳給函數eat的。
g_two = gevent.spawn(func_two):執行個體化另外一個對象。
g_one.join():等待g_one結束。
g_two.join():等待g_two結束。
以上兩步也可以合起來:gevent.joinall([g_one, g_two])
g_one.value:拿到func_one的傳回值。
執行個體:
import gevent
def eat(name):
print("%s在吃牛排" % name)
gevent.sleep(2)
print("%s在吃狗糧" % name)
def play(name):
print("%s在打籃球" % name)
gevent.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '張三')
g_two = gevent.spawn(play, '李四')
g_one.join()
g_two.join()
print("主線程執行完畢")
結果:
張三在吃牛排
李四在打籃球
李四在踢足球
張三在吃狗糧
主線程執行完畢
Process finished with exit code 0
上例gevent.sleep(2)模拟的是gevent可以識别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接識别的需要用下面一行代碼,打更新檔,就可以識别了:
from gevent import monkey
monkey.patch_all()
該語句必須放到被打更新檔者的前面,如time,socket子產品之前。或者将它放到檔案的開頭。如:
from gevent import monkey
monkey.patch_all()
import gevent
import time
def eat(name):
print("%s在吃牛排" % name)
time.sleep(2)
print("%s在吃狗糧" % name)
def play(name):
print("%s在打籃球" % name)
time.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '張三')
g_two = gevent.spawn(play, '李四')
gevent.joinall([g_one, g_two])
print("主線程執行完畢")
結果:
張三在吃牛排
李四在打籃球
李四在踢足球
張三在吃狗糧
主線程執行完畢
Process finished with exit code 0
當我注釋更新檔代碼後将會出現以下結果:
張三在吃牛排
張三在吃狗糧
李四在打籃球
李四在踢足球
主線程執行完畢
Process finished with exit code 0
程式等待時間和執行順序都不一樣。
我們可以用threading.current_thread().getName()來檢視每個g_one和g_two,檢視的結果為DummyThread-n,即虛線程。如:
from gevent import monkey
monkey.patch_all()
import threading
import gevent
import time
def eat(name):
print(threading.current_thread().getName())
print("%s在吃牛排" % name)
time.sleep(2)
print("%s在吃狗糧" % name)
def play(name):
print(threading.current_thread().getName())
print("%s在打籃球" % name)
time.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '張三')
g_two = gevent.spawn(play, '李四')
gevent.joinall([g_one, g_two])
print("主線程執行完畢")
結果:
DummyThread-1
張三在吃牛排
DummyThread-2
李四在打籃球
李四在踢足球
張三在吃狗糧
主線程執行完畢
Process finished with exit code 0
gevent之同步與異步
執行個體:
from gevent import spawn, joinall, monkey
monkey.patch_all()
import time
def task(pid):
time.sleep(1)
print("任務%s完成" % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 異步
g_one = [spawn(task, i) for i in range(10)]
joinall(g_one)
print('完成')
if __name__ == '__main__':
print('同步:')
synchronous()
print('異步:')
asynchronous()
結果:
同步:
任務0完成
任務1完成
任務2完成
任務3完成
任務4完成
任務5完成
任務6完成
任務7完成
任務8完成
任務9完成
異步:
任務0完成
任務1完成
任務2完成
任務3完成
任務4完成
任務5完成
任務6完成
任務7完成
任務8完成
任務9完成
完成
Process finished with exit code 0
分析:上面程式的重要部分是将task函數封裝到Greenlet内部線程的gevent.spawn。初始化的greenlet清單存放在數組threads中,此數組被傳給gevent.joinall函數,後者阻塞目前流程,并執行所有給定的greenlet任務。執行流程隻會在 所有greenlet執行完後才會繼續向下走。
執行個體:利用gevent爬蟲
from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' % url)
response = requests.get(url)
if response.status_code == 200:
print('%s:%d bytes' % (url, len(response.text)))
start_time = time.time()
gevent.joinall([
gevent.spawn(get_page, 'https://www.python.org/'),
gevent.spawn(get_page, 'https://www.yahoo.com/'),
gevent.spawn(get_page, 'https://github.com/'),
])
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
結果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
https://github.com/:61353 bytes
https://www.yahoo.com/:483185 bytes
https://www.python.org/:48823 bytes
run time is 2.1101250648498535
Process finished with exit code 0
執行個體:通過gevent實作單線程下socket開發
服務端:
from gevent import monkey
monkey.patch_all()
import socket
import gevent
# 如果不想用money.patch_all()打更新檔,可以用gevent自帶的socket
# from gevent import socket
# s = socket.socket()
def server(server_ip, port):
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ss.bind((server_ip, port))
ss.listen(5)
while 1:
conn, add = ss.accept()
gevent.spawn(talk, conn, add)
def talk(conn, add):
try:
while 1:
res = conn.recv(1024)
print("CLIENT %s: %s message:%s" % (add[0], add[1], res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1', 8080)
服務端:
from socket import *
sc = socket(AF_INET, SOCK_STREAM)
sc.connect(('127.0.0.1', 8080))
while 1:
msg = input(">>>").strip()
if not msg:
continue
sc.send(msg.encode('utf-8'))
msg = sc.recv(1024)
print(msg.decode('utf-8'))
多線程并發用戶端:
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
sc = socket(AF_INET, SOCK_STREAM)
sc.connect((server_ip, port))
count = 0
while 1:
sc.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
msg = sc.recv(1024)
print(msg.decode('utf-8'))
count += 1
if __name__ == '__main__':
for i in range(5):
t = Thread(target=client, args=('127.0.0.1', 8080))
t.start()
結果:(略)