天天看點

淺析python中socketserver子產品使用

         雖然說用python編寫簡單的網絡程式很友善,但是複雜一點的網絡程式還是用現成的架構比較好,這樣就可以專心事物邏輯,而不是套接字的各種細節。Socketserver子產品簡化了編寫網絡服務程式,同時socketserver子產品也是python标準庫中很多伺服器架構的基礎。

socketserver子產品類介紹

      SocketServer内部使用 IO多路複用 以及 “多線程” 和 “多程序” ,進而實作并發處理多個用戶端請求的Socket服務端。即:每個用戶端請求連接配接到伺服器時,Socket服務端都會在伺服器是建立一個“線程”或者“進 程” 專門負責處理目前用戶端的所有請求。

        socketserver子產品可以簡化網絡伺服器的編寫,python把網絡服務抽象成兩個主要的類,一個是server類,用于處理連接配接相關的網絡操作,另一個是RequestHandler類,用于處理資料相關的操作。并且提供兩個Mixln類,用于擴充server,實作多程序或者多線程。

Server類

   它包含了五種server類,分别是  Baseserver(不直接對外服務);TCPServer(使用TCP協定) ;UDPServer(使用UDP協定),UinixStreamServer    和UnixDatagramServer(後面兩個僅僅在unix環境下遊泳,一般不常用)

他們五個的關系如下:

淺析python中socketserver子產品使用

BaseServer的源碼:

淺析python中socketserver子產品使用
淺析python中socketserver子產品使用
class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - service_actions()
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()

                self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        """Called by the serve_forever() loop.

        May be overridden by a subclass / Mixin to implement any code that
        needs to be run during the loop.
        """
        pass

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40)
        print('Exception happened during processing of request from', end=' ')
        print(client_address)
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print('-'*40)      

View Code

RequestHandler類

所有requestHandler都繼承BaseRequestHandler基類。

請求的基類是baserequesthandler,其中一般需要重寫的方法就是handle方法,主要就是如何處理接下裡的請求,在這個類中,主要有三個方法,分别是setup ,handle ,finish方法,在調用這個類的時候,先調用setup進行一些初始化的工作,然後調用handle方法進行處理請求,然後調用finish方法,做一些關閉連接配接什麼的,在這個裡面最主要的參數就是self.request,也就是請求的socket對象,其中可以發送消息sendall或者send,接受消息就是recv

在請求處理的子類中有兩個,一個是SreamRequestHandle和DatagramRequestHandle,在這個裡面重寫了基類的setup方法和finish方法,handle方法沒有重寫,因為這個是留給使用者做處理請求的方法

requestHandler源碼:

