天天看点

[5]网络编程-【5】WebSocket异步编程

作者:测开人小江

异步编程概述

异步编程是一种高效的编程方式,可以在单线程中同时处理多个任务,提高程序的性能和响应速度。在传统的同步编程中,程序在执行一个任务时会阻塞当前线程,直到该任务完成后才能执行下一个任务。而在异步编程中,任务之间可以并发执行,不会阻塞当前线程,从而充分利用系统资源。

Python中的asyncio库是Python标准库中用于异步编程的模块,提供了丰富的异步I/O操作和工具。通过事件循环(event loop)、协程(coroutine)和任务(task)的概念,asyncio库可以实现异步编程。其中,协程是一种轻量级的子程序,可以在执行过程中挂起并让出控制权,以便其他任务得以执行。协程之间可以通过async/await关键字来进行协作,从而实现高效的异步编程。

async/await关键字的使用

async/await关键字是Python中用于定义和使用协程的语法。async关键字用于定义一个协程函数,表示该函数是一个可被挂起的协程;而await关键字用于挂起协程的执行,等待其他协程或者异步操作完成。

下面是一个简单的示例,展示了如何使用async/await关键字定义和使用协程:

import asyncio

async def foo():
    print("Start foo")
    await asyncio.sleep(1)
    print("End foo")

async def bar():
    print("Start bar")
    await asyncio.sleep(2)
    print("End bar")

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(foo(), bar()))
           

在以上示例中,foo()和bar()都是协程函数,通过async关键字进行定义。在函数体中,使用await关键字来挂起协程的执行,等待asyncio.sleep()函数完成。通过asyncio.gather()函数可以同时运行多个协程,并在所有协程完成后返回。

WebSocket库的使用

在Python中,有多个支持WebSocket协议的库可供选择,例如websockets、aiohttp等。这里以websockets库为例,介绍如何使用该库进行WebSocket异步编程。

首先,需要安装websockets库,可以使用以下命令:

pip install websockets
           

接下来,可以使用websockets库来创建WebSocket服务器和客户端,进行异步通信。

创建WebSocket服务器

以下是使用websockets库创建WebSocket服务器的简单示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 处理WebSocket连接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 处理接收到的消息
            print(f"Received message: {message}")
            await websocket.send(f"You said: {message}")
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket连接关闭
            print("WebSocket connection closed")
            break

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
           

在以上示例中,handle()函数用于处理WebSocket连接。在函数体中,使用websocket参数来进行消息的发送和接收。websockets库提供了send()和recv()方法来发送和接收WebSocket消息。通过serve()函数可以创建一个WebSocket服务器,并监听指定的地址和端口。asyncio.get_event_loop().run_until_complete()用于运行服务器,asyncio.get_event_loop().run_forever()用于保持服务器运行。

创建WebSocket客户端

以下是使用websockets库创建WebSocket客户端的简单示例:

import asyncio
import websockets

async def connect():
    async with websockets.connect('ws://localhost:8000') as websocket:
        # 连接到WebSocket服务器
        print("Connected to WebSocket Server")
        while True:
            message = input("Enter a message (q to quit): ")
            if message == 'q':
                break
            await websocket.send(message)
            response = await websocket.recv()
            print(f"Received response: {response}")

asyncio.get_event_loop().run_until_complete(connect())
           

在以上示例中,connect()函数用于连接到WebSocket服务器。在函数体中,使用websockets.connect()方法来建立WebSocket连接。通过send()方法发送消息,通过recv()方法接收服务器返回的消息。在这个示例中,用户可以输入消息并发送到服务器,然后接收服务器返回的响应。通过asyncio.get_event_loop().run_until_complete()来运行客户端。

异步处理WebSocket消息

在WebSocket异步编程中,可以使用协程来处理WebSocket消息,从而实现异步的消息处理。以下是一个示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 处理WebSocket连接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 异步处理接收到的消息
            asyncio.ensure_future(process_message(message))
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket连接关闭
            print("WebSocket connection closed")
            break

async def process_message(message):
    # 异步处理接收到的消息
    print(f"Processing message: {message}")
    # 模拟处理耗时任务
    await asyncio.sleep(2)
    print(f"Processed message: {message}")

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()           

在以上示例中,`process_message()`函数用于异步处理接收到的消息。在函数体中,可以编写处理消息的逻辑,例如保存到数据库、发送到其他服务等耗时任务。通过`asyncio.ensure_future()`方法将处理消息的协程添加到事件循环中,从而实现异步处理。

错误处理和关闭WebSocket连接

在WebSocket异步编程中,需要考虑错误处理和正确关闭WebSocket连接的情况。以下是一个示例:

import asyncio
import websockets

async def handle(websocket, path):
    # 处理WebSocket连接
    await websocket.send("Welcome to WebSocket Server!")
    while True:
        try:
            message = await websocket.recv()
            # 异步处理接收到的消息
            asyncio.ensure_future(process_message(message))
        except websockets.exceptions.ConnectionClosedOK:
            # WebSocket连接正常关闭
            print("WebSocket connection closed")
            break
        except websockets.exceptions.WebSocketException as e:
            # 处理WebSocket连接错误
            print(f"WebSocket error: {e}")
            break

async def process_message(message):
    # 异步处理接收到的消息
    print(f"Processing message: {message}")
    # 模拟处理耗时任务
    await asyncio.sleep(2)
    print(f"Processed message: {message}")

start_server = websockets.serve(handle, 'localhost', 8000)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
           

在以上示例中,通过捕获websockets.exceptions.ConnectionClosedOK异常来处理WebSocket连接正常关闭的情况,通过捕获websockets.exceptions.WebSocketException异常来处理其他WebSocket连接错误的情况。在处理错误时,可以进行相应的处理逻辑,例如记录日志、发送通知等。

另外,还需要注意在WebSocket异步编程中正确关闭WebSocket连接。可以使用websocket.close()方法来主动关闭连接,也可以在handle()函数中通过break语句退出循环从而关闭连接。

以上是一个简单的WebSocket异步编程的学习教程,希望对您有帮助!在实际应用中,您可以根据具体需求和业务逻辑进行更复杂的WebSocket异步编程实现。