1 什麼是WebSocket
WebSocket 是 HTML5 開始提供的一種在單個 TCP 連接配接上進行全雙工通訊的協定。
WebSocket 使得用戶端和伺服器之間的資料交換變得更加簡單,允許服務端主動向用戶端推送資料。在 WebSocket API 中,浏覽器和伺服器隻需要完成一次握手,兩者之間就直接可以建立持久性的連接配接,并進行雙向資料傳輸。
在 WebSocket API 中,浏覽器和伺服器隻需要做一個握手的動作,然後,浏覽器和伺服器之間就形成了一條快速通道。兩者之間就直接可以資料互相傳送。
現在,很多網站為了實作推送技術,所用的技術都是 Ajax 輪詢。輪詢是在特定的的時間間隔(如每1秒),由浏覽器對伺服器發出HTTP請求,然後由伺服器傳回最新的資料給用戶端的浏覽器。這種傳統的模式帶來很明顯的缺點,即浏覽器需要不斷的向伺服器送出請求,然而HTTP請求可能包含較長的頭部,其中真正有效的資料可能隻是很小的一部分,顯然這樣會浪費很多的帶寬等資源。
HTML5 定義的 WebSocket 協定,能更好的節省伺服器資源和帶寬,并且能夠更實時地進行通訊。

