天天看點

twisted下使用網絡連接配接池

  通過《使用twisted編寫異步伺服器》一文的學習,我們可以輕松使用twisted來搭建自己的伺服器。前面的示例中服務端并不涉及到與外部伺服器的互動,而實際的服務端往往需要同時與其他服務端進行互動(例如進行密碼驗證)。使用傳統的os.socket連接配接伺服器的方法雖然可行,但卻并不符合異步的原則,使用deferred則可以異步的建立網絡連接配接,頻繁的網絡連接配接總是會加大伺服器壓力,是以更多的時候是使用網絡連接配接池。這裡我們主要學習在twisted下如何異步的網絡連接配接池。

  

class simpleClientFactory(protocol.ClientFactory):
    def __init__(self, simple_client_pool, d):
        self._deferred = d #傳入一個deferr對象用于回調
        self.pool = simple_client_pool
    def buildProtocol(self, addr):
        return simpleClientProtocol(self) #将自身傳遞給protocol
    def clientConnectionFailed(self, connector, reason):
        self.pool.release(None)
        self._deferred.errback(reason)
           

  與伺服器端一個factory可對應多個protocol不同, client端一個factory隻對應于一個protocol。我一般在protocol初始化的時候将factory對象傳入,以便protocol可通路factory中的對象(protocol預設其實就綁定了factory,這裡完全是個人習慣)。網絡連接配接建立後,回調将protocol自身傳回,調用者即可該protocol的sendLine發送指令到伺服器。os.socket發送完指令後,調用者隻需阻塞自己等待結果傳回就可以,但sendLine會立刻傳回,調用者并不知道自己什麼時候可以得到結果也不知道什麼時候可以拿到結果,這無疑是見郁悶的事情。我們可在sendLine上在封裝一層sendCmd,sendCmd傳回一個deferred對象,資料到達後回調該deferred即可。下面的示例實際在sendCmd上又封裝了一次,這樣用戶端根據需要執行相應的指令即可。pool相關是為建立連接配接池時使用,這裡暫時不用管。

class simpleClientProtocol(LineReceiver):
    delimiter = '\n'
    def __init__(self, factory):
        self._current = deque() # 用于存放sendCmd中傳回的deferred對象, 以便資料傳回時可以處理
        self._factory = factory
    def connectionMade(self):
        self._factory._deferred.callback(self)
    def connectionLost(self, reason):
        #如果該連接配接在連接配接池中,則從池中删除, 否則釋放該連接配接,以喚醒等待隊列
        try:
            self._factory.pool._pool.remove(self)
        except:
            self._factory.pool.release(None)
    def get(self):
        return self.sendCmd('get', 'get %s' % key)
    def sendCmd(self, cmd, fullcmd):
        cmdObj = Command(cmd, fullcmd)
        #防止在連串操作過程中連接配接丢失 
        if not self.transport.connected:
            cmdObj._deferred.errback(-)
            return cmdObj._deferred
        self._current.append(cmdObj)
        self.sendLine(fullcmd)
        self.transport.doWrite()
        return cmdObj._deferred
    def lineReceived(self, data):
        cmd = self._current.popleft()
        cmd.success(data, self)
class Command(object):
    def __init__(self):
        self._deferred = Deferred()
    def success(self, value, client): #這裡可以對傳回結果做判斷,決定是callback還是errback
        self._deferred.callback(value)
    def err(self, error):
        self._deferred.errback(error)  
           

  一般連接配接池初始化應包括連接配接所需要的伺服器資訊、池的大小、連接配接最長空閑時間、連接配接最長時間等資訊, 這裡我們不考慮連接配接最長空間時間和連接配接最長時間。 從池中擷取有效連接配接時有三種情況:1.連接配接池中存在有效連結時,從連接配接池擷取;2.當連接配接數不超過最大連接配接數時候,嘗試建立新連接配接;3.連接配接數達到最大,放入等待隊列等候處理。使用者在使用完該連接配接後需主動将連接配接歸還給連接配接池,在歸還連接配接時, 該連接配接有可能已經失效。連接配接池對象一般作為一個全局性的對象或者在需要地方将其傳入。

  

class simple_client_pool_t(object):
    def __init__(self, host, port, capacity):
        self._host = host
        self._port = port
        self._capacity = capacity
        self._in_use = 
        self._pool = []
        self._waitlist = deque()
    def create_simple_client(self, deferred):
        simple_client_factory = simpleClientFactory(self, deferred)
        reactor.connectTCP(self._host, self._port, simple_client_factory)
    def aquire(self, deferred=None): #從等待隊列過來的會自行帶deferred
        if not deferred:
            deferred = Deferred()
        if self._pool:
            simple_client = self._pool.pop()
            self._in_use += 
            deferred.callback(simple_client)
        elif self._in_use < self._capacity:
            self._in_use += 
            self.create_simple_client(deferred)
        else:
            self._waitlist.append(deferred)
        return deferred
    def release(self, simple_client):
        if not simple_client: #釋放的client已經失效
            self._in_use -= 
            if self._waitlist:
                self.aquire(self._waitlist.popleft())
            return
        if self._waitlist: #等待隊列不為空, 喚醒第一個等待
            self._waitlist.popleft().callback(simple_client)
        else:
            self._pool.append(simple_client)
            self._in_use -= 
           

我們來看看如何從連接配接池中擷取連接配接,并使用該連接配接進行網絡互動。

def get(key):
    def acquire_suc(client, key):
         client.get(key).addCallback(get_suc, key).addErrback(acquire_err)    
    def acquire_err(result): 
        deferred.errback(result) #deferred對象自動通路到外部的deferred, 變量作用域可檢視這裡

    def get_suc(value):
        deferred.errback(value)

    deferred = Deferred()
    client_pool.aquire().addCallback(acquire_suc, key).addErrback(acquire_err) 
    return deferred

client_pool = simple_client_pool_t('127.0.0.1', , )
get('key').addCallback(get_suc).addErrback(get_err) #這裡未給出get_suc和get_err的實作, 但需将連接配接歸還
           

  通過前面的介紹,我們就可以在自己的twisted伺服器中使用連接配接池與其他伺服器進行互動,以減輕伺服器壓力,加快伺服器響應速度。