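The previous post, "Tornado Source Code in Plain Language: The Bloodbath Triggered by a Script" (《白話tornado源碼之一個腳本引發的血案》), gave a bird's-eye overview of the whole architecture and showed what a web framework fundamentally is. In this post we start from the entry point of a tornado program and analyze its source code.
Overview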

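The figure above shows the whole process of starting a tornado program and then handling a client request. The process falls into two major parts:
- The startup phase, also called the waiting-for-requests phase (all of series 1 and 2 in the figure, plus 3.0)
- The phase of receiving and handling client requests (series 3 in the figure)
In short: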
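1. In the startup phase: first, read the configuration and build the URL mapping (each URL maps to one XXRequestHandler, which then handles the requests sent to that URL); second, create the server socket object and add it to epoll; third, start an infinite loop that polls epoll.
2. In the request phase: first, accept the request from the client socket (socket.accept); second, read the request headers and use the URL in them to find a matching XXRequestHandler; third, the matched XXRequestHandler handles the request; fourth, send the result back to the client; fifth, close the client socket.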
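Steps two to four of the request phase can be illustrated with a toy dispatcher. This is a minimal sketch, not Tornado code: `ROUTES`, `handle_raw_request` and the lambda handler are hypothetical, and only the request line of a raw HTTP request is parsed.

```python
import re

# Hypothetical route table: compiled URL regex -> callable returning the response body.
ROUTES = [
    (re.compile(r"/index$"), lambda: b"Hello, world"),
]

def handle_raw_request(raw: bytes) -> bytes:
    """Parse the request line, match the URL, run the handler, build a response."""
    request_line = raw.split(b"\r\n", 1)[0]               # e.g. b"GET /index HTTP/1.1"
    _method, path, _version = request_line.decode("latin-1").split(" ", 2)
    for regex, handler in ROUTES:
        if regex.match(path):                              # step 2: find a matching handler
            body = handler()                               # step 3: handle the request
            head = b"HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n" % len(body)
            return head + body                             # step 4: bytes to send back
    return b"HTTP/1.1 404 Not Found\r\nContent-Length: 0\r\n\r\n"
```

In a real server, steps 1 and 5 (accept and close) wrap around this, and the raw bytes come from the client socket.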
This post focuses on the startup phase. Let's dissect it step by step; this phase is carried out mainly by the three calls highlighted below.
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")
application = tornado.web.Application([
    (r"/index", MainHandler),
])
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
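1. application = tornado.web.Application([(xxx,xxx)])
This runs the constructor of the Application class with a list argument. The list holds URL rules and their handler classes: when a client's request URL matches a rule, the request is handed to the corresponding Handler.
Note: "Handler" refers to any class that inherits from RequestHandler;
"Handlers" refers to a collection of such classes.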
class Application(object):
    def __init__(self, handlers=None, default_host="", transforms=None, wsgi=False, **settings):
        # Set the response encoding and transfer mode, which map to the HTTP response headers Content-Encoding and Transfer-Encoding
        # Content-Encoding: gzip compresses the data before returning it to the user, reducing the traffic sent
        # Transfer-Encoding: chunked sends the data piece by piece, one chunk at a time
        if transforms is None:
            self.transforms = []
            if settings.get("gzip"):
                self.transforms.append(GZipContentEncoding)
            self.transforms.append(ChunkedTransferEncoding)
        else:
            self.transforms = transforms
        # Store the arguments as instance attributes
        self.handlers = []
        self.named_handlers = {}
        self.default_host = default_host
        self.settings = settings
        # ui_modules and ui_methods extend the template language with custom output
        # Here tornado's built-in ui_modules are added to self.ui_modules; self.ui_methods starts out empty
        self.ui_modules = {'linkify': _linkify,
                           'xsrf_form_html': _xsrf_form_html,
                           'Template': TemplateModule,
                           }
        self.ui_methods = {}
        self._wsgi = wsgi
        # Load the user-defined ui_modules and ui_methods and add them to self.ui_modules and self.ui_methods created above
        self._load_ui_modules(settings.get("ui_modules", {}))
        self._load_ui_methods(settings.get("ui_methods", {}))
        # Configure the static file path: URLs are matched with regular expressions and the matching ones are handled by StaticFileHandler
        if self.settings.get("static_path"):
            # Read the static_path value from settings; it is the static file directory
            path = self.settings["static_path"]
            # Take the handlers passed in; use an empty list if none were given
            handlers = list(handlers or [])
            # Static URL prefix, /static/ by default
            static_url_prefix = settings.get("static_url_prefix", "/static/")
            # Prepend three mappings to the handlers passed in:
            # [/static/.*]      --> StaticFileHandler
            # [/(favicon\.ico)] --> StaticFileHandler
            # [/(robots\.txt)]  --> StaticFileHandler
            handlers = [
                (re.escape(static_url_prefix) + r"(.*)", StaticFileHandler, dict(path=path)),
                (r"/(favicon\.ico)", StaticFileHandler, dict(path=path)),
                (r"/(robots\.txt)", StaticFileHandler, dict(path=path)),
            ] + handlers
        # Call this Application's add_handlers method
        # At this point handlers is a list whose elements are mappings: a URL regex and the Handler for URLs matching it
        if handlers: self.add_handlers(".*$", handlers)
        # Automatically reload modified modules
        # If debug mode is set in settings, autoreload restarts the program when modules change
        if self.settings.get("debug") and not wsgi:
            import autoreload
            autoreload.start()
Application.__init__
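The static-file part of the constructor can be tried in isolation. The sketch below mirrors that logic under stated assumptions and is not Tornado code: `build_handlers` is hypothetical, and `StaticFileHandler` is a placeholder class standing in for Tornado's.

```python
import re

class StaticFileHandler:
    """Hypothetical placeholder standing in for tornado.web.StaticFileHandler."""

def build_handlers(handlers=None, static_path=None, static_url_prefix="/static/"):
    """Return the handler list the way Application.__init__ assembles it (sketch)."""
    handlers = list(handlers or [])
    if static_path:
        static_routes = [
            # re.escape keeps characters such as "." in the prefix literal.
            (re.escape(static_url_prefix) + r"(.*)", StaticFileHandler, dict(path=static_path)),
            (r"/(favicon\.ico)", StaticFileHandler, dict(path=static_path)),
            (r"/(robots\.txt)", StaticFileHandler, dict(path=static_path)),
        ]
        # Prepended, so the static routes come before the user's own routes.
        handlers = static_routes + handlers
    return handlers
```

Because the static routes are prepended, a request for /static/... is matched by the static route before any user-defined route is tried.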
class Application(object):
    def add_handlers(self, host_pattern, host_handlers):
        # If the host pattern does not end with "$", append one
        if not host_pattern.endswith("$"):
            host_pattern += "$"
        handlers = []
        # First-level routing by host name, e.g. http://www.wupeiqi.com and http://safe.wupeiqi.com
        # i.e. safe gets one set of URL mappings and www another; an incoming request is first matched on the host, then against the URL mappings inside that group
        # At the host level ".*" matches every host, so the ".*" group is always kept last in the list; otherwise it would intercept requests meant for the other hosts
        # re.compile compiles the regular expression; each later request only needs to call match() on the compiled result
        if self.handlers and self.handlers[-1][0].pattern == '.*$':
            self.handlers.insert(-1, (re.compile(host_pattern), handlers))
        else:
            self.handlers.append((re.compile(host_pattern), handlers))
        # Iterate over the [url -> Handler] mappings we passed in plus those added in the constructor, wrapping each url and its Handler in a URLSpec (whose constructor compiles the url)
        # Every URLSpec goes into the local handlers list, which is already paired with the host pattern in a tuple inside self.handlers
        for spec in host_handlers:
            if type(spec) is type(()):
                assert len(spec) in (2, 3)
                pattern = spec[0]
                handler = spec[1]
                if len(spec) == 3:
                    kwargs = spec[2]
                else:
                    kwargs = {}
                spec = URLSpec(pattern, handler, kwargs)
            handlers.append(spec)
            if spec.name:
                # Not used here; spec.name defaults to None
                if spec.name in self.named_handlers:
                    logging.warning("Multiple handlers named %s; replacing previous value", spec.name)
                self.named_handlers[spec.name] = spec
Application.add_handlers
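The two-level table that add_handlers builds (host first, then path) can be sketched in a few lines. This is a hypothetical `HostRouter`, not Tornado's class; the host names are made up, and the lookup method is added only so the routing can be exercised.

```python
import re

class HostRouter:
    """Hypothetical sketch of the (host regex -> [(path regex, handler)]) table."""

    def __init__(self):
        self.handlers = []  # [(compiled host regex, [(compiled path regex, handler), ...]), ...]

    def add_handlers(self, host_pattern, host_handlers):
        if not host_pattern.endswith("$"):
            host_pattern += "$"
        specs = []
        entry = (re.compile(host_pattern), specs)
        # Keep the catch-all ".*$" group last so it cannot shadow specific hosts.
        if self.handlers and self.handlers[-1][0].pattern == ".*$":
            self.handlers.insert(-1, entry)
        else:
            self.handlers.append(entry)
        # `entry` already holds a reference to `specs`, so filling it now updates the table.
        for pattern, handler in host_handlers:
            if not pattern.endswith("$"):
                pattern += "$"
            specs.append((re.compile(pattern), handler))

    def find_handler(self, host, path):
        """Return the handler for host/path, or None if nothing matches."""
        for host_regex, specs in self.handlers:
            if host_regex.match(host):
                # This sketch searches only the first matching host group.
                for path_regex, handler in specs:
                    if path_regex.match(path):
                        return handler
                return None
        return None
```

Even though the ".*$" group is added first, the later `safe.example.com` group is inserted in front of it.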
class URLSpec(object):
    def __init__(self, pattern, handler_class, kwargs={}, name=None):
        if not pattern.endswith('$'):
            pattern += '$'
        self.regex = re.compile(pattern)
        self.handler_class = handler_class
        self.kwargs = kwargs
        self.name = name
        self._path, self._group_count = self._find_groups()
URLSpec
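URLSpec appends "$" because a compiled pattern's match() only anchors at the start of the string; without it, r"/index" would also match "/index2". The helper name `compile_url_pattern` is hypothetical; it reproduces just the anchoring and compiling lines of URLSpec.__init__.

```python
import re

def compile_url_pattern(pattern):
    """Anchor the end of a URL pattern and compile it, as URLSpec.__init__ does."""
    if not pattern.endswith("$"):
        pattern += "$"
    return re.compile(pattern)
```

Capture groups still work as usual, and the compiled object's `groups` attribute reports how many there are.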
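Step one (Application.__init__, add_handlers and URLSpec) mainly loads the configuration and builds the URL mapping, and wraps all of this information in an application object.
The configuration loaded includes:
- Encoding and transfer-mode information
- The static file path
- ui_modules (used in the template language; ignored for now)
- ui_methods (used in the template language; ignored for now)
- Whether to run in debug mode
All of the above can be set in settings and passed in when creating the Application object, e.g. application = tornado.web.Application([(r"/index", MainHandler),], **settings)
Building the URL mapping:
- Each url and its Handler are added under the corresponding host pattern, e.g. safe.index.com, www.auto.com
Wrapping the data:
The configuration and URL mappings are wrapped into the Application object and stored in these attributes:
- self.transforms: the encoding and transfer-mode information
- self.settings: the configuration
- self.ui_modules: the ui_modules
- self.ui_methods: the ui_methods
- self.handlers: the Handlers for every host pattern; each group maps URL regexes to their Handlers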
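2. application.listen(xxx)
Step one wrapped the configuration and URL mappings into the application object. Step two calls the application object's listen method, which wraps that application object (with all its information) into an HTTPServer object and then calls the HTTPServer object's listen method.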
class Application(object):
    # Create the server socket, bind the IP and port, and apply its settings. Note: this does not yet start a while loop calling accept to wait for clients
    def listen(self, port, address="", **kwargs):
        from tornado.httpserver import HTTPServer
        server = HTTPServer(self, **kwargs)
        server.listen(port, address)
Detailed code:
class HTTPServer(object):
    def __init__(self, request_callback, no_keep_alive=False, io_loop=None, xheaders=False, ssl_options=None):
        # The Application object
        self.request_callback = request_callback
        # Whether to disable keep-alive (persistent) connections
        self.no_keep_alive = no_keep_alive
        # The IO loop
        self.io_loop = io_loop
        self.xheaders = xheaders
        # SSL options: None means plain HTTP, otherwise HTTPS
        self.ssl_options = ssl_options
        self._socket = None
        self._started = False
    def listen(self, port, address=""):
        self.bind(port, address)
        self.start(1)
    def bind(self, port, address=None, family=socket.AF_UNSPEC):
        assert not self._socket
        # Create the server socket object: IPv4, TCP
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        # Set FD_CLOEXEC so the descriptor is closed automatically on exec()
        flags = fcntl.fcntl(self._socket.fileno(), fcntl.F_GETFD)
        flags |= fcntl.FD_CLOEXEC
        fcntl.fcntl(self._socket.fileno(), fcntl.F_SETFD, flags)
        # Configure the socket: allow address reuse, non-blocking mode
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.setblocking(0)
        # Bind the IP and port
        self._socket.bind((address, port))
        # Start listening; 128 is the backlog (maximum number of pending connections)
        self._socket.listen(128)
    def start(self, num_processes=1):
        assert not self._started
        self._started = True
        if num_processes is None or num_processes <= 0:
            num_processes = _cpu_count()
        if num_processes > 1 and ioloop.IOLoop.initialized():
            logging.error("Cannot run in multiple processes: IOLoop instance "
                          "has already been initialized. You cannot call "
                          "IOLoop.instance() before calling start()")
            num_processes = 1
        # More than one process
        if num_processes > 1:
            logging.info("Pre-forking %d server processes", num_processes)
            for i in range(num_processes):
                if os.fork() == 0:
                    import random
                    from binascii import hexlify
                    try:
                        # If available, use the same method as
                        # random.py
                        seed = long(hexlify(os.urandom(16)), 16)
                    except NotImplementedError:
                        # Include the pid to avoid initializing two
                        # processes to the same value
                        seed = int(time.time() * 1000) ^ os.getpid()
                    random.seed(seed)
                    self.io_loop = ioloop.IOLoop.instance()
                    self.io_loop.add_handler(
                        self._socket.fileno(), self._handle_events,
                        ioloop.IOLoop.READ)
                    return
            os.waitpid(-1, 0)
        # Exactly one process (the default)
        else:
            if not self.io_loop:
                # Set self.io_loop to the IOLoop instance. Note: IOLoop implements a singleton with a classmethod
                self.io_loop = ioloop.IOLoop.instance()
            # Call IOLoop.add_handler with the socket's file descriptor, the self._handle_events method and IOLoop.READ
            self.io_loop.add_handler(self._socket.fileno(),
                                     self._handle_events,
                                     ioloop.IOLoop.READ)
    def _handle_events(self, fd, events):
        while True:
            try:
                #====important=====#
                connection, address = self._socket.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    #====important=====#
                    connection = ssl.wrap_socket(connection, server_side=True, do_handshake_on_connect=False, **self.ssl_options)
                except ssl.SSLError, err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error, err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
                else:
                    stream = iostream.IOStream(connection, io_loop=self.io_loop)
                #====important=====#
                HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)
HTTPServer
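For comparison, here are the same bind() steps in Python 3 using only the standard library. `make_server_socket` is a hypothetical helper; `set_inheritable(False)` takes the place of the manual `fcntl` FD_CLOEXEC calls (on POSIX a non-inheritable descriptor is close-on-exec), and passing port 0 lets the OS pick a free port.

```python
import socket

def make_server_socket(port, address="", backlog=128):
    """Create a bound, listening, non-blocking IPv4 TCP socket (sketch of bind())."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    # Python 3.4+ already creates non-inheritable sockets (close-on-exec on POSIX);
    # made explicit here to mirror the fcntl calls.
    sock.set_inheritable(False)
    # Allow quick restarts while old connections are still in TIME_WAIT.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # Non-blocking: accept() raises BlockingIOError instead of waiting.
    sock.setblocking(False)
    sock.bind((address, port))
    sock.listen(backlog)
    return sock
```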
class IOLoop(object):
    # Constants from the epoll module
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)
    # Our events map exactly to the epoll events
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
    def __init__(self, impl=None):
        self._impl = impl or _poll()
        if hasattr(self._impl, 'fileno'):
            self._set_close_exec(self._impl.fileno())
        self._handlers = {}
        self._events = {}
        self._callbacks = []
        self._timeouts = []
        self._running = False
        self._stopped = False
        self._blocking_signal_threshold = None
        # Create a pipe that we send bogus data to when we want to wake
        # the I/O loop when it is idle
        if os.name != 'nt':
            r, w = os.pipe()
            self._set_nonblocking(r)
            self._set_nonblocking(w)
            self._set_close_exec(r)
            self._set_close_exec(w)
            self._waker_reader = os.fdopen(r, "rb", 0)
            self._waker_writer = os.fdopen(w, "wb", 0)
        else:
            self._waker_reader = self._waker_writer = win32_support.Pipe()
            r = self._waker_writer.reader_fd
        self.add_handler(r, self._read_waker, self.READ)
    @classmethod
    def instance(cls):
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance
    def add_handler(self, fd, handler, events):
        """Registers the given handler to receive the given events for fd."""
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)
IOLoop
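IOLoop.instance() is a lazily created singleton cached on the class, and add_handler() boils down to a dict from file descriptor to callback plus a registration with the poller. A minimal sketch, assuming the standard `selectors` module in place of Tornado's own epoll wrapper; `MiniIOLoop` is hypothetical, and stack_context wrapping is left out.

```python
import selectors

class MiniIOLoop:
    """Hypothetical sketch of IOLoop.instance() and IOLoop.add_handler()."""
    READ = selectors.EVENT_READ

    def __init__(self):
        # DefaultSelector picks the best poller available (epoll on Linux).
        self._impl = selectors.DefaultSelector()
        self._handlers = {}

    @classmethod
    def instance(cls):
        # Create the loop on first use and cache it on the class.
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_handler(self, fd, handler, events):
        # Remember which callback owns fd, then ask the OS to watch fd.
        self._handlers[fd] = handler
        self._impl.register(fd, events)
```

One quirk of the `hasattr` check: a subclass would see the base class's cached `_instance` through inheritance and get the base instance back.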
def wrap(fn):
    '''Returns a callable object that will restore the current StackContext
    when executed.
    Use this whenever saving a callback to be executed later in a
    different execution context (either in a different thread or
    asynchronously in the same thread).
    '''
    if fn is None:
        return None
    # functools.wraps doesn't appear to work on functools.partial objects
    #@functools.wraps(fn)
    def wrapped(callback, contexts, *args, **kwargs):
        # If we're moving down the stack, _state.contexts is a prefix
        # of contexts. For each element of contexts not in that prefix,
        # create a new StackContext object.
        # If we're moving up the stack (or to an entirely different stack),
        # _state.contexts will have elements not in contexts. Use
        # NullContext to clear the state and then recreate from contexts.
        if (len(_state.contexts) > len(contexts) or
            any(a[1] is not b[1]
                for a, b in itertools.izip(_state.contexts, contexts))):
            # contexts have been removed or changed, so start over
            new_contexts = ([NullContext()] +
                            [cls(arg) for (cls, arg) in contexts])
        else:
            new_contexts = [cls(arg)
                            for (cls, arg) in contexts[len(_state.contexts):]]
        if len(new_contexts) > 1:
            with contextlib.nested(*new_contexts):
                callback(*args, **kwargs)
        elif new_contexts:
            with new_contexts[0]:
                callback(*args, **kwargs)
        else:
            callback(*args, **kwargs)
    if getattr(fn, 'stack_context_wrapped', False):
        return fn
    contexts = _state.contexts
    result = functools.partial(wrapped, fn, contexts)
    result.stack_context_wrapped = True
    return result
stack_context.wrap
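Note: stack_context.wrap just wraps a function. A callback may run in a different context from the one it was registered in, so the wrapper captures the current StackContext and restores it when the callback runs.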
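Python 3.7+ ships `contextvars`, which addresses a similar problem: capture the current context when a callback is registered and run the callback inside it later. Below is a rough analogue under that assumption, not Tornado's implementation; `request_id` and `wrap_with_context` are hypothetical names, and the `context_wrapped` flag plays the role of `stack_context_wrapped` (never wrap twice).

```python
import contextvars

# Hypothetical piece of per-request state.
request_id = contextvars.ContextVar("request_id", default=None)

def wrap_with_context(fn):
    """Capture the current context now; re-enter it whenever fn is called later."""
    if fn is None:
        return None
    if getattr(fn, "context_wrapped", False):
        return fn  # already wrapped, keep the originally captured context
    ctx = contextvars.copy_context()

    def wrapped(*args, **kwargs):
        # Run in a fresh copy so repeated or nested calls never re-enter the same Context.
        return ctx.copy().run(fn, *args, **kwargs)

    wrapped.context_wrapped = True
    return wrapped
```

The callback sees the value that was current at wrap time, even if the caller's context has changed since.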
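The listen() code covered above (Application.listen, HTTPServer and IOLoop) essentially does four things:
- Stores the application object, with all its configuration, in the HTTPServer object's request_callback attribute
- Creates the server socket object
- Creates the IOLoop object as a singleton, then adds an entry to its _handlers dict with the socket's file descriptor as the key and the wrapped _handle_events function as the value
- Registers the server socket with epoll to watch for read-ready events
So far we have only seen roughly what this code does. What is its purpose, and how do these pieces connect?
Answer: let's make a guess first and verify it against the source later. Guess: epoll watches the server socket; once a request arrives, the wrapped _handle_events function from item 3 is executed, which uses the configuration stored in application to match the client's URL and then hands the request to the corresponding Handler.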
Note: here is an example of a server socket driven by epoll:
import socket, select
# Linux only: select.epoll is not available on other platforms
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)
epoll = select.epoll()
# Watch the listening socket: "readable" means a new client is waiting to be accepted
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
    connections = {}; requests = {}; responses = {}
    while True:
        # Wait up to 1 second; returns a list of (fd, event mask) pairs
        events = epoll.poll(1)
        for fileno, event in events:
            if fileno == serversocket.fileno():
                # New connection: accept it and watch it for incoming data
                connection, address = serversocket.accept()
                connection.setblocking(0)
                epoll.register(connection.fileno(), select.EPOLLIN)
                connections[connection.fileno()] = connection
                requests[connection.fileno()] = b''
                responses[connection.fileno()] = response
            elif event & select.EPOLLIN:
                # Request data arrived; once the headers are complete, switch to writing
                requests[fileno] += connections[fileno].recv(1024)
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                    epoll.modify(fileno, select.EPOLLOUT)
                    print('-'*40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:
                # Socket is writable: send as much of the response as possible
                byteswritten = connections[fileno].send(responses[fileno])
                responses[fileno] = responses[fileno][byteswritten:]
                if len(responses[fileno]) == 0:
                    epoll.modify(fileno, 0)
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:
                # The connection was shut down: stop watching it and clean up
                epoll.unregister(fileno)
                connections[fileno].close()
                del connections[fileno]
finally:
    epoll.unregister(serversocket.fileno())
    epoll.close()
    serversocket.close()
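In short, the example uses the epoll object's poll(timeout) method to check the socket descriptors registered with epoll. When some of them are ready, it returns a list of (file descriptor, event code) pairs, and the program then uses each descriptor to handle the client's request.
3. tornado.ioloop.IOLoop.instance().start()
The previous step created the socket object and registered it with epoll. This step calls epoll's poll method in a loop to check the socket descriptors registered with the epoll object; whenever one is readable, the corresponding handler is triggered.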
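The (fd, event) pairs that poll() returns can be observed with a socket pair. A small sketch, Linux only because `select.epoll` exists nowhere else; `epoll_ready_pairs` is a hypothetical name.

```python
import select
import socket

def epoll_ready_pairs(timeout=1.0):
    """Register one end of a socket pair, make it readable, and return poll()'s result."""
    ep = select.epoll()
    reader, writer = socket.socketpair()
    try:
        ep.register(reader.fileno(), select.EPOLLIN)
        writer.send(b"x")  # now `reader` has data waiting
        # poll() returns a list of (fd, event_mask) pairs for the ready descriptors.
        return reader.fileno(), ep.poll(timeout)
    finally:
        ep.close()
        reader.close()
        writer.close()
```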
class IOLoop(object):
    def add_handler(self, fd, handler, events):
        # Called from HTTPServer.start
        self._handlers[fd] = stack_context.wrap(handler)
        self._impl.register(fd, events | self.ERROR)
    def start(self):
        while True:
            poll_timeout = 0.2
            try:
                # Poll epoll
                event_pairs = self._impl.poll(poll_timeout)
            except Exception, e:
                continue  # details omitted (the full version retries on EINTR and re-raises anything else)
            # If any descriptors are readable, add the (socket descriptor, event code) pairs to self._events
            self._events.update(event_pairs)
            # Iterate over self._events and handle each request
            while self._events:
                fd, events = self._events.popitem()
                try:
                    # Use the socket descriptor as the key to fetch stack_context.wrap(handler) from self._handlers and call it
                    # stack_context.wrap(handler) is a function wrapping HTTPServer._handle_events;
                    # it was put into self._handlers by add_handler in the previous step.
                    self._handlers[fd](fd, events)
                except:
                    pass  # details omitted (the full version logs the error)
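The heart of the simplified loop is: poll, merge the ready descriptors into a pending dict, then pop and dispatch one descriptor at a time. A sketch of a single iteration under stated assumptions: the standard `selectors` module replaces the raw epoll object, `run_once` and its parameters are hypothetical, and error handling is left out.

```python
import selectors

def run_once(selector, handlers, pending, timeout=0.2):
    """One iteration of the loop: poll, merge ready fds, dispatch them one by one."""
    # Like self._events.update(event_pairs): record newly ready descriptors.
    for key, mask in selector.select(timeout):
        pending[key.fd] = mask
    # Like `while self._events: popitem()`: a handler may add or remove entries
    # in `pending` while we run, so take one entry at a time.
    while pending:
        fd, events = pending.popitem()
        handlers[fd](fd, events)
```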
class IOLoop(object):
    def start(self):
        """Starts the I/O loop.
        The loop will run until one of the I/O handlers calls stop(), which
        will make the loop stop after the current event iteration completes.
        """
        if self._stopped:
            self._stopped = False
            return
        self._running = True
        while True:
            # Never use an infinite timeout here - it can stall epoll
            poll_timeout = 0.2
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            callbacks = self._callbacks
            self._callbacks = []
            for callback in callbacks:
                self._run_callback(callback)
            if self._callbacks:
                poll_timeout = 0.0
            if self._timeouts:
                now = time.time()
                while self._timeouts and self._timeouts[0].deadline <= now:
                    timeout = self._timeouts.pop(0)
                    self._run_callback(timeout.callback)
                if self._timeouts:
                    milliseconds = self._timeouts[0].deadline - now
                    poll_timeout = min(milliseconds, poll_timeout)
            if not self._running:
                break
            if self._blocking_signal_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            try:
                event_pairs = self._impl.poll(poll_timeout)
            except Exception, e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if (getattr(e, 'errno', None) == errno.EINTR or
                    (isinstance(getattr(e, 'args', None), tuple) and
                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
                    continue
                else:
                    raise
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)
            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that update self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    self._handlers[fd](fd, events)
                except (KeyboardInterrupt, SystemExit):
                    raise
                except (OSError, IOError), e:
                    if e.args[0] == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        logging.error("Exception in I/O handler for fd %d",
                                      fd, exc_info=True)
                except:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)
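Before each poll, the full start() computes how long poll may block: 0 if new callbacks are queued, otherwise at most 0.2 s, capped by the next timeout deadline after expired timeouts have run. (Despite its name, the `milliseconds` variable holds seconds, since deadlines are `time.time()` timestamps.) A sketch of just that part; `run_due_timeouts` is hypothetical, and a heap is swapped in for the list that the source pops from the front with `pop(0)`.

```python
import heapq

def run_due_timeouts(timeouts, now, callbacks_pending=False, default_timeout=0.2):
    """Run expired timeouts and return the next poll timeout in seconds.

    `timeouts` is a heap of (deadline, sequence_number, callback) tuples; the
    sequence number only breaks ties so callbacks are never compared."""
    # Callbacks queued for the next iteration: do not block in poll at all.
    poll_timeout = 0.0 if callbacks_pending else default_timeout
    while timeouts and timeouts[0][0] <= now:
        _deadline, _seq, callback = heapq.heappop(timeouts)
        callback()
    if timeouts:
        # Wake up no later than the next deadline.
        poll_timeout = min(timeouts[0][0] - now, poll_timeout)
    return poll_timeout
```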
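Once IOLoop.start() is called, the program enters an "infinite loop": it keeps polling to check whether requests have arrived. When one arrives, it executes stack_context.wrap(handler), which wraps HTTPServer._handle_events together with its context (in effect, it runs HTTPServer._handle_events). The details are covered in the next post; the simplified code is as follows: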
class HTTPServer(object):
    def _handle_events(self, fd, events):
        while True:
            try:
                connection, address = self._socket.accept()
            except socket.error, e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl.wrap_socket(connection,
                                                 server_side=True,
                                                 do_handshake_on_connect=False,
                                                 **self.ssl_options)
                except ssl.SSLError, err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error, err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
                else:
                    stream = iostream.IOStream(connection, io_loop=self.io_loop)
                HTTPConnection(stream, address, self.request_callback,
                               self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)
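The accept loop in _handle_events keeps calling accept() until the non-blocking socket reports EWOULDBLOCK/EAGAIN, so a single readiness event can drain several queued connections. A Python 3 sketch: `accept_all` and its `on_connection` callback are hypothetical (in Tornado this step builds an IOStream and an HTTPConnection), and `BlockingIOError` is Python 3's exception for EAGAIN/EWOULDBLOCK.

```python
import socket

def accept_all(server_sock, on_connection):
    """Accept every pending connection on a non-blocking listening socket."""
    count = 0
    while True:
        try:
            connection, address = server_sock.accept()
        except BlockingIOError:
            # EWOULDBLOCK/EAGAIN: the accept queue is empty, return to the event loop.
            return count
        # Make the client socket non-blocking before handing it on.
        connection.setblocking(False)
        on_connection(connection, address)
        count += 1
```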
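Conclusion
This post covered what happens in the "waiting for requests" phase. In short, three things: first, the various settings and the URL-to-Handler mappings are wrapped into the application object (which is in turn stored in the HTTPServer object's request_callback attribute); second, a server socket is created and combined with epoll; third, when a request arrives it is handed to HTTPServer._handle_events, which is the entry point for request handling. For the details of request handling, see the next post (patience, dear reader, it is being written after hours...).
Author: 武沛齊 (Wu Peiqi)
Source: http://www.cnblogs.com/wupeiqi/
The copyright of this article is shared by the author and cnblogs. Reposting is welcome, but without the author's consent this notice must be kept and a link to the original article must be given in a prominent place on the page.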