浏覽器通過 JavaScript 向伺服器發出建立 WebSocket 連接配接的請求,連接配接建立以後,用戶端和伺服器端就可以通過 TCP 連接配接直接交換資料。
當你擷取 Web Socket 連接配接後,你可以通過 send() 方法來向伺服器發送資料,并通過 onmessage 事件來接收伺服器傳回的資料。
來源:https://www.runoob.com/html/html5-websocket.html
2 并發通信的實作
2.1 WebSockets庫
使用websockets庫來實作WebSockets協定,websockets是一個庫,用于在Python中建構WebSocket 伺服器和用戶端,重點在于正确性和簡單性。
它建立在asyncioPython的标準異步I / O架構之上,提供了一個優雅的基于協程的API。
用戶端發送和接收消息的方式如下:
#!/usr/bin/env python
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
await websocket.send("Hello world!")
await websocket.recv()
asyncio.get_event_loop().run_until_complete(hello())
伺服器的使用方式如下:
#!/usr/bin/env python
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
await websocket.send(message)
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
總體來說,websockets庫提供相對底層的API實作,靈活度比較高,擴充性比較強。
2.2 協程(Asyncio)
asyncio 是用來編寫 并發 代碼的庫,使用 async/await 文法。
asyncio 被用作多個提供高性能 Python 異步架構的基礎,包括網絡和網站服務,資料庫連接配接庫,分布式任務隊列等等。
asyncio 往往是建構 IO 密集型和高層級 結構化 網絡代碼的最佳選擇。
下面我們簡單介紹一下基于協程的多線程的使用。
首先協程可以了解為是微線程,python通過劃分不同的時間片來異步運作協程,實作程式的“并行”。具體來說,在python裡,通過async關鍵字的修飾,函數成為可等待的對象,在運作協程時通過await關鍵字,python會自動對其進行排程。
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
這段程式中,兩次輸出在同一個event_loop裡,python依次執行他們,輸出是這樣的:
started at 17:13:52
hello
world
finished at 17:13:55
通過asycio.create_task()來實作多任務的并發:
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# Wait until both tasks are completed (should take
# around 2 seconds.)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
這樣python就會自動排程兩個程式,當其中一個睡眠及阻塞後,運作第二個程式,是以兩個程式看起來是并發執行的。輸出如下:
started at 17:14:32
hello
world
finished at 17:14:34
到這裡我們基本掌握了協程的使用,更詳細的内容可以參考python文檔。
2.3 功能描述
我們實作一個全雙工websocket通信示例,用戶端向服務端請求建立起連接配接後,服務端持續向用戶端發送心跳包維持連接配接,同時服務端向用戶端發送資料。用戶端處理資料的同時,并發地回複心跳包。我們通過協程來實作并發。(處理和傳輸子產品在用戶端或服務端是一樣的)
2.4 具體實作
2.4.1 用戶端
用戶端主要有幾個子產品,一個是注冊子產品:
async def register(ws): # ws為已經建立的websockets連接配接
register_message = "天王蓋地虎"
await ws.send(register_message)
print("sent", register_message)
response = await ws.recv()
response = json.loads(response)
if response["type"] == 'register' and response['value'] == "寶塔鎮河妖":
print("Received",response['value'])
else:
raise Exception("Register failed!")
用戶端發送暗語“天王蓋地虎”給服務端,若服務端傳回“寶塔鎮河妖”,則認證成功,否則傳回認證失敗的錯誤,用戶端需重新發起連接配接。
用戶端需要處理服務端發送來的心跳包和資料包,是以使用一個分發器來處理這兩類消息,同時用信号量以及資料池來控制其他子產品。
heartbeat_signal = False # 心跳的信号量,用于控制心跳包回傳子產品
data_pool = [] # 資料池
async def dispatcher(ws):
global heartbeat_signal
global data_pool
async for message in ws:
message = json.loads(message)
if message["type"] == "heartbeat": # 若消息為心跳包,更改心跳信号量
heartbeat_signal = True
elif message["type"] == "data": # 若消息為資料,将其加入資料池
data_pool.append(message['value'])
else:
raise Exception("Invalid message") # 否則傳回錯誤,重新發起連接配接
心跳包處理子產品不斷驗證信号量,若收到心跳包,則信号量應該被分發器更改為True,此時傳回心跳包,并修改信号量為False;資料處理子產品我們使用asyncio的睡眠子產品來模拟資料處理的時間,并輸出處理前後的時間,若在這個時間段内有處理心跳包,則證明這兩個子產品在并發運作。處理心跳包和資料的子產品如下:
async def heartbeat(ws):
global heartbeat_signal # 心跳信号量
while True: # 不斷驗證信号量,若收到心跳包,則信号量應該被分發器更改為True,傳回心跳包,并修改信号量為False
if heartbeat_signal == True:
await ws.send("pong")
print("pong")
heartbeat_signal= False
await asyncio.sleep(1)
async def foo(ws):
global data_pool
while True:
if len(data_pool):
data = data_pool.pop(0) # 從資料池中去資料
print("start", time.time() - start_time)
await asyncio.sleep(5)
print("end", time.time() - start_time)
print(data)
await asyncio.sleep(5) # 每5秒嘗試從資料池中取一次資料
最終用戶端的主函數如下:
import asyncio
import websockets
import json
import time
start_time = time.time()
heartbeat_signal = False
data_pool = []
async def connect_server():
url = "ws://localhost:8088"
while True:
try:
print("Initiate a connection")
async with websockets.connect(url) as ws:
await register(ws)
task1 = asyncio.create_task(dispatcher(ws))
task2 = asyncio.create_task(heartbeat(ws))
task3 = asyncio.create_task(foo(ws))
await task1
await task2
await task3
except Exception as e:
print(e)
print("Connection break!")
asyncio.get_event_loop().run_until_complete(connect_server())
注冊之後,并行地運作分發器子產品、心跳包處理子產品以及資料處理子產品即可。若出錯則重建立立連接配接。
2.4.2 服務端
服務端與用戶端類似,有幾個對應的子產品,首先是注冊子產品:
async def register(ws):
print("register")
message = json.dumps({"type": "register", "value": f"寶塔鎮河妖"})
await ws.send(message)
users.append(ws)
下面是心跳子產品,心跳子產品每5秒發送一次心跳包,如果10秒之内沒有收到回複,則認為連接配接斷開。(這裡連接配接斷開應該要從使用者池中剔除相應的使用者,因為隻是個demo所有沒有寫)
async def heartbeat(ws):
message = json.dumps({"type": "heartbeat", "value": "pong"})
while True:
await ws.send(message)
print("ping")
hb_response = None
hb_response = await asyncio.wait_for(ws.recv(), timeout=10)
try:
print(hb_response)
except:
raise Exception("Heartbeat break down")
await asyncio.sleep(5)
最後是發送資料子產品,沒隔i秒發送一次資料:
async def send_data(ws):
for i in range(6):
data = json.dumps({"type": "data", "value": i})
await ws.send(data)
print("sent data: ", i)
await asyncio.sleep(i)
主函數與用戶端類似,通過task來實作并行:
async def server(ws, path):
async for message in ws:
if message == "天王蓋地虎":
await register(ws)
task1 = asyncio.create_task(heartbeat(ws))
task2 = asyncio.create_task(send_data(ws))
await task1
await task2
else:
await ws.send("Denied")
start_server = websockets.serve(server, "localhost", 8088)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
2.5 效果
服務端效果
register
ping
sent data: 0
sent data: 1
sent data: 2
pong
sent data: 3
ping
sent data: 4
pong
sent data: 5
ping
pong
sent data: 6
ping
pong
sent data: 7
ping
pong
sent data: 8
ping
pong
sent data: 9
ping
pong
ping
服務端實作了發送資料與發送心跳包的并行。
用戶端的效果:
Initiate a connection
sent 天王蓋地虎
Received 寶塔鎮河妖
pong
start 5.017218828201294
pong
end 10.018025636672974
pong
start 15.019151210784912
pong
end 20.019917011260986
1
start 25.00786280632019
pong
end 30.0090491771698
2
pong
start 35.012359380722046
pong
end 40.0146267414093
3
pong
start 45.01650857925415
pong
end 50.01818251609802
4
start 55.01792335510254
pong
end 60.007564544677734
5
pong
pong
pong
code = 1006 (connection closed abnormally [internal]), no reason
Connection break!
Initiate a connection
[WinError 1225] 遠端計算機拒絕網絡連接配接。
Connection break!
在資料處理子產品的start和end之間有心跳包傳回,說明兩子產品在并行運作。最後我們斷掉伺服器,重連機制也運作正常。
原文連結:https://blog.csdn.net/pupilxmk/article/details/106429215