雖然說用python編寫簡單的網絡程式很友善,但是複雜一點的網絡程式還是用現成的架構比較好,這樣就可以專心事物邏輯,而不是套接字的各種細節。Socketserver子產品簡化了編寫網絡服務程式,同時socketserver子產品也是python标準庫中很多伺服器架構的基礎。
socketserver子產品類介紹
SocketServer内部使用 IO多路複用 以及 “多線程” 和 “多程序” ,進而實作并發處理多個用戶端請求的Socket服務端。即:每個用戶端請求連接配接到伺服器時,Socket服務端都會在伺服器是建立一個“線程”或者“進 程” 專門負責處理目前用戶端的所有請求。
socketserver子產品可以簡化網絡伺服器的編寫,python把網絡服務抽象成兩個主要的類,一個是server類,用于處理連接配接相關的網絡操作,另一個是RequestHandler類,用于處理資料相關的操作。并且提供兩個Mixln類,用于擴充server,實作多程序或者多線程。
Server類
它包含了五種server類,分别是 Baseserver(不直接對外服務);TCPServer(使用TCP協定) ;UDPServer(使用UDP協定),UinixStreamServer 和UnixDatagramServer(後面兩個僅僅在unix環境下遊泳,一般不常用)
他們五個的關系如下:
BaseServer的源碼:
class BaseServer:
"""Base class for server classes.
Methods for the caller:
- __init__(server_address, RequestHandlerClass)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request() # if you do not use serve_forever()
- fileno() -> int # for select()
Methods that may be overridden:
- server_bind()
- server_activate()
- get_request() -> request, client_address
- handle_timeout()
- verify_request(request, client_address)
- server_close()
- process_request(request, client_address)
- shutdown_request(request)
- close_request(request)
- service_actions()
- handle_error()
Methods for derived classes:
- finish_request(request, client_address)
Class variables that may be overridden by derived classes or
instances:
- timeout
- address_family
- socket_type
- allow_reuse_address
Instance variables:
- RequestHandlerClass
- socket
"""
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
pass
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
while not self.__shutdown_request:
# XXX: Consider using another file descriptor or
# connecting to the socket to wake this up instead of
# polling. Polling reduces our responsiveness to a
# shutdown request and wastes cpu at all other times.
r, w, e = _eintr_retry(select.select, [self], [], [],
poll_interval)
if self in r:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread, or it will
deadlock.
"""
self.__shutdown_request = True
self.__is_shut_down.wait()
def service_actions(self):
"""Called by the serve_forever() loop.
May be overridden by a subclass / Mixin to implement any code that
needs to be run during the loop.
"""
pass
# The distinction between handling, getting, processing and
# finishing a request is fairly arbitrary. Remember:
#
# - handle_request() is the top-level call. It calls
# select, get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def handle_request(self):
"""Handle one request, possibly blocking.
Respects self.timeout.
"""
# Support people who used socket.settimeout() to escape
# handle_request before self.timeout was available.
timeout = self.socket.gettimeout()
if timeout is None:
timeout = self.timeout
elif self.timeout is not None:
timeout = min(timeout, self.timeout)
fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
if not fd_sets[0]:
self.handle_timeout()
return
self._handle_request_noblock()
def _handle_request_noblock(self):
"""Handle one request, without blocking.
I assume that select.select has returned that the socket is
readable before this function was called, so there should be
no risk of blocking in get_request().
"""
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
def handle_timeout(self):
"""Called if no new request arrives within self.timeout.
Overridden by ForkingMixIn.
"""
pass
def verify_request(self, request, client_address):
"""Verify the request. May be overridden.
Return True if we should proceed with this request.
"""
return True
def process_request(self, request, client_address):
"""Call finish_request.
Overridden by ForkingMixIn and ThreadingMixIn.
"""
self.finish_request(request, client_address)
self.shutdown_request(request)
def server_close(self):
"""Called to clean-up the server.
May be overridden.
"""
pass
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
def shutdown_request(self, request):
"""Called to shutdown and close an individual request."""
self.close_request(request)
def close_request(self, request):
"""Called to clean up an individual request."""
pass
def handle_error(self, request, client_address):
"""Handle an error gracefully. May be overridden.
The default is to print a traceback and continue.
"""
print('-'*40)
print('Exception happened during processing of request from', end=' ')
print(client_address)
import traceback
traceback.print_exc() # XXX But this goes to stderr!
print('-'*40)
View Code
RequestHandler類
所有requestHandler都繼承BaseRequestHandler基類。
請求的基類是baserequesthandler,其中一般需要重寫的方法就是handle方法,主要就是如何處理接下裡的請求,在這個類中,主要有三個方法,分别是setup ,handle ,finish方法,在調用這個類的時候,先調用setup進行一些初始化的工作,然後調用handle方法進行處理請求,然後調用finish方法,做一些關閉連接配接什麼的,在這個裡面最主要的參數就是self.request,也就是請求的socket對象,其中可以發送消息sendall或者send,接受消息就是recv
在請求處理的子類中有兩個,一個是SreamRequestHandle和DatagramRequestHandle,在這個裡面重寫了基類的setup方法和finish方法,handle方法沒有重寫,因為這個是留給使用者做處理請求的方法
requestHandler源碼:
class BaseServer:
"""Base class for server classes.
Methods for the caller:
- __init__(server_address, RequestHandlerClass)
- serve_forever(poll_interval=0.5)
- shutdown()
- handle_request() # if you do not use serve_forever()
- fileno() -> int # for select()
Methods that may be overridden:
- server_bind()
- server_activate()
- get_request() -> request, client_address
- handle_timeout()
- verify_request(request, client_address)
- server_close()
- process_request(request, client_address)
- shutdown_request(request)
- close_request(request)
- service_actions()
- handle_error()
Methods for derived classes:
- finish_request(request, client_address)
Class variables that may be overridden by derived classes or
instances:
- timeout
- address_family
- socket_type
- allow_reuse_address
Instance variables:
- RequestHandlerClass
- socket
"""
timeout = None
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def server_activate(self):
"""Called by constructor to activate the server.
May be overridden.
"""
pass
def serve_forever(self, poll_interval=0.5):
"""Handle one request at a time until shutdown.
Polls for shutdown every poll_interval seconds. Ignores
self.timeout. If you need to do periodic tasks, do them in
another thread.
"""
self.__is_shut_down.clear()
try:
while not self.__shutdown_request:
# XXX: Consider using another file descriptor or
# connecting to the socket to wake this up instead of
# polling. Polling reduces our responsiveness to a
# shutdown request and wastes cpu at all other times.
r, w, e = _eintr_retry(select.select, [self], [], [],
poll_interval)
if self in r:
self._handle_request_noblock()
self.service_actions()
finally:
self.__shutdown_request = False
self.__is_shut_down.set()
def shutdown(self):
"""Stops the serve_forever loop.
Blocks until the loop has finished. This must be called while
serve_forever() is running in another thread, or it will
deadlock.
"""
self.__shutdown_request = True
self.__is_shut_down.wait()
def service_actions(self):
"""Called by the serve_forever() loop.
May be overridden by a subclass / Mixin to implement any code that
needs to be run during the loop.
"""
pass
# The distinction between handling, getting, processing and
# finishing a request is fairly arbitrary. Remember:
#
# - handle_request() is the top-level call. It calls
# select, get_request(), verify_request() and process_request()
# - get_request() is different for stream or datagram sockets
# - process_request() is the place that may fork a new process
# or create a new thread to finish the request
# - finish_request() instantiates the request handler class;
# this constructor will handle the request all by itself
def handle_request(self):
"""Handle one request, possibly blocking.
Respects self.timeout.
"""
# Support people who used socket.settimeout() to escape
# handle_request before self.timeout was available.
timeout = self.socket.gettimeout()
if timeout is None:
timeout = self.timeout
elif self.timeout is not None:
timeout = min(timeout, self.timeout)
fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
if not fd_sets[0]:
self.handle_timeout()
return
self._handle_request_noblock()
def _handle_request_noblock(self):
"""Handle one request, without blocking.
I assume that select.select has returned that the socket is
readable before this function was called, so there should be
no risk of blocking in get_request().
"""
try:
request, client_address = self.get_request()
except OSError:
return
if self.verify_request(request, client_address):
try:
self.process_request(request, client_address)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
def handle_timeout(self):
"""Called if no new request arrives within self.timeout.
Overridden by ForkingMixIn.
"""
pass
def verify_request(self, request, client_address):
"""Verify the request. May be overridden.
Return True if we should proceed with this request.
"""
return True
def process_request(self, request, client_address):
"""Call finish_request.
Overridden by ForkingMixIn and ThreadingMixIn.
"""
self.finish_request(request, client_address)
self.shutdown_request(request)
def server_close(self):
"""Called to clean-up the server.
May be overridden.
"""
pass
def finish_request(self, request, client_address):
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self)
def shutdown_request(self, request):
"""Called to shutdown and close an individual request."""
self.close_request(request)
def close_request(self, request):
"""Called to clean up an individual request."""
pass
def handle_error(self, request, client_address):
"""Handle an error gracefully. May be overridden.
The default is to print a traceback and continue.
"""
print('-'*40)
print('Exception happened during processing of request from', end=' ')
print(client_address)
import traceback
traceback.print_exc() # XXX But this goes to stderr!
print('-'*40)
混合類(并發類)
兩個混合類,一個是ForkingMixin,主要是用fork的,産生一個新的程序去處理;一個是ThreadingMixin,産生一個新的線程,主要是用來提供異步處理的能力,其餘tcpserver和udpserver組合,又産生了新的四個類,進而提供異步處理的能力。
(在使用混合類和伺服器類的時候,注意混合類需要寫在前面,因為混合類重寫了伺服器類的方法,進而需要放在第一個位置。)
1 class socketserver.ForkingTCPServer
2
3 class socketserver.ForkingUDPServer
4
5 class socketserver.ThreadingTCPServer
6
7 class socketserver.ThreadingUDPServer
下面主要舉例說一下socketserver伺服器端和用戶端的代碼
scoketserver伺服器端
在socketserver子產品中,主要就是使用一些伺服器類,進而簡化socket網絡程式設計的方法,先上一段基本的伺服器代碼:
1 import socketserver
2
3 ip_port = ('127.0.0.1',8888)
4 class Myhandler(socketserver.BaseRequestHandler):
5 def handle(self):
6 while True:
7 data = self.request.recv(1024)
8 print(data,self.client_address)
9 if data == 'exit':
10 break
11 if __name__ == '__main__':
12 s =socketserver.ThreadingTCPServer((ip_port),Myhandler)
13 s.serve_forever()
在上述代碼中,首先定義了一個類,也就是處理請求的類,從基類baserequesthandler繼承,主要就是重寫了其中的handle方法,告知伺服器如何來處理用戶端的請求。然後建立了一個線程的TCP伺服器類,也就是通過多線程來進行應答用戶端,然後通過使用一直運作的方法就是serve_forever
scoketserver用戶端
1 import socket
2
3
4 ip_port = ('127.0.0.1',8888)
5 sk = socket.socket()
6 sk.connect(ip_port)
7 print ("用戶端啟動:")
8 while True:
9 inp = input('>>>')
10 sk.sendall(bytes(inp,"utf8"))
11 server_response=sk.recv(1024)
12 print (str(server_response,"utf8"))
13 if inp == 'exit':
14 break
15 sk.close()
用戶端的代碼和socket程式設計的代碼基本相同,因為在socketserver子產品中,主要是建立socke的服務端,而不涉及到用戶端,進而用戶端不需要修改代碼即可進行運作。
class socketserver.ForkingTCPServer
class socketserver.ForkingUDPServer
class socketserver.ThreadingTCPServer
class socketserver.ThreadingUDPServer
socketserver内部調用程式
- 啟動服務端程式
- 執行 TCPServer.init 方法,建立服務端Socket對象并綁定 IP 和 端口
- 執行 BaseServer.init 方法,将自定義的繼承自SocketServer.BaseRequestHandler 的類 - MyRequestHandle指派給 self.RequestHandlerClass
-
執行 BaseServer.server_forever 方法,While 循環一直監聽是否有用戶端請求到達 ...
當用戶端連接配接到達伺服器
- 執行 ThreadingMixIn.process_request 方法,建立一個 “線程” 用來處理請求
- 執行 ThreadingMixIn.process_request_thread 方法
- 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass() 即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)
不經一番徹骨寒 怎得梅花撲鼻香