class BaseServer:

    """Base class for server classes.

    Methods for the caller:

    - __init__(server_address, RequestHandlerClass)
    - serve_forever(poll_interval=0.5)
    - shutdown()
    - handle_request()  # if you do not use serve_forever()
    - fileno() -> int   # for select()

    Methods that may be overridden:

    - server_bind()
    - server_activate()
    - get_request() -> request, client_address
    - handle_timeout()
    - verify_request(request, client_address)
    - server_close()
    - process_request(request, client_address)
    - shutdown_request(request)
    - close_request(request)
    - service_actions()
    - handle_error()

    Methods for derived classes:

    - finish_request(request, client_address)

    Class variables that may be overridden by derived classes or
    instances:

    - timeout
    - address_family
    - socket_type
    - allow_reuse_address

    Instance variables:

    - RequestHandlerClass
    - socket

    """

    timeout = None

    def __init__(self, server_address, RequestHandlerClass):
        """Constructor.  May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False

    def server_activate(self):
        """Called by constructor to activate the server.

        May be overridden.

        """
        pass

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                # XXX: Consider using another file descriptor or
                # connecting to the socket to wake this up instead of
                # polling. Polling reduces our responsiveness to a
                # shutdown request and wastes cpu at all other times.
                r, w, e = _eintr_retry(select.select, [self], [], [],
                                       poll_interval)
                if self in r:
                    self._handle_request_noblock()

                self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        """Stops the serve_forever loop.

        Blocks until the loop has finished. This must be called while
        serve_forever() is running in another thread, or it will
        deadlock.
        """
        self.__shutdown_request = True
        self.__is_shut_down.wait()

    def service_actions(self):
        """Called by the serve_forever() loop.

        May be overridden by a subclass / Mixin to implement any code that
        needs to be run during the loop.
        """
        pass

    # The distinction between handling, getting, processing and
    # finishing a request is fairly arbitrary.  Remember:
    #
    # - handle_request() is the top-level call.  It calls
    #   select, get_request(), verify_request() and process_request()
    # - get_request() is different for stream or datagram sockets
    # - process_request() is the place that may fork a new process
    #   or create a new thread to finish the request
    # - finish_request() instantiates the request handler class;
    #   this constructor will handle the request all by itself

    def handle_request(self):
        """Handle one request, possibly blocking.

        Respects self.timeout.
        """
        # Support people who used socket.settimeout() to escape
        # handle_request before self.timeout was available.
        timeout = self.socket.gettimeout()
        if timeout is None:
            timeout = self.timeout
        elif self.timeout is not None:
            timeout = min(timeout, self.timeout)
        fd_sets = _eintr_retry(select.select, [self], [], [], timeout)
        if not fd_sets[0]:
            self.handle_timeout()
            return
        self._handle_request_noblock()

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that select.select has returned that the socket is
        readable before this function was called, so there should be
        no risk of blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except:
                self.handle_error(request, client_address)
                self.shutdown_request(request)

    def handle_timeout(self):
        """Called if no new request arrives within self.timeout.

        Overridden by ForkingMixIn.
        """
        pass

    def verify_request(self, request, client_address):
        """Verify the request.  May be overridden.

        Return True if we should proceed with this request.

        """
        return True

    def process_request(self, request, client_address):
        """Call finish_request.

        Overridden by ForkingMixIn and ThreadingMixIn.

        """
        self.finish_request(request, client_address)
        self.shutdown_request(request)

    def server_close(self):
        """Called to clean-up the server.

        May be overridden.

        """
        pass

    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)

    def shutdown_request(self, request):
        """Called to shutdown and close an individual request."""
        self.close_request(request)

    def close_request(self, request):
        """Called to clean up an individual request."""
        pass

    def handle_error(self, request, client_address):
        """Handle an error gracefully.  May be overridden.

        The default is to print a traceback and continue.

        """
        print('-'*40)
        print('Exception happened during processing of request from', end=' ')
        print(client_address)
        import traceback
        traceback.print_exc() # XXX But this goes to stderr!
        print('-'*40)
      

  

混合類(并發類)

兩個混合類,一個是ForkingMixin,主要是用fork的,産生一個新的程序去處理;一個是ThreadingMixin,産生一個新的線程,主要是用來提供異步處理的能力,其餘tcpserver和udpserver組合,又産生了新的四個類,進而提供異步處理的能力。

    (在使用混合類和伺服器類的時候,注意混合類需要寫在前面,因為混合類重寫了伺服器類的方法,進而需要放在第一個位置。)

1 class socketserver.ForkingTCPServer
2  
3 class socketserver.ForkingUDPServer
4  
5 class socketserver.ThreadingTCPServer
6  
7 class socketserver.ThreadingUDPServer      

下面主要舉例說一下socketserver伺服器端和用戶端的代碼

scoketserver伺服器端

            在socketserver子產品中,主要就是使用一些伺服器類,進而簡化socket網絡程式設計的方法,先上一段基本的伺服器代碼:

1 import socketserver
 2 
 3 ip_port = ('127.0.0.1',8888)
 4 class Myhandler(socketserver.BaseRequestHandler):
 5     def handle(self):
 6         while True:
 7             data = self.request.recv(1024)
 8             print(data,self.client_address)
 9             if data == 'exit':
10                 break
11 if __name__ == '__main__':
12     s =socketserver.ThreadingTCPServer((ip_port),Myhandler)
13     s.serve_forever()      

        在上述代碼中,首先定義了一個類,也就是處理請求的類,從基類baserequesthandler繼承,主要就是重寫了其中的handle方法,告知伺服器如何來處理用戶端的請求。然後建立了一個線程的TCP伺服器類,也就是通過多線程來進行應答用戶端,然後通過使用一直運作的方法就是serve_forever

scoketserver用戶端

1 import socket
 2 
 3 
 4 ip_port = ('127.0.0.1',8888)
 5 sk = socket.socket()
 6 sk.connect(ip_port)
 7 print ("用戶端啟動:")
 8 while True:
 9     inp = input('>>>')
10     sk.sendall(bytes(inp,"utf8"))
11     server_response=sk.recv(1024)
12     print (str(server_response,"utf8"))
13     if inp == 'exit':
14         break
15 sk.close()      

      用戶端的代碼和socket程式設計的代碼基本相同,因為在socketserver子產品中,主要是建立socke的服務端,而不涉及到用戶端,進而用戶端不需要修改代碼即可進行運作。

class socketserver.ForkingTCPServer
 
class socketserver.ForkingUDPServer
 
class socketserver.ThreadingTCPServer
 
class socketserver.ThreadingUDPServer      

socketserver内部調用程式

    • 啟動服務端程式
    • 執行 TCPServer.init 方法,建立服務端Socket對象并綁定 IP 和 端口
    • 執行 BaseServer.init 方法,将自定義的繼承自SocketServer.BaseRequestHandler 的類 - MyRequestHandle指派給 self.RequestHandlerClass
    • 執行 BaseServer.server_forever 方法,While 循環一直監聽是否有用戶端請求到達 ...

      當用戶端連接配接到達伺服器

    • 執行 ThreadingMixIn.process_request 方法,建立一個 “線程” 用來處理請求
    • 執行 ThreadingMixIn.process_request_thread 方法
    • 執行 BaseServer.finish_request 方法,執行 self.RequestHandlerClass() 即:執行 自定義 MyRequestHandler 的構造方法(自動調用基類BaseRequestHandler的構造方法,在該構造方法中又會調用 MyRequestHandler的handle方法)

不經一番徹骨寒 怎得梅花撲鼻香