前言
真是無言以對啊這篇文章原本是12号釋出,本準今晚在記錄些什麼,結果mac打開csdn點選新寫文章,出來上次寫的這篇博文内容,反複幾次都是,以為是上次緩存,随手清除,結果是上次博文内容被清空了。說不清是什麼問題,那就憑記憶重制吧。
資源
示範環境是在ubuntu下(原始12号的文章是在mac上示範,為了重寫改在ubuntu重制,但是卻發生不一樣的情況。)
所用python版本為 3.4.2
本文中示例代碼均在于個人github中,位址:
https://github.com/wushirenfei/tornado_async_demo
基礎積累
tornado程式設計
能夠使用tornado架構進行簡單web程式設計,對tornado異步程式設計有基礎了解。能夠照貓畫虎用gen.coroutine異步程式設計。
socket程式設計
使用python的socket包進行socket通信程式設計,此前博文有詳細介紹:
python socket and select
selectors
python I/O複用包,是python3.4以後引入基于select實作。官網介紹:
https://docs.python.org/3/library/selectors.html
一定要看啊,後面的介紹基于其中基礎方法,特别是最後一個示例。
#event兩種類型,讀事件/寫事件
EVENT_READ
EVENT_WRITE
#標明目前解釋預設最優select
DefaultSelector
#對檔案對象fileobj的事件進行注冊,把fileobj放到select監控表中監聽event事件,data為模糊指向實為回調函數
register(fileobj, events, data=None)
#對檔案對象fileobj的進行解注冊,不再監聽該對象的該event事件
register(fileobj, events, data=None)
#傳回一個(key, event)元組的list,該元組是fileobj已經ready對象。文檔示例最後給出了周遊回調的方式
select()
tornado異步淺析
伺服器編寫
利用tornado編寫一個異步響應的伺服器作為示例中通路對象。異步非阻塞的示範并非從server,而是通過編寫請求慢慢演化。server.py代碼實作如下:
from tornado import httpserver, ioloop, options, web
from tornado import gen
from tornado.options import define, options
define(name='port', default=, help='run on given port', type=int)
from tornado.gen import Task
class IndexHandler(web.RequestHandler):
@gen.coroutine
def get(self):
yield gen.sleep()
self.write('python.org'*)
if __name__ == '__main__':
options.parse_command_line()
app = web.Application(handlers=[(r'/index', IndexHandler)])
http_server = httpserver.HTTPServer(app)
http_server.listen(options.port)
ioloop.IOLoop.instance().start()
由實作可以看出,該伺服器人為設定了一個異步延時1s,也就時說一個請求至少需要1s時間才會傳回結果。
同步阻塞請求 demo1
首先利用socket編寫http的GET請求。
import socket
import time
tic = lambda x: '\nat %1.1f second' % (time.time()-x)
def get_request(path):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', ))
sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))
buffer = []
while True:
chunk = sock.recv()
if chunk:
buffer.append(chunk)
else:
break
sock.close()
print(b''.join(buffer).decode('utf-8'))
s = time.time()
get_request('/index')
get_request('/index')
print(tic(s))
用socket編寫的GET請求,通路伺服器兩次。運作結果:

