上一篇實作了基于tornado的高性能無阻塞udp伺服器,然而光有伺服器還是不夠的,那麼這篇就來實作一個基于tornado的異步udp用戶端。
先來分析下用戶端的任務:因為整個系統是用來進行代替傳統web api的,是以這裡的udp用戶端不是隻發一個包就萬事大吉了,而是發出一個包後等待server端傳回結果。看似很簡單的任務放到tornado上就沒那麼簡單了。傳統多程序模型每個訪客對應一個程序,在這種模型下我們可以發出udp包之後阻塞該程序等待結果傳回。然而對于tornado這種單程序模型,阻塞無異于自殺,是以需要異步寫法進而使得發出去一個包之後,tornado程序迅速去處理其他請求等到結果傳回時再通過回調函數繼續處理該請求。
分析完之後就開始動手啦。先簡單封裝一個UDPRequest,便于後面處理。隻有一點要注意的,資料以'\n\r\n\r'結尾。
class UDPRequest(object):
def __init__(self,addr,port,data):
self.addr = addr
self.port = port
self.data = data
def __getattribute__(self,name):
data = object.__getattribute__(self,name)
if name == 'data' and data.rfind('\r\n\r\n') != len(data)-4 or len(data) < 4:
data += '\r\n\r\n'
return data
再封裝一個_UDPConnection,其實嚴格來說udp是沒有連結的,但是因為我們在這裡每個請求都要收到一個傳回結果是以在這裡就認為一個包發出去了就建立了一個'連結',過一會結果就會從這個'連結'傳回。
class _UDPConnection(object):
def __init__(self,io_loop,client,request,release_callback,
final_callback,max_buffer_size):
self.start_time = time.time()
self.io_loop = io_loop
self.client = client
self.request = request
self.release_callback = release_callback
self.final_callback = final_callback
addrinfo = socket.getaddrinfo(request.addr,request.port,
socket.AF_INET,socket.SOCK_DGRAM,0,0)
af,socktype,proto,canonname,sockaddr = addrinfo[0]
self.stream = IOStream(socket.socket(af,socktype,proto),
io_loop=self.io_loop,max_buffer_size=2500)
self.stream.connect(sockaddr,self._on_connect)
def _on_connect(self):
self.stream.write(self.request.data)
self.stream.read_until('\r\n\r\n',self._on_response)
def _on_response(self,data):
if self.release_callback is not None:
release_callback = self.release_callback
self.release_callback = None
release_callback()
self.stream.close()
簡單講一下_UDPConnection的思路,正如前面所說的,發送出資料包之後就認為這個socket已經和server建立的一對一的'連結',于是利用這個socket構造一個IOStream,讓tornado幫我們實作無阻塞io操作。一旦這個socket'連結好',IOStream便會調用 _on_connect()函數,在_on_connect()函數中首先發送資料,然後設定一旦讀到'\r\n\r\n'就認為資料包結束了便調用_on_response()函數。
ps:解釋一下紅字‘連結好’,并不是傳統意義上的三次握手建立連結,實際上udp是不用連結的,這裡隻是為了相容預設的IOStream,調用這個函數隻是為了修改IOStream内部的一個參數讓IOStream以為連結已經建立好了。
最後把client的代碼放上來
class AsyncUDPClient(object):
def __init__(self, io_loop=None):
self.io_loop = io_loop or IOLoop.instance()
self.max_clients = 10
self.queue = collections.deque()
self.active = {}
self.max_buffer_size = 2500
def fetch(self,request,callback,**kwargs):
callback = stack_context.wrap(callback)
self.queue.append((request,callback))
self._process_queue()
def _process_queue(self):
with stack_context.NullContext():
while self.queue and len(self.active) < self.max_clients:
request, callback = self.queue.popleft()
key = object()
self.active[key] = (request,callback)
_UDPConnection(self.io_loop,self,request,
functools.partial(self._release_fetch,key),
callback,
self.max_buffer_size)
def _release_fetch(self,key):
del self.active[key]
self._process_queue()
不難了解,這個client就是生成_UDPConnection對象
到這裡異步udp用戶端就完成了,使用方法和官方的httpclient一模一樣。當然,這裡放出來的代碼還有很多細節例如逾時,丢包之類的沒有處理,是以想用在生産環境的話還需要完善一些細節。