天天看點

python協程實時輸出_【Python】基于協程的并發WebSocket通信實踐

1 什麼是WebSocket

WebSocket 是 HTML5 開始提供的一種在單個 TCP 連接配接上進行全雙工通訊的協定。

WebSocket 使得用戶端和伺服器之間的資料交換變得更加簡單,允許服務端主動向用戶端推送資料。在 WebSocket API 中,浏覽器和伺服器隻需要完成一次握手,兩者之間就直接可以建立持久性的連接配接,并進行雙向資料傳輸。

在 WebSocket API 中,浏覽器和伺服器隻需要做一個握手的動作,然後,浏覽器和伺服器之間就形成了一條快速通道。兩者之間就直接可以資料互相傳送。

現在,很多網站為了實作推送技術,所用的技術都是 Ajax 輪詢。輪詢是在特定的的時間間隔(如每1秒),由浏覽器對伺服器發出HTTP請求,然後由伺服器傳回最新的資料給用戶端的浏覽器。這種傳統的模式帶來很明顯的缺點,即浏覽器需要不斷的向伺服器送出請求,然而HTTP請求可能包含較長的頭部,其中真正有效的資料可能隻是很小的一部分,顯然這樣會浪費很多的帶寬等資源。

HTML5 定義的 WebSocket 協定,能更好的節省伺服器資源和帶寬,并且能夠更實時地進行通訊。

python協程實時輸出_【Python】基于協程的并發WebSocket通信實踐

浏覽器通過 JavaScript 向伺服器發出建立 WebSocket 連接配接的請求,連接配接建立以後,用戶端和伺服器端就可以通過 TCP 連接配接直接交換資料。

當你擷取 Web Socket 連接配接後,你可以通過 send() 方法來向伺服器發送資料,并通過 onmessage 事件來接收伺服器傳回的資料。

來源:https://www.runoob.com/html/html5-websocket.html

2 并發通信的實作

2.1 WebSockets庫

使用websockets庫來實作WebSockets協定,websockets是一個庫,用于在Python中建構WebSocket 伺服器和用戶端,重點在于正确性和簡單性。

它建立在asyncioPython的标準異步I / O架構之上,提供了一個優雅的基于協程的API。

用戶端發送和接收消息的方式如下:

#!/usr/bin/env python

import asyncio

import websockets

async def hello():

uri = "ws://localhost:8765"

async with websockets.connect(uri) as websocket:

await websocket.send("Hello world!")

await websocket.recv()

asyncio.get_event_loop().run_until_complete(hello())

伺服器的使用方式如下:

#!/usr/bin/env python

import asyncio

import websockets

async def echo(websocket, path):

async for message in websocket:

await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)

asyncio.get_event_loop().run_forever()

總體來說,websockets庫提供相對底層的API實作,靈活度比較高,擴充性比較強。

2.2 協程(Asyncio)

asyncio 是用來編寫 并發 代碼的庫,使用 async/await 文法。

asyncio 被用作多個提供高性能 Python 異步架構的基礎,包括網絡和網站服務,資料庫連接配接庫,分布式任務隊列等等。

asyncio 往往是建構 IO 密集型和高層級 結構化 網絡代碼的最佳選擇。

下面我們簡單介紹一下基于協程的多線程的使用。

首先協程可以了解為是微線程,python通過劃分不同的時間片來異步運作協程,實作程式的“并行”。具體來說,在python裡,通過async關鍵字的修飾,函數成為可等待的對象,在運作協程時通過await關鍵字,python會自動對其進行排程。

import asyncio

import time

async def say_after(delay, what):

await asyncio.sleep(delay)

print(what)

async def main():

print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')

await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

這段程式中,兩次輸出在同一個event_loop裡,python依次執行他們,輸出是這樣的:

started at 17:13:52

hello

world

finished at 17:13:55

通過asycio.create_task()來實作多任務的并發:

async def main():

task1 = asyncio.create_task(

say_after(1, 'hello'))

task2 = asyncio.create_task(

say_after(2, 'world'))

print(f"started at {time.strftime('%X')}")

# Wait until both tasks are completed (should take

# around 2 seconds.)

await task1

await task2

print(f"finished at {time.strftime('%X')}")

這樣python就會自動排程兩個程式,當其中一個睡眠及阻塞後,運作第二個程式,是以兩個程式看起來是并發執行的。輸出如下:

started at 17:14:32

hello

world

finished at 17:14:34

到這裡我們基本掌握了協程的使用,更詳細的内容可以參考python文檔。

2.3 功能描述

我們實作一個全雙工websocket通信示例,用戶端向服務端請求建立起連接配接後,服務端持續向用戶端發送心跳包維持連接配接,同時服務端向用戶端發送資料。用戶端處理資料的同時,并發地回複心跳包。我們通過協程來實作并發。(處理和傳輸子產品在用戶端或服務端是一樣的)

2.4 具體實作

2.4.1 用戶端

用戶端主要有幾個子產品,一個是注冊子產品:

async def register(ws): # ws為已經建立的websockets連接配接

register_message = "天王蓋地虎"

await ws.send(register_message)

print("sent", register_message)

response = await ws.recv()

response = json.loads(response)

if response["type"] == 'register' and response['value'] == "寶塔鎮河妖":

print("Received",response['value'])

else:

raise Exception("Register failed!")

用戶端發送暗語“天王蓋地虎”給服務端,若服務端傳回“寶塔鎮河妖”,則認證成功,否則傳回認證失敗的錯誤,用戶端需重新發起連接配接。