這裡兩個GET是同步阻塞的GET請求,類似于python中requests包提供的get方法發送get請求。上面編寫中有個坑,socket連接配接後發送的是’GET {} HTTP/1.0\r\n\r\n’,如果改成了 HTTP/1.1會有什麼後果?二者的差別呢?留給看官自己嘗試。注意哦,此處兩個請求共用時2s多,這是通常了解的情況,通俗了解拍着對,一個一個等待完成,順序運作。
異步非阻塞改寫 demo2
如何實作異步非阻塞這兩個需求呢?
非阻塞
是通過socket設定,把socket通信設定為非阻塞, sock.setblocking(False),運作如下:
捕獲繼續:
看可以看出能夠正常發送GET并接收伺服器的傳回資訊。但是仍然是同步請求,一個接一個排隊執行,耗時2s。
此次在ubuntu python3.4.2上運作結果和上次在mac上不一樣,此處如果捕獲異常後,并沒有等待連接配接。send請求時直接成功,而上次在示範執行中回出現socket尚未連接配接的錯誤。之前運作環境是mac python3.5.2,設定的http請求是1.1版本。如果出Socket is not connected異常說明socket連接配接尚未建立,就往server去send資料,是以會報此種錯誤。
異步實作
異步則表示在I/O阻塞時刻,就結束該請求函數繼續執行,當I/O響應傳回時,轉而繼續完成之前挂起請求。實作上利用selectors完成:
1, 首先標明使用的select,可以直接使用預設的。
2, 對此處的讀,寫進行複用。即将send和recv全部作為檔案對象注冊到select中,之後帶到讀,寫事件完成觸發callback回調函數進行執行後續内容。代碼改寫運作如下:
完成了異步非阻塞的願望。
1, 此處僅僅對讀操作進行了異步I/O複用,即達到提高效率的實效。實作上監聽sock讀事件,如果沒有地下的while則請求直接結束,所有回調函數都沒有執行。while就是用來不斷執行callback,即readable函數,擷取資料chunk存入buffer。
2, 實際應該對發送操作也進行複用,因為隻有在sock已經連接配接上伺服器後,才send請求。是以send應該是在connected之後的回調函數。此處的實作在github庫的demo2.py中。
3, while循環完成不斷讀監聽事件調用,需要在完成請求後結束讀取,即設計全局變量request_numb用以控制循環,在請求結束後結束程式。
4, 這種編寫形式是通過callback實作,在readable還嵌入readable再次調用(有點遞歸的意思,但不是遞歸,隻是判定擷取目前資料後,還要繼續監聽read資料從server發送過來)。
封裝異步非阻塞 demo3
tornado中調用gen.coroutine用來将finish隻為False,那麼隻有人為去raise才能結束,這隻是其中一個作用。
tornado異步非阻塞實作機制上用到了一個核心的文法:生成器——generator。在之前文章python之yield(一) 中已經詳細介紹了yield用法。coroutine的另一個作用就是啟動生成器,調用next不斷疊代,在此實作一個簡單版的coroutine裝飾器:
def coroutine(func):
def wrapper(*args, **kwargs):
def run(generator):
try:
feature = next(generator)
feature.callbacks.append(lambda: run(generator))
except StopIteration:
pass
generator = func(*args, **kwargs)
run(generator)
return wrapper
tornado中yield傳回的對象使用另一個類Feature對象,用以封裝callback,再實作一個簡單版Feature:
class Feature(object):
def __init__(self):
self.callbacks = []
def resolve(self):
for callback in self.callbacks:
callback()
這樣整個異步的GET請求從編寫的邏輯上就類似與同步的代碼,但是實際執行是異步非阻塞:
@coroutine
def get_request(path):
global request_numb
request_numb +=
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
try:
sock.connect(('localhost', ))
except BlockingIOError:
pass
f = Feature()
# callback = lambda: connected(sock, path)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_WRITE, f)
yield f
# def connected(sock, path):
slct.unregister(sock.fileno())
sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))
buffer = []
f = Feature()
# callback = lambda: readable(sock, buffer)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_READ, f)
yield f
# def readable(sock, buffer):
# global request_numb
while True:
slct.unregister(sock.fileno())
chunk = sock.recv()
if chunk:
buffer.append(chunk)
f = Feature()
# callback = lambda: readable(sock, buffer)
# f.callbacks.append(callback)
slct.register(sock.fileno(), selectors.EVENT_READ, f)
yield f
else:
sock.close()
request_numb -=
print(b''.join(buffer).decode('utf-8').split()[])
break
s = time.time()
get_request('/index')
get_request('/index')
while request_numb:
events = slct.select()
for key, mask in events:
f = key.data
f.resolve()
IOLoop封裝 demo4
在啟動tornado伺服器時并沒有類似我們demo中的while去select,那麼我們可以對我們的while循環再次封裝,代碼如下:
class IOLoop(object):
request_numb =
@staticmethod
def instance():
return IOLoop()
def start(self):
while IOLoop.request_numb:
events = slct.select()
for key, mask in events:
f = key.data
f.resolve()奧
#調用形式
get_request('/index')
get_request('/index')
IOLoop.instance().start()
print(tic(s))
如此一來整個異步非阻塞是不是就和tornado的程式設計風格比較類似了?而tornado的異步實作大概思想亦是如此。如果能夠帶着這個概況性的了解再去回頭看tornado官網中給出源碼,會容易了解很多。
入如果大家在mac上用csdn寫博文,千萬别重蹈在下覆轍,又重新補了被自己誤删除博文。但是換了示範環境,又有另一番收獲。