天天看點

使用twisted編寫異步伺服器

一 簡介

twisted是python下的一個事件驅動的異步網絡應用架構,其項目首頁為http://twistedmatrix.com/trac/。 網絡上關于twisted的介紹很多, 我這裡就不啰嗦了。我們先來看一個使用twisted的執行個體。

# -*- coding: cp936 -*-
import os, sys, time
from twisted.internet.protocol import Protocol,Factory
from twisted.internet import reactor, defer, threads
from twisted.protocols.basic import LineReceiver
'''
網絡連接配接建立後的,處理方法。該類原始繼承為Protocol,當連接配接建立、斷開、資料到達時會自動調用對應的處理函數
'''
class EchoProtocol(LineReceiver):

    def connectionMade(self):
        self.sendLine("welcome")
    def lineReceived(self, line):
        self.sendLine(time.strftime("%H:%M:%S")+'server: '+line)
'''
一個網絡服務隻會執行個體化一個Factory,對于與連接配接無關的公用部分, 可放到該部分實作
'''
class EchoFactory(Factory):
    def buildProtocol(self, addr):
        return EchoProtocol()
if __name__ == '__main__':
    reactor.listenTCP(, EchoFactory()) 
    reactor.run() 
else:
    from twisted.application import internet, service
    application = service.Application('echo')
    echoService = internet.TCPServer(, EchoFactory())
    echoService.setServiceParent(application)
           

在該例子中,我們可以使用python來啟動該程式,也可以直接使用twisted來啟動服務。如果使用twisted來啟動服務,可以對是否作為守護程序、日志和pid檔案的記錄位置、事件驅動方式等項目進行配置, 可以使用twistd –help來檢視具體可配置的欄目。

二 使用thread

前面我們提到,twisted采用的是純事件驅動的一種模式,這意味着一個事件未處理完畢的情況下,是不能夠在處理新事件的。但并不是所有事情都能馬上或在短時間内做完,例如讀寫檔案等。那麼我們是否就無所适從了,答案是否定的。twisted的提供了defertothread函數來将阻塞式變為非阻塞的。

(1)阻塞式

#在該函數退出之前,既不會寫回資料,也不會處理新的連接配接

def lineReceived(self, line):
        self.sendLine('server: '+line)
        time.sleep()
           

(2)非阻塞式

#在該函數會立刻結束,并在新的線程中執行time.sleep

def lineReceived(self, line):         
    self.sendLine('server: '+line)
    threads.deferToThread(time.sleep, )
           

現在我們已經可以将阻塞式的函數變化為非阻塞式的了, 那麼接下來的問題是如果阻塞函數執行完畢後需要給我傳回結果怎麼辦?其實deferToThread對象傳回的是一個defer, 我們可以添加callBack來完成該操作。

三 使用defer

   defer可以說是twisted裡面最神奇的東西之一了。 使用defer對于, 即使資料并不立刻傳回,但程式也不用等待,而是可以繼續響應其他時間,待資料就緒後再處理資料。前面提到的defertothread函數其實傳回的就是一個deferred對象。

   

def lineReceived(self, line):
    self.sendLine('server: '+line)
    d = mySleep()
    d.addCallback(self.sendLine, 'Suc').addErrback(self.sendLine, 'Err')
def mySleep(self, second):
    deferred = Deferred()
    threads.deferToThread(time.sleep, second)
    return deferred
           

deferred對象本身并不将函數變為非阻塞式函數,它隻是記錄了登記了一個事件位置,使得程式可以等待一定時間後在處理前置事件。

四 使用defer進行網絡請求

網絡操作往往是需要消耗時間的,因為你并不知道網絡何時建立、資料何時就緒。那麼我們除了使用defertoThread來進行網絡請求外,還能怎麼進行網絡請求呢。現在我們模拟這麼一種場景:伺服器接收到一個指令, 該指令為對使用者登入進行校驗, 如果校驗通過則傳回0, 否則-1。但使用者的登入校驗需要向另外一個伺服器b發送指令來進行。

方法一: 采用阻塞式網絡請求

def lineReceived(self, line):
        import socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect( ('127.0.0.1', ) )
        sock.settimeout()
        sock.send(line+'\n')
        try:
            result = sock.recv()
        except:
            result = None
        self.sendLine('0' if result else '-1')  
           

方法二: 采用twisted connect請求

def lineReceived(self, line):
        reactor.connectTCP('127.0.0.1', , VerifyFactory(line, self))   

class VerifyProtocol(LineReceiver):
    def __init__(self, fatcory):
       self._factory = factory
    def connectionMade(self):
        self.sendLine(self._factory.line)
    def lineReceived(self, line):
        self._factory.request.sendLine('0' if line else '-1')

class VerifyFactory(Factory):
    def __init__(self, line, requect):
        self.line = line
        self.request = request
    def buildProtocol(self, addr):
        return VerifyProtocol(self)
           

方法三: 采用deferred

from collections import deque
class EchoProtocol(LineReceiver):
    def __init__(self, fatcory):
        self._factory = factory
    def connectionMade(self):
        self.sendLine("welcome")
    def lineReceived(self, line):
        self._factory.verify.verify(line).addCallback(self.verify_ret)
    def verify_ret(self, result):
        self.sendLine('0' if result else '-1')

class EchoFactory(Factory):
    def __init__(self) :
        self.verfiy = None
        reactor.connectTCP('127.0.0.1', , VerifyFactory(line, self))
    def buildProtocol(self, addr):
        return EchoProtocol(self)

class VerifyProtocol(LineReceiver):
    def __init__(self, fatcory):
        self._factory = factory 
        self._current = deque()
    def connectionMade(self):
        self._factory.echo.verfiy = self
    def verify(self, user):
        deferred = Deferred()
        self._current.append(deferred)
         self.sendLine(user)
        return deferred
    def lineReceived(self, line):
        deferred = self._current.popleft()
       deferred.callback(line)

class VerifyFactory(Factory):
    def __init__(self, echo):
        self.echo = echo
    def buildProtocol(self, addr):
        return VerifyProtocol(self)
           

方法三最關鍵的三個地方是self._current.append(deferred)、self._current.popleft()和self._factory.echo.verfiy = self , 前兩個是保證處理正确的deferred,後面一個是保證請求能夠進行驗證請求。

五 結束語

   twisted是一個非常強大的網絡架構,這裡我們隻是介紹了其最簡單的應用,大家可以根據自己的需要研究相關的協定, 後面我們會對defer和基于defer的網絡連接配接池做進一步詳細的講解。