引言
之前我们学习了线程、进程的概念,了解了在操作系统中进程是资源分配的最小单位,线程是CPU调度的最小单位。按道理来说我们已经算是把cpu的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换。
随着我们对于效率的追求不断提高,基于单线程来实现并发又成为一个新的课题,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。
为此我们需要先回顾下并发的本质:切换+保存状态
cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长。

进程调度
在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态。
调度程序选择另一进程并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。
为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下。
实例1:
import time
def consumer(res):
pass
def producer():
res = []
for i in range(100000000): # 一亿
res.append(i)
return res
start_time = time.time()
res = producer()
consumer(res)
end_time = time.time()
print("Time:", end_time - start_time)
结果:
Time: 11.075389862060547
Process finished with exit code 0
实例2:
import time
def consumer():
while 1:
x = yield
def producer():
g = consumer()
next(g)
for i in range(100000000): # 一亿
g.send(i)
start_time = time.time()
producer()
end_time = time.time()
print("Time:", end_time - start_time)
结果:
Time: 9.08711290359497
Process finished with exit code 0
协程介绍
指的是单线程下的并发,又称微线程,协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
注意:
- python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)。
- 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)。
对比操作系统控制线程的切换,用户在单线程内控制协程的切换。
优点:
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级。
- 单线程内就可以实现并发的效果,最大限度地利用cpu。
协程特点:
- 必须在只有一个单线程里实现并发。
- 修改共享数据不需加锁。
- 用户程序里自己保存多个控制流的上下文栈。
- 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))。
greenlet模块
这个模块不是内置模块,需要安额外安装。
实例1:greenlet模块的第一次切换
from greenlet import greenlet
def eat(name):
print("%s正在吃牛排" % name)
g_two.switch('张三')
print("%s正在吃狗粮" % name)
g_two.switch()
def play(name):
print("%s正在打篮球" % name)
g_one.switch()
print("%s正在踢足球" % name)
if __name__ == '__main__':
g_one = greenlet(eat)
g_two = greenlet(play)
g_one.switch("李四")
结果:
李四正在吃牛排
张三正在打篮球
李四正在吃狗粮
张三正在踢足球
Process finished with exit code 0
上述完全模拟了单纯的切换,但是......在没有遇到IO操作或者开辟重复空间的情况下,反而会降低程序的执行效率。
实例2:串行程序
import time
def func_one():
s = 0
for i in range(100000000):
s += 1
def func_two():
s = 1
for i in range(100000000):
s += 1
if __name__ == '__main__':
start_time = time.time()
func_one()
func_two()
print("串行程序,耗时%s秒" % (time.time() - start_time))
结果:
串行程序,耗时10.838690042495728秒:
Process finished with exit code 0
实例2:切换执行
from greenlet import greenlet
import time
def func_one():
res = 0
for i in range(100000000):
res += 1
g_two.switch()
def func_two():
res = 1
for i in range(100000000):
res += 1
g_one.switch()
if __name__ == '__main__':
start_time = time.time()
g_one = greenlet(func_one)
g_two = greenlet(func_two)
g_one.switch()
print("切换执行,耗时%s秒" % (time.time() - start_time))
结果:
切换执行,耗时54.60098838806152秒
Process finished with exit code 0
greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到IO,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
若是想要在计算和阻塞之间交替运行的时候提高效率,我们需要用到gevent模块。
gevent模块
这个模块不是内置模块,需要安额外安装。
gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是greenlet,它是以C扩展模块形式接入Python的轻量级协程。 greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
用法介绍
g_one = gevnet.spawn(func_one, args[...]):创建一个协程对象g_one,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的。
g_two = gevent.spawn(func_two):实例化另外一个对象。
g_one.join():等待g_one结束。
g_two.join():等待g_two结束。
以上两步也可以合起来:gevent.joinall([g_one, g_two])
g_one.value:拿到func_one的返回值。
实例:
import gevent
def eat(name):
print("%s在吃牛排" % name)
gevent.sleep(2)
print("%s在吃狗粮" % name)
def play(name):
print("%s在打篮球" % name)
gevent.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '张三')
g_two = gevent.spawn(play, '李四')
g_one.join()
g_two.join()
print("主线程执行完毕")
结果:
张三在吃牛排
李四在打篮球
李四在踢足球
张三在吃狗粮
主线程执行完毕
Process finished with exit code 0
上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了:
from gevent import monkey
monkey.patch_all()
该语句必须放到被打补丁者的前面,如time,socket模块之前。或者将它放到文件的开头。如:
from gevent import monkey
monkey.patch_all()
import gevent
import time
def eat(name):
print("%s在吃牛排" % name)
time.sleep(2)
print("%s在吃狗粮" % name)
def play(name):
print("%s在打篮球" % name)
time.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '张三')
g_two = gevent.spawn(play, '李四')
gevent.joinall([g_one, g_two])
print("主线程执行完毕")
结果:
张三在吃牛排
李四在打篮球
李四在踢足球
张三在吃狗粮
主线程执行完毕
Process finished with exit code 0
当我注释补丁代码后将会出现以下结果:
张三在吃牛排
张三在吃狗粮
李四在打篮球
李四在踢足球
主线程执行完毕
Process finished with exit code 0
程序等待时间和执行顺序都不一样。
我们可以用threading.current_thread().getName()来查看每个g_one和g_two,查看的结果为DummyThread-n,即虚线程。如:
from gevent import monkey
monkey.patch_all()
import threading
import gevent
import time
def eat(name):
print(threading.current_thread().getName())
print("%s在吃牛排" % name)
time.sleep(2)
print("%s在吃狗粮" % name)
def play(name):
print(threading.current_thread().getName())
print("%s在打篮球" % name)
time.sleep(1)
print("%s在踢足球" % name)
if __name__ == '__main__':
g_one = gevent.spawn(eat, '张三')
g_two = gevent.spawn(play, '李四')
gevent.joinall([g_one, g_two])
print("主线程执行完毕")
结果:
DummyThread-1
张三在吃牛排
DummyThread-2
李四在打篮球
李四在踢足球
张三在吃狗粮
主线程执行完毕
Process finished with exit code 0
gevent之同步与异步
实例:
from gevent import spawn, joinall, monkey
monkey.patch_all()
import time
def task(pid):
time.sleep(1)
print("任务%s完成" % pid)
def synchronous(): # 同步
for i in range(10):
task(i)
def asynchronous(): # 异步
g_one = [spawn(task, i) for i in range(10)]
joinall(g_one)
print('完成')
if __name__ == '__main__':
print('同步:')
synchronous()
print('异步:')
asynchronous()
结果:
同步:
任务0完成
任务1完成
任务2完成
任务3完成
任务4完成
任务5完成
任务6完成
任务7完成
任务8完成
任务9完成
异步:
任务0完成
任务1完成
任务2完成
任务3完成
任务4完成
任务5完成
任务6完成
任务7完成
任务8完成
任务9完成
完成
Process finished with exit code 0
分析:上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall函数,后者阻塞当前流程,并执行所有给定的greenlet任务。执行流程只会在 所有greenlet执行完后才会继续向下走。
实例:利用gevent爬虫
from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
print('GET: %s' % url)
response = requests.get(url)
if response.status_code == 200:
print('%s:%d bytes' % (url, len(response.text)))
start_time = time.time()
gevent.joinall([
gevent.spawn(get_page, 'https://www.python.org/'),
gevent.spawn(get_page, 'https://www.yahoo.com/'),
gevent.spawn(get_page, 'https://github.com/'),
])
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
结果:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
https://github.com/:61353 bytes
https://www.yahoo.com/:483185 bytes
https://www.python.org/:48823 bytes
run time is 2.1101250648498535
Process finished with exit code 0
实例:通过gevent实现单线程下socket开发
服务端:
from gevent import monkey
monkey.patch_all()
import socket
import gevent
# 如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s = socket.socket()
def server(server_ip, port):
ss = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ss.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ss.bind((server_ip, port))
ss.listen(5)
while 1:
conn, add = ss.accept()
gevent.spawn(talk, conn, add)
def talk(conn, add):
try:
while 1:
res = conn.recv(1024)
print("CLIENT %s: %s message:%s" % (add[0], add[1], res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1', 8080)
服务端:
from socket import *
sc = socket(AF_INET, SOCK_STREAM)
sc.connect(('127.0.0.1', 8080))
while 1:
msg = input(">>>").strip()
if not msg:
continue
sc.send(msg.encode('utf-8'))
msg = sc.recv(1024)
print(msg.decode('utf-8'))
多线程并发客户端:
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
sc = socket(AF_INET, SOCK_STREAM)
sc.connect((server_ip, port))
count = 0
while 1:
sc.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
msg = sc.recv(1024)
print(msg.decode('utf-8'))
count += 1
if __name__ == '__main__':
for i in range(5):
t = Thread(target=client, args=('127.0.0.1', 8080))
t.start()
结果:(略)