天天看點

tornado異步機制淺析前言資源基礎積累tornado異步淺析同步阻塞請求 demo1異步非阻塞改寫 demo2封裝異步非阻塞 demo3IOLoop封裝 demo4

前言

真是無言以對啊這篇文章原本是12号釋出,本準今晚在記錄些什麼,結果mac打開csdn點選新寫文章,出來上次寫的這篇博文内容,反複幾次都是,以為是上次緩存,随手清除,結果是上次博文内容被清空了。說不清是什麼問題,那就憑記憶重制吧。

資源

示範環境是在ubuntu下(原始12号的文章是在mac上示範,為了重寫改在ubuntu重制,但是卻發生不一樣的情況。)

所用python版本為 3.4.2

本文中示例代碼均在于個人github中,位址:

https://github.com/wushirenfei/tornado_async_demo

基礎積累

tornado程式設計

能夠使用tornado架構進行簡單web程式設計,對tornado異步程式設計有基礎了解。能夠照貓畫虎用gen.coroutine異步程式設計。

socket程式設計

使用python的socket包進行socket通信程式設計,此前博文有詳細介紹:

python socket and select

selectors

python I/O複用包,是python3.4以後引入基于select實作。官網介紹:

https://docs.python.org/3/library/selectors.html

一定要看啊,後面的介紹基于其中基礎方法,特别是最後一個示例。

#event兩種類型,讀事件/寫事件
EVENT_READ
EVENT_WRITE

#標明目前解釋預設最優select
DefaultSelector

#對檔案對象fileobj的事件進行注冊,把fileobj放到select監控表中監聽event事件,data為模糊指向實為回調函數
register(fileobj, events, data=None)

#對檔案對象fileobj的進行解注冊,不再監聽該對象的該event事件
register(fileobj, events, data=None)

#傳回一個(key, event)元組的list,該元組是fileobj已經ready對象。文檔示例最後給出了周遊回調的方式
select()
           

tornado異步淺析

伺服器編寫

利用tornado編寫一個異步響應的伺服器作為示例中通路對象。異步非阻塞的示範并非從server,而是通過編寫請求慢慢演化。server.py代碼實作如下:

from tornado import httpserver, ioloop, options, web
from tornado import gen
from tornado.options import define, options

define(name='port', default=, help='run on given port', type=int)
from tornado.gen import Task

class IndexHandler(web.RequestHandler):
    @gen.coroutine
    def get(self):
        yield gen.sleep()
        self.write('python.org'*)

if __name__ == '__main__':
    options.parse_command_line()
    app = web.Application(handlers=[(r'/index', IndexHandler)])
    http_server = httpserver.HTTPServer(app)
    http_server.listen(options.port)
    ioloop.IOLoop.instance().start()
           

由實作可以看出,該伺服器人為設定了一個異步延時1s,也就時說一個請求至少需要1s時間才會傳回結果。

同步阻塞請求 demo1

首先利用socket編寫http的GET請求。

import socket
import time

tic = lambda x: '\nat %1.1f second' % (time.time()-x)

def get_request(path):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(('localhost', ))
    sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))
    buffer = []
    while True:
        chunk = sock.recv()
        if chunk:
            buffer.append(chunk)
        else:
            break
    sock.close()
    print(b''.join(buffer).decode('utf-8'))

s = time.time()
get_request('/index')
get_request('/index')
print(tic(s))
           

用socket編寫的GET請求,通路伺服器兩次。運作結果:

tornado異步機制淺析前言資源基礎積累tornado異步淺析同步阻塞請求 demo1異步非阻塞改寫 demo2封裝異步非阻塞 demo3IOLoop封裝 demo4

這裡兩個GET是同步阻塞的GET請求,類似于python中requests包提供的get方法發送get請求。上面編寫中有個坑,socket連接配接後發送的是’GET {} HTTP/1.0\r\n\r\n’,如果改成了 HTTP/1.1會有什麼後果?二者的差別呢?留給看官自己嘗試。注意哦,此處兩個請求共用時2s多,這是通常了解的情況,通俗了解拍着對,一個一個等待完成,順序運作。

異步非阻塞改寫 demo2

如何實作異步非阻塞這兩個需求呢?

非阻塞

是通過socket設定,把socket通信設定為非阻塞, sock.setblocking(False),運作如下:

tornado異步機制淺析前言資源基礎積累tornado異步淺析同步阻塞請求 demo1異步非阻塞改寫 demo2封裝異步非阻塞 demo3IOLoop封裝 demo4

捕獲繼續:

tornado異步機制淺析前言資源基礎積累tornado異步淺析同步阻塞請求 demo1異步非阻塞改寫 demo2封裝異步非阻塞 demo3IOLoop封裝 demo4

看可以看出能夠正常發送GET并接收伺服器的傳回資訊。但是仍然是同步請求,一個接一個排隊執行,耗時2s。

此次在ubuntu python3.4.2上運作結果和上次在mac上不一樣,此處如果捕獲異常後,并沒有等待連接配接。send請求時直接成功,而上次在示範執行中回出現socket尚未連接配接的錯誤。之前運作環境是mac python3.5.2,設定的http請求是1.1版本。如果出Socket is not connected異常說明socket連接配接尚未建立,就往server去send資料,是以會報此種錯誤。

