天天看點

tornado源碼分析(二)之iostream

與ioloop直接互動時通用的過程

需要讀資料時:将socket添加至ioloop中,并設定回掉函數,在回掉函數中從socket中讀取資料,并且檢查是否接受到了足夠的資料,如果沒有接收完則需要儲存目前的資料,直到讀去完為止。

需要寫資料時:将socket添加至ioloop中,并設定回掉函數,在回掉函數中向socket寫資料,如果資料比較多,則需要分多次去寫。

iostream的作用

讓各元件,不需要與ioloop直接互動,iostream幫助各元件将socket添加至ioloop,并設定回掉函數,讀取并儲存資料,直到所有資料都接收完為止,或向socket寫入資料,直到寫完所有資料,完成後調用其它元件設定的回掉函數。

iostream同樣可以看作一個事件循環,它提供兩類事件:讀完成、寫完成。

iostream的事件循環

iostream是基于ioloop的,建立iostream時必需給定一個檔案描述符,iostream将該檔案描述符添加至ioloop的io事件中,并設定回調函數_handle_events,該函數中實作了iostream的事件循環。

_handle_events中判斷檔案描述符的事件:

當該檔案描述符可讀時,iostream從檔案描述符中讀出資料,并存至自己的資料緩沖中。每次讀取到新的資料後,iostream都檢查是否觸發了自己管理的事件,如:是否讀到了某一特定的資料(由read_until、read_until_regex等接口注冊),是否讀到了足夠多的資料(由read_bytes注冊)等,如果觸發了事件,則調用對應事件的回調函數(異步事件為future,設定future的result即可調用異步事件的回調函數)。

當檔案描述符可寫時,首先iostream從寫緩沖中,讀取足量的資料,寫入至檔案描述符中。其次能進入目前邏輯,說明上一次寫入檔案描述符的資料已經發送出去了,此時需要逐個檢查注冊的寫事件是否已經完成(各寫事件中存儲了自己關注的寫緩沖區的位置,通過檢查該位置判斷該事件的資料是否已經發送),如果完成則調用事件的回調函數。

當檔案描述符異常(EPOLLERR、EPOLLHUP)時,将檔案描述符從ioloop中删除,并設定自己為關閉狀态,上層再向自己讀寫資料時,觸發異常。

iostream的接口

讀資料的接口:read_until,read_bytes,read_unitl_regex等,iostream在同一時間内隻能存在一個讀事件。

寫資料的接口:write,可以同時存在多個(隻要iostream的寫緩沖區足夠)。每次添加時建立一個future,并且記錄該事件的要寫的資料在寫緩沖區中的位置(加入資料後iostream的_total_write_index),每次有資料發送後根據該位置資訊判斷是否結束(iostream的_total_write_done_index是否超過了事件的資料的位置)。 

 源碼參考:tornado.iostream.IOStream

iostream用法舉例

server端

from tornado.tcpserver import TCPServer
from tornado import gen


class MyTCPConnection(object):
    def __init__(self, stream, address):
        self.stream = stream
        self.address = address

    @gen.coroutine
    def start_serving(self, server):
        while True:
            #循環從stream中讀取消息,每個消息以that is all!結尾
            future = self.stream.read_until("that is all!".encode())
            message = yield future
            message = message.decode()

            if message == "that is all!": #如果僅發送that is all!,則說明用戶端消息已發送完畢
                print("no more messages")
                self.stream.close()
                server.on_conn_close(self)
                break
            else:
                print("data recieved: ", message)
                future = self.stream.write(message.encode()) #把接收到的消息以異步的方式寫回至client
                yield future


class MyTCPServer(TCPServer):
    def __init__(self):
        super().__init__()
        self._conns = set()

    #tcpserver已經建立好了iostream,我們隻需要使用iostream即可
    def handle_stream(self, stream, address):
        conn = MyTCPConnection(stream, address)
        self._conns.add(conn)
        return conn.start_serving(self)

    def on_conn_close(self, conn):
        self._conns.remove(conn)


if __name__ == "__main__":
    server = MyTCPServer()
    server.listen(8000, '0.0.0.0')
    from tornado.ioloop import IOLoop
    IOLoop().current().start()
           

client端:

import time
import socket


HOST = '132.232.191.131'
PORT = 8000
BUFSIZE = 1024


s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.send("hi dude! that is all!".encode())
data = s.recv(BUFSIZE).decode()
print(data)

s.send("whats up! that is all!".encode())
data = s.recv(BUFSIZE).decode()
print(data)

s.send("that is all!".encode())
time.sleep(2)

s.close()
           

繼續閱讀