用戶端需要處理服務端發送來的心跳包和資料包,是以使用一個分發器來處理這兩類消息,同時用信号量以及資料池來控制其他子產品。

heartbeat_signal = False # 心跳的信号量,用于控制心跳包回傳子產品

data_pool = [] # 資料池

async def dispatcher(ws):

global heartbeat_signal

global data_pool

async for message in ws:

message = json.loads(message)

if message["type"] == "heartbeat": # 若消息為心跳包,更改心跳信号量

heartbeat_signal = True

elif message["type"] == "data": # 若消息為資料,将其加入資料池

data_pool.append(message['value'])

else:

raise Exception("Invalid message") # 否則傳回錯誤,重新發起連接配接

心跳包處理子產品不斷驗證信号量,若收到心跳包,則信号量應該被分發器更改為True,此時傳回心跳包,并修改信号量為False;資料處理子產品我們使用asyncio的睡眠子產品來模拟資料處理的時間,并輸出處理前後的時間,若在這個時間段内有處理心跳包,則證明這兩個子產品在并發運作。處理心跳包和資料的子產品如下:

async def heartbeat(ws):

global heartbeat_signal # 心跳信号量

while True: # 不斷驗證信号量,若收到心跳包,則信号量應該被分發器更改為True,傳回心跳包,并修改信号量為False

if heartbeat_signal == True:

await ws.send("pong")

print("pong")

heartbeat_signal= False

await asyncio.sleep(1)

async def foo(ws):

global data_pool

while True:

if len(data_pool):

data = data_pool.pop(0) # 從資料池中去資料

print("start", time.time() - start_time)

await asyncio.sleep(5)

print("end", time.time() - start_time)

print(data)

await asyncio.sleep(5) # 每5秒嘗試從資料池中取一次資料

最終用戶端的主函數如下:

import asyncio

import websockets

import json

import time

start_time = time.time()

heartbeat_signal = False

data_pool = []

async def connect_server():

url = "ws://localhost:8088"

while True:

try:

print("Initiate a connection")

async with websockets.connect(url) as ws:

await register(ws)

task1 = asyncio.create_task(dispatcher(ws))

task2 = asyncio.create_task(heartbeat(ws))

task3 = asyncio.create_task(foo(ws))

await task1

await task2

await task3

except Exception as e:

print(e)

print("Connection break!")

asyncio.get_event_loop().run_until_complete(connect_server())

注冊之後,并行地運作分發器子產品、心跳包處理子產品以及資料處理子產品即可。若出錯則重建立立連接配接。

2.4.2 服務端

服務端與用戶端類似,有幾個對應的子產品,首先是注冊子產品:

async def register(ws):

print("register")

message = json.dumps({"type": "register", "value": f"寶塔鎮河妖"})

await ws.send(message)

users.append(ws)

下面是心跳子產品,心跳子產品每5秒發送一次心跳包,如果10秒之内沒有收到回複,則認為連接配接斷開。(這裡連接配接斷開應該要從使用者池中剔除相應的使用者,因為隻是個demo所有沒有寫)

async def heartbeat(ws):

message = json.dumps({"type": "heartbeat", "value": "pong"})

while True:

await ws.send(message)

print("ping")

hb_response = None

hb_response = await asyncio.wait_for(ws.recv(), timeout=10)

try:

print(hb_response)

except:

raise Exception("Heartbeat break down")

await asyncio.sleep(5)

最後是發送資料子產品,沒隔i秒發送一次資料:

async def send_data(ws):

for i in range(6):

data = json.dumps({"type": "data", "value": i})

await ws.send(data)

print("sent data: ", i)

await asyncio.sleep(i)

主函數與用戶端類似,通過task來實作并行:

async def server(ws, path):

async for message in ws:

if message == "天王蓋地虎":

await register(ws)

task1 = asyncio.create_task(heartbeat(ws))

task2 = asyncio.create_task(send_data(ws))

await task1

await task2

else:

await ws.send("Denied")

start_server = websockets.serve(server, "localhost", 8088)

asyncio.get_event_loop().run_until_complete(start_server)

asyncio.get_event_loop().run_forever()

2.5 效果

服務端效果

register

ping

sent data: 0

sent data: 1

sent data: 2

pong

sent data: 3

ping

sent data: 4

pong

sent data: 5

ping

pong

sent data: 6

ping

pong

sent data: 7

ping

pong

sent data: 8

ping

pong

sent data: 9

ping

pong

ping

服務端實作了發送資料與發送心跳包的并行。

用戶端的效果:

Initiate a connection

sent 天王蓋地虎

Received 寶塔鎮河妖

pong

start 5.017218828201294

pong

end 10.018025636672974

pong

start 15.019151210784912

pong

end 20.019917011260986

1

start 25.00786280632019

pong

end 30.0090491771698

2

pong

start 35.012359380722046

pong

end 40.0146267414093

3

pong

start 45.01650857925415

pong

end 50.01818251609802

4

start 55.01792335510254

pong

end 60.007564544677734

5

pong

pong

pong

code = 1006 (connection closed abnormally [internal]), no reason

Connection break!

Initiate a connection

[WinError 1225] 遠端計算機拒絕網絡連接配接。

Connection break!

在資料處理子產品的start和end之間有心跳包傳回,說明兩子產品在并行運作。最後我們斷掉伺服器,重連機制也運作正常。

原文連結:https://blog.csdn.net/pupilxmk/article/details/106429215