異步實作

異步則表示在I/O阻塞時刻,就結束該請求函數繼續執行,當I/O響應傳回時,轉而繼續完成之前挂起請求。實作上利用selectors完成:

1, 首先標明使用的select,可以直接使用預設的。

2, 對此處的讀,寫進行複用。即将send和recv全部作為檔案對象注冊到select中,之後帶到讀,寫事件完成觸發callback回調函數進行執行後續内容。代碼改寫運作如下:

tornado異步機制淺析前言資源基礎積累tornado異步淺析同步阻塞請求 demo1異步非阻塞改寫 demo2封裝異步非阻塞 demo3IOLoop封裝 demo4

完成了異步非阻塞的願望。

1, 此處僅僅對讀操作進行了異步I/O複用,即達到提高效率的實效。實作上監聽sock讀事件,如果沒有地下的while則請求直接結束,所有回調函數都沒有執行。while就是用來不斷執行callback,即readable函數,擷取資料chunk存入buffer。

2, 實際應該對發送操作也進行複用,因為隻有在sock已經連接配接上伺服器後,才send請求。是以send應該是在connected之後的回調函數。此處的實作在github庫的demo2.py中。

3, while循環完成不斷讀監聽事件調用,需要在完成請求後結束讀取,即設計全局變量request_numb用以控制循環,在請求結束後結束程式。

4, 這種編寫形式是通過callback實作,在readable還嵌入readable再次調用(有點遞歸的意思,但不是遞歸,隻是判定擷取目前資料後,還要繼續監聽read資料從server發送過來)。

封裝異步非阻塞 demo3

tornado中調用gen.coroutine用來将finish隻為False,那麼隻有人為去raise才能結束,這隻是其中一個作用。

tornado異步非阻塞實作機制上用到了一個核心的文法:生成器——generator。在之前文章python之yield(一) 中已經詳細介紹了yield用法。coroutine的另一個作用就是啟動生成器,調用next不斷疊代,在此實作一個簡單版的coroutine裝飾器:

def coroutine(func):
    def wrapper(*args, **kwargs):
        def run(generator):
            try:
                feature = next(generator)
                feature.callbacks.append(lambda: run(generator))
            except StopIteration:
                pass
        generator = func(*args, **kwargs)
        run(generator)
    return wrapper
           

tornado中yield傳回的對象使用另一個類Feature對象,用以封裝callback,再實作一個簡單版Feature:

class Feature(object):
    def __init__(self):
        self.callbacks = []
    def resolve(self):
        for callback in self.callbacks:
            callback()
           

這樣整個異步的GET請求從編寫的邏輯上就類似與同步的代碼,但是實際執行是異步非阻塞:

@coroutine
def get_request(path):
    global request_numb
    request_numb += 
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setblocking(False)

    try:
        sock.connect(('localhost', ))
    except BlockingIOError:
        pass

    f = Feature()
    # callback = lambda: connected(sock, path)
    # f.callbacks.append(callback)
    slct.register(sock.fileno(), selectors.EVENT_WRITE, f)
    yield f

# def connected(sock, path):
    slct.unregister(sock.fileno())
    sock.send('GET {} HTTP/1.0\r\n\r\n'.format(path).encode('utf-8'))

    buffer = []
    f = Feature()
    # callback = lambda: readable(sock, buffer)
    # f.callbacks.append(callback)
    slct.register(sock.fileno(), selectors.EVENT_READ, f)
    yield f

# def readable(sock, buffer):
#     global request_numb
    while True:
        slct.unregister(sock.fileno())
        chunk = sock.recv()
        if chunk:
            buffer.append(chunk)
            f = Feature()
            # callback = lambda: readable(sock, buffer)
            # f.callbacks.append(callback)
            slct.register(sock.fileno(), selectors.EVENT_READ, f)
            yield f
        else:
            sock.close()
            request_numb -= 
            print(b''.join(buffer).decode('utf-8').split()[])
            break

s = time.time()
get_request('/index')
get_request('/index')
while request_numb:
    events = slct.select()
    for key, mask in events:
        f = key.data
        f.resolve()
           

IOLoop封裝 demo4

在啟動tornado伺服器時并沒有類似我們demo中的while去select,那麼我們可以對我們的while循環再次封裝,代碼如下:

class IOLoop(object):
    request_numb = 
    @staticmethod
    def instance():
        return IOLoop()
    def start(self):
        while IOLoop.request_numb:
            events = slct.select()
            for key, mask in events:
                f = key.data
                f.resolve()奧
#調用形式
get_request('/index')
get_request('/index')
IOLoop.instance().start()
print(tic(s))
           

如此一來整個異步非阻塞是不是就和tornado的程式設計風格比較類似了?而tornado的異步實作大概思想亦是如此。如果能夠帶着這個概況性的了解再去回頭看tornado官網中給出源碼,會容易了解很多。

入如果大家在mac上用csdn寫博文,千萬别重蹈在下覆轍,又重新補了被自己誤删除博文。但是換了示範環境,又有另一番收獲。