天天看點

[5]網絡程式設計-【5】WebSocket異步程式設計

作者:測開人小江

異步程式設計概述

異步程式設計是一種高效的程式設計方式,可以在單線程中同時處理多個任務,提高程式的性能和響應速度。在傳統的同步程式設計中,程式在執行一個任務時會阻塞目前線程,直到該任務完成後才能執行下一個任務。而在異步程式設計中,任務之間可以并發執行,不會阻塞目前線程,進而充分利用系統資源。

Python中的asyncio庫是Python标準庫中用于異步程式設計的子產品,提供了豐富的異步I/O操作和工具。通過事件循環(event loop)、協程(coroutine)和任務(task)的概念,asyncio庫可以實作異步程式設計。其中,協程是一種輕量級的子程式,可以在執行過程中挂起并讓出控制權,以便其他任務得以執行。協程之間可以通過async/await關鍵字來進行協作,進而實作高效的異步程式設計。

async/await關鍵字的使用

async/await關鍵字是Python中用于定義和使用協程的文法。async關鍵字用于定義一個協程函數,表示該函數是一個可被挂起的協程;而await關鍵字用于挂起協程的執行,等待其他協程或者異步操作完成。

下面是一個簡單的示例,展示了如何使用async/await關鍵字定義和使用協程:

import asyncio

async def foo():
    print("Start foo")
    await asyncio.sleep(1)
    print("End foo")

async def bar():
    print("Start bar")
    await asyncio.sleep(2)
    print("End bar")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(foo(), bar()))
           

在以上示例中,foo()和bar()都是協程函數,通過async關鍵字進行定義。在函數體中,使用await關鍵字來挂起協程的執行,等待asyncio.sleep()函數完成。通過asyncio.gather()函數可以同時運作多個協程,并在所有協程完成後傳回。

WebSocket庫的使用

在Python中,有多個支援WebSocket協定的庫可供選擇,例如websockets、aiohttp等。這裡以websockets庫為例,介紹如何使用該庫進行WebSocket異步程式設計。

首先,需要安裝websockets庫,可以使用以下指令:

pip install websockets
           

接下來,可以使用websockets庫來建立WebSocket伺服器和用戶端,進行異步通信。

建立WebSocket伺服器

以下是使用websockets庫建立WebSocket伺服器的簡單示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 處理WebSocket連接配接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 處理接收到的消息
            print(f"Received message: {message}")
            await websocket.send(f"You said: {message}")
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket連接配接關閉
            print("WebSocket connection closed")
            break

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
           

在以上示例中,handle()函數用于處理WebSocket連接配接。在函數體中,使用websocket參數來進行消息的發送和接收。websockets庫提供了send()和recv()方法來發送和接收WebSocket消息。通過serve()函數可以建立一個WebSocket伺服器,并監聽指定的位址和端口。asyncio.get_event_loop().run_until_complete()用于運作伺服器,asyncio.get_event_loop().run_forever()用于保持伺服器運作。

建立WebSocket用戶端

以下是使用websockets庫建立WebSocket用戶端的簡單示例:

import asyncio
import websockets

async def connect():
    async with websockets.connect('ws://localhost:8000') as websocket:
        # 連接配接到WebSocket伺服器
        print("Connected to WebSocket Server")
        while True:
            message = input("Enter a message (q to quit): ")
            if message == 'q':
                break
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Received response: {response}")

asyncio.get_event_loop().run_until_complete(connect())
           

在以上示例中,connect()函數用于連接配接到WebSocket伺服器。在函數體中,使用websockets.connect()方法來建立WebSocket連接配接。通過send()方法發送消息,通過recv()方法接收伺服器傳回的消息。在這個示例中,使用者可以輸入消息并發送到伺服器,然後接收伺服器傳回的響應。通過asyncio.get_event_loop().run_until_complete()來運作用戶端。

異步處理WebSocket消息

在WebSocket異步程式設計中,可以使用協程來處理WebSocket消息,進而實作異步的消息處理。以下是一個示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 處理WebSocket連接配接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 異步處理接收到的消息
            asyncio.ensure_future(process_message(message))
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket連接配接關閉
            print("WebSocket connection closed")
            break

async def process_message(message):
    # 異步處理接收到的消息
    print(f"Processing message: {message}")
    # 模拟處理耗時任務
    await asyncio.sleep(2)
    print(f"Processed message: {message}")

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()           

在以上示例中,`process_message()`函數用于異步處理接收到的消息。在函數體中,可以編寫處理消息的邏輯,例如儲存到資料庫、發送到其他服務等耗時任務。通過`asyncio.ensure_future()`方法将處理消息的協程添加到事件循環中,進而實作異步處理。

錯誤處理和關閉WebSocket連接配接

在WebSocket異步程式設計中,需要考慮錯誤處理和正确關閉WebSocket連接配接的情況。以下是一個示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 處理WebSocket連接配接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 異步處理接收到的消息
            asyncio.ensure_future(process_message(message))
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket連接配接正常關閉
            print("WebSocket connection closed")
            break
        except websockets.exceptions.WebSocketException as e:
            # 處理WebSocket連接配接錯誤
            print(f"WebSocket error: {e}")
            break

async def process_message(message):
    # 異步處理接收到的消息
    print(f"Processing message: {message}")
    # 模拟處理耗時任務
    await asyncio.sleep(2)
    print(f"Processed message: {message}")

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
           

在以上示例中,通過捕獲websockets.exceptions.ConnectionClosedOK異常來處理WebSocket連接配接正常關閉的情況,通過捕獲websockets.exceptions.WebSocketException異常來處理其他WebSocket連接配接錯誤的情況。在處理錯誤時,可以進行相應的處理邏輯,例如記錄日志、發送通知等。

另外,還需要注意在WebSocket異步程式設計中正确關閉WebSocket連接配接。可以使用websocket.close()方法來主動關閉連接配接,也可以在handle()函數中通過break語句退出循環進而關閉連接配接。

以上是一個簡單的WebSocket異步程式設計的學習教程,希望對您有幫助!在實際應用中,您可以根據具體需求和業務邏輯進行更複雜的WebSocket異步程式設計實作。