前言
真是无言以对啊这篇文章原本是12号发布,本准今晚在记录些什么,结果mac打开csdn点击新写文章,出来上次写的这篇博文内容,反复几次都是,以为是上次缓存,随手清除,结果是上次博文内容被清空了。说不清是什么问题,那就凭记忆重现吧。
资源
演示环境是在ubuntu下(原始12号的文章是在mac上演示,为了重写改在ubuntu重现,但是却发生不一样的情况。)
所用python版本为 3.4.2
本文中示例代码均在于个人github中,地址:
https://github.com/wushirenfei/tornado_async_demo
基础积累
tornado编程
能够使用tornado框架进行简单web编程,对tornado异步编程有基础了解。能够照猫画虎用gen.coroutine异步编程。
socket编程
使用python的socket包进行socket通信编程,此前博文有详细介绍:
python socket and select
selectors
python I/O复用包,是python3.4以后引入基于select实现。官网介绍:
https://docs.python.org/3/library/selectors.html
一定要看啊,后面的介绍基于其中基础方法,特别是最后一个示例。
#event两种类型,读事件/写事件
EVENT_READ
EVENT_WRITE
#选定当前解释默认最优select
DefaultSelector
#对文件对象fileobj的事件进行注册,把fileobj放到select监控表中监听event事件,data为模糊指向实为回调函数
register(fileobj, events, data=None)
#对文件对象fileobj的进行解注册,不再监听该对象的该event事件
register(fileobj, events, data=None)
#返回一个(key, event)元组的list,该元组是fileobj已经ready对象。文档示例最后给出了遍历回调的方式
select()
tornado异步浅析
服务器编写
利用tornado编写一个异步响应的服务器作为示例中访问对象。异步非阻塞的演示并非从server,而是通过编写请求慢慢演化。server.py代码实现如下:
from tornado import httpserver, ioloop, options, web
from tornado import gen
from tornado.options import define, options
define(name='port', default=, help='run on given port', type=int)
from tornado.gen import Task
class IndexHandler(web.RequestHandler):
@gen.coroutine
def get(self):
yield gen.sleep()
self.write('python.org'*)
if __name__ == '__main__':
options.parse_command_line()
app = web.Application(handlers=[(r'/index', IndexHandler)])
http_server = httpserver.HTTPServer(app)
http_server.listen(options.port)
ioloop.IOLoop.instance().start()
由实现可以看出,该服务器人为设定了一个异步延时1s,也就时说一个请求至少需要1s时间才会返回结果。
同步阻塞请求 demo1
首先利用socket编写http的GET请求。
import socket
import time
tic = lambda x: '\nat %1.1f second' % (time.time()-x)
def get_request(path):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', ))
sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))
buffer = []
while True:
chunk = sock.recv()
if chunk:
buffer.append(chunk)
else:
break
sock.close()
print(b''.join(buffer).decode('utf-8'))
s = time.time()
get_request('/index')
get_request('/index')
print(tic(s))
用socket编写的GET请求,访问服务器两次。运行结果:

这里两个GET是同步阻塞的GET请求,类似于python中requests包提供的get方法发送get请求。上面编写中有个坑,socket连接后发送的是’GET {} HTTP/1.0\r\n\r\n’,如果改成了 HTTP/1.1会有什么后果?二者的区别呢?留给看官自己尝试。注意哦,此处两个请求共用时2s多,这是通常理解的情况,通俗理解拍着对,一个一个等待完成,顺序运行。
异步非阻塞改写 demo2
如何实现异步非阻塞这两个需求呢?
非阻塞
是通过socket设置,把socket通信设置为非阻塞, sock.setblocking(False),运行如下:
捕获继续:
看可以看出能够正常发送GET并接收服务器的返回信息。但是仍然是同步请求,一个接一个排队执行,耗时2s。
此次在ubuntu python3.4.2上运行结果和上次在mac上不一样,此处如果捕获异常后,并没有等待连接。send请求时直接成功,而上次在演示执行中回出现socket尚未连接的错误。之前运行环境是mac python3.5.2,设置的http请求是1.1版本。如果出Socket is not connected异常说明socket连接尚未建立,就往server去send数据,因此会报此种错误。
异步实现
异步则表示在I/O阻塞时刻,就结束该请求函数继续执行,当I/O响应返回时,转而继续完成之前挂起请求。实现上利用selectors完成:
1, 首先选定使用的select,可以直接使用默认的。
2, 对此处的读,写进行复用。即将send和recv全部作为文件对象注册到select中,之后带到读,写事件完成触发callback回调函数进行执行后续内容。代码改写运行如下:
完成了异步非阻塞的愿望。
1, 此处仅仅对读操作进行了异步I/O复用,即达到提高效率的实效。实现上监听sock读事件,如果没有地下的while则请求直接结束,所有回调函数都没有执行。while就是用来不断执行callback,即readable函数,获取数据chunk存入buffer。
2, 实际应该对发送操作也进行复用,因为只有在sock已经连接上服务器后,才send请求。所以send应该是在connected之后的回调函数。此处的实现在github库的demo2.py中。
3, while循环完成不断读监听事件调用,需要在完成请求后结束读取,即设计全局变量request_numb用以控制循环,在请求结束后结束程序。
4, 这种编写形式是通过callback实现,在readable还嵌入readable再次调用(有点递归的意思,但不是递归,只是判定获取当前数据后,还要继续监听read数据从server发送过来)。
封装异步非阻塞 demo3
tornado中调用gen.coroutine用来将finish只为False,那么只有人为去raise才能结束,这只是其中一个作用。
tornado异步非阻塞实现机制上用到了一个核心的语法:生成器——generator。在之前文章python之yield(一) 中已经详细介绍了yield用法。coroutine的另一个作用就是启动生成器,调用next不断迭代,在此实现一个简单版的coroutine装饰器:
def coroutine(func):
def wrapper(*args, **kwargs):
def run(generator):
try:
feature = next(generator)
feature.callbacks.append(lambda: run(generator))
except StopIteration:
pass
generator = func(*args, **kwargs)
run(generator)
return wrapper
tornado中yield返回的对象使用另一个类Feature对象,用以封装callback,再实现一个简单版Feature:
class Feature(object):
def __init__(self):
self.callbacks = []
def resolve(self):
for callback in self.callbacks:
callback()
这样整个异步的GET请求从编写的逻辑上就类似与同步的代码,但是实际执行是异步非阻塞:
@coroutine
def get_request(path):
global request_numb
request_numb +=
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('localhost', ))
except BlockingIOError:
pass
f = Feature()
# callback = lambda: connected(sock, path)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_WRITE, f)
yield f
# def connected(sock, path):
slct.unregister(sock.fileno())
sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))
buffer = []
f = Feature()
# callback = lambda: readable(sock, buffer)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_READ, f)
yield f
# def readable(sock, buffer):
# global request_numb
while True:
slct.unregister(sock.fileno())
chunk = sock.recv()
if chunk:
buffer.append(chunk)
f = Feature()
# callback = lambda: readable(sock, buffer)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_READ, f)
yield f
else:
sock.close()
request_numb -=
print(b''.join(buffer).decode('utf-8').split()[])
break
s = time.time()
get_request('/index')
get_request('/index')
while request_numb:
events = slct.select()
for key, mask in events:
f = key.data
f.resolve()
IOLoop封装 demo4
在启动tornado服务器时并没有类似我们demo中的while去select,那么我们可以对我们的while循环再次封装,代码如下:
class IOLoop(object):
request_numb =
@staticmethod
def instance():
return IOLoop()
def start(self):
while IOLoop.request_numb:
events = slct.select()
for key, mask in events:
f = key.data
f.resolve()奥
#调用形式
get_request('/index')
get_request('/index')
IOLoop.instance().start()
print(tic(s))
如此一来整个异步非阻塞是不是就和tornado的编程风格比较类似了?而tornado的异步实现大概思想亦是如此。如果能够带着这个概况性的理解再去回头看tornado官网中给出源码,会容易理解很多。
入如果大家在mac上用csdn写博文,千万别重蹈在下覆辙,又重新补了被自己误删除博文。但是换了演示环境,又有另一番收获。