天天看點

Python socket TCP群聊軟體服務端程式設計使用場景及應用示例

作者:運維木子李

#頭條創作挑戰賽#

以下是使用Python的socket子產品實作的幾個案例:

Python socket TCP群聊軟體服務端程式設計的原理是基于TCP協定的網絡通信。TCP(傳輸控制協定)是一種可靠的、面向連接配接的協定,它提供了可靠的資料傳輸、流控制和擁塞控制等功能。

服務端通過監聽指定的IP和端口,等待用戶端的連接配接。一旦有用戶端連接配接成功,服務端會為每個用戶端建立一個獨立的線程用于處理與該用戶端的通信。在服務端接收到用戶端的消息後,會将消息發送給所有連接配接的用戶端(除了發送者)。

Python socket TCP群聊軟體服務端程式設計使用場景及應用示例

使用場景

以下是一些可能的使用場景:

  • 線上聊天室:多個使用者可以同時連接配接到服務端,彼此之間可以發送消息進行聊天。
  • 多人協同編輯:多個使用者可以同時連接配接到服務端,實時編輯共享的文檔或代碼。
  • 線上遊戲:多個玩家可以通過連接配接到服務端進行實時的遊戲對戰或合作。
  • 實時監控系統:多個監控裝置連接配接到服務端,将實時的監控資料發送給服務端,服務端将資料發送給所有連接配接的用戶端進行實時監控。
  • 遠端控制系統:用戶端連接配接到服務端,通過服務端中轉控制指令,實作遠端控制被控制裝置的功能。

應用示例

線上聊天室:

服務端代碼:

import socket
import threading

# 伺服器IP和端口
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 12345

# 存儲所有連接配接的用戶端socket
client_sockets = []

def handle_client(client_socket):
    while True:
        try:
            # 接收用戶端消息
            message = client_socket.recv(1024).decode('utf-8')
            if message:
                print(f"收到消息:{message}")
                
                # 将消息發送給所有連接配接的用戶端(除了發送者)
                for socket in client_sockets:
                    if socket != client_socket:
                        socket.send(message.encode('utf-8'))
            else:
                # 用戶端斷開連接配接
                remove_client(client_socket)
                break
        except:
            # 出現異常,斷開連接配接
            remove_client(client_socket)
            break

def remove_client(client_socket):
    # 移除用戶端socket
    client_sockets.remove(client_socket)
    client_socket.close()

def start_server():
    # 建立伺服器socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 綁定IP和端口
    server_socket.bind((SERVER_HOST, SERVER_PORT))
    
    # 監聽連接配接
    server_socket.listen()
    
    print(f"聊天室伺服器已啟動,監聽端口:{SERVER_PORT}")
    
    while True:
        # 接受用戶端連接配接
        client_socket, client_address = server_socket.accept()
        client_sockets.append(client_socket)
        
        # 建立線程處理用戶端消息
        client_thread = threading.Thread(target=handle_client, args=(client_socket,))
        client_thread.start()

start_server()
           

用戶端代碼:

import socket
import threading

# 伺服器IP和端口
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345

def receive_message(client_socket):
    while True:
        try:
            # 接收伺服器消息
            message = client_socket.recv(1024).decode('utf-8')
            print(message)
        except:
            print("與伺服器的連接配接斷開")
            break

def start_client():
    # 建立用戶端socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 連接配接伺服器
        client_socket.connect((SERVER_HOST, SERVER_PORT))
        
        # 建立線程接收伺服器消息
        receive_thread = threading.Thread(target=receive_message, args=(client_socket,))
        receive_thread.start()
        
        while True:
            # 發送消息給伺服器
            message = input()
            client_socket.send(message.encode('utf-8'))
    except:
        print("無法連接配接到伺服器")

start_client()           

多人協同編輯:

import socket
import threading

HOST = 'localhost'  # 伺服器IP位址
PORT = 5000  # 伺服器端口号

# 建立一個socket對象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 綁定IP位址和端口号
server_socket.bind((HOST, PORT))
# 監聽連接配接
server_socket.listen()

clients = []  # 存儲連接配接的用戶端socket對象
nicknames = []  # 存儲用戶端的昵稱

def broadcast(message):
    # 向所有用戶端廣播消息
    for client in clients:
        client.send(message)

def handle(client):
    while True:
        try:
            # 接收用戶端發送的消息
            message = client.recv(1024)
            # 廣播消息給其他用戶端
            broadcast(message)
        except:
            # 用戶端斷開連接配接
            index = clients.index(client)
            clients.remove(client)
            client.close()
            nickname = nicknames[index]
            nicknames.remove(nickname)
            broadcast(f'{nickname}離開了聊天室'.encode('utf-8'))
            break

def receive():
    while True:
        # 接受用戶端連接配接
        client, address = server_socket.accept()
        print(f'已連接配接到 {address}')
        client.send('NICK'.encode('utf-8'))
        nickname = client.recv(1024).decode('utf-8')
        nicknames.append(nickname)
        clients.append(client)
        print(f'昵稱為 {nickname} 的使用者已加入聊天室')
        broadcast(f'{nickname}加入了聊天室'.encode('utf-8'))
        client.send('已成功加入聊天室'.encode('utf-8'))
        client.send('你可以開始聊天了'.encode('utf-8'))
        # 啟動一個新的線程處理用戶端的消息
        thread = threading.Thread(target=handle, args=(client,))
        thread.start()

print('伺服器正在運作...')
receive()           

這個示例代碼建立了一個聊天伺服器,當有用戶端連接配接時,将其加入聊天室,并向所有用戶端廣播該使用者已加入聊天室的消息。當用戶端發送消息時,伺服器會将其廣播給其他用戶端。當用戶端斷開連接配接時,伺服器會将其從聊天室中移除,并向其他用戶端廣播該使用者已離開的消息。

你可以在用戶端使用socket子產品連接配接到該伺服器,并發送消息來實作多人協同編輯的功能。

線上遊戲:

要使用Python的socket子產品實作線上遊戲,需要設計一個伺服器來處理玩家之間的通信和遊戲邏輯。以下是一個簡單的示例代碼,示範了如何使用socket子產品實作一個簡單的多人線上遊戲:

import socket
import threading

HOST = 'localhost'  # 伺服器IP位址
PORT = 5000  # 伺服器端口号

# 建立一個socket對象
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 綁定IP位址和端口号
server_socket.bind((HOST, PORT))
# 監聽連接配接
server_socket.listen()

clients = []  # 存儲連接配接的用戶端socket對象
player_positions = {}  # 存儲玩家的位置資訊

def broadcast(message):
    # 向所有用戶端廣播消息
    for client in clients:
        client.send(message)

def handle(client):
    while True:
        try:
            # 接收用戶端發送的消息
            message = client.recv(1024)
            # 廣播消息給其他用戶端
            broadcast(message)
        except:
            # 用戶端斷開連接配接
            index = clients.index(client)
            clients.remove(client)
            client.close()
            del player_positions[client]
            broadcast(f'Player {index+1}離開了遊戲'.encode('utf-8'))
            break

def receive():
    while True:
        # 接受用戶端連接配接
        client, address = server_socket.accept()
        print(f'已連接配接到 {address}')
        clients.append(client)
        player_positions[client] = (0, 0)  # 初始化玩家位置
        print(f'Player {len(clients)}加入了遊戲')
        broadcast(f'Player {len(clients)}加入了遊戲'.encode('utf-8'))
        client.send('已成功加入遊戲'.encode('utf-8'))
        client.send('你可以開始遊戲了'.encode('utf-8'))
        # 啟動一個新的線程處理用戶端的消息
        thread = threading.Thread(target=handle, args=(client,))
        thread.start()

print('伺服器正在運作...')
receive()           

這個示例代碼建立了一個遊戲伺服器,當有用戶端連接配接時,将其加入遊戲,并向所有用戶端廣播該玩家已加入遊戲的消息。當用戶端發送消息時,伺服器會将其廣播給其他用戶端。當用戶端斷開連接配接時,伺服器會将其從遊戲中移除,并向其他用戶端廣播該玩家已離開遊戲的消息。

你可以在用戶端使用socket子產品連接配接到該伺服器,并發送消息來與其他玩家進行互動。你可以根據遊戲的需求,在伺服器端添加遊戲邏輯來處理玩家的移動、攻擊等操作,并将相關資訊廣播給其他玩家。

實時監控系統:

服務端代碼:

import socket
import cv2
import numpy as np
import threading

# 伺服器IP和端口
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 12345

# 存儲所有連接配接的用戶端socket
client_sockets = []

def handle_client(client_socket):
    cap = cv2.VideoCapture(0)
    
    while True:
        try:
            ret, frame = cap.read()
            if ret:
                # 将視訊幀轉換為字元串
                data = frame.tostring()
                
                # 将視訊幀發送給所有連接配接的用戶端
                for socket in client_sockets:
                    socket.send(data)
            else:
                # 用戶端斷開連接配接
                remove_client(client_socket)
                break
        except:
            # 出現異常,斷開連接配接
            remove_client(client_socket)
            break
    
    cap.release()

def remove_client(client_socket):
    # 移除用戶端socket
    client_sockets.remove(client_socket)
    client_socket.close()

def start_server():
    # 建立伺服器socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 綁定IP和端口
    server_socket.bind((SERVER_HOST, SERVER_PORT))
    
    # 監聽連接配接
    server_socket.listen()
    
    print(f"監控系統伺服器已啟動,監聽端口:{SERVER_PORT}")
    
    while True:
        # 接受用戶端連接配接
        client_socket, client_address = server_socket.accept()
        client_sockets.append(client_socket)
        
        # 建立線程處理用戶端視訊流
        client_thread = threading.Thread(target=handle_client, args=(client_socket,))
        client_thread.start()

start_server()           

用戶端代碼:

import socket
import cv2
import numpy as np
import threading

# 伺服器IP和端口
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345

def receive_video(client_socket):
    while True:
        try:
            # 接收伺服器發送的視訊幀資料
            data = client_socket.recv(921600)
            
            # 将字元串轉換為視訊幀
            frame = np.frombuffer(data, dtype=np.uint8).reshape((480, 640, 3))
            
            # 顯示視訊幀
            cv2.imshow("Video", frame)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        except:
            print("與伺服器的連接配接斷開")
            break
    
    cv2.destroyAllWindows()

def start_client():
    # 建立用戶端socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 連接配接伺服器
        client_socket.connect((SERVER_HOST, SERVER_PORT))
        
        # 建立線程接收伺服器視訊流
        receive_thread = threading.Thread(target=receive_video, args=(client_socket,))
        receive_thread.start()
        
        while True:
            pass
    except:
        print("無法連接配接到伺服器")

start_client()           

遠端控制系統:

服務端代碼:

import socket
import threading
import pyautogui

# 伺服器IP和端口
SERVER_HOST = '0.0.0.0'
SERVER_PORT = 12345

# 存儲所有連接配接的用戶端socket
client_sockets = []

def handle_client(client_socket):
    while True:
        try:
            # 接收用戶端消息
            message = client_socket.recv(1024).decode('utf-8')
            if message:
                print(f"收到消息:{message}")
                
                # 執行用戶端發送的指令
                if message == 'left_click':
                    pyautogui.click()
                elif message.startswith('move_mouse'):
                    _, x, y = message.split()
                    pyautogui.moveTo(int(x), int(y))
            else:
                # 用戶端斷開連接配接
                remove_client(client_socket)
                break
        except:
            # 出現異常,斷開連接配接
            remove_client(client_socket)
            break

def remove_client(client_socket):
    # 移除用戶端socket
    client_sockets.remove(client_socket)
    client_socket.close()

def start_server():
    # 建立伺服器socket
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 綁定IP和端口
    server_socket.bind((SERVER_HOST, SERVER_PORT))
    
    # 監聽連接配接
    server_socket.listen()
    
    print(f"遠端控制系統伺服器已啟動,監聽端口:{SERVER_PORT}")
    
    while True:
        # 接受用戶端連接配接
        client_socket, client_address = server_socket.accept()
        client_sockets.append(client_socket)
        
        # 建立線程處理用戶端消息
        client_thread = threading.Thread(target=handle_client, args=(client_socket,))
        client_thread.start()

start_server()           

用戶端代碼:

import socket

# 伺服器IP和端口
SERVER_HOST = '127.0.0.1'
SERVER_PORT = 12345

def start_client():
    # 建立用戶端socket
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    try:
        # 連接配接伺服器
        client_socket.connect((SERVER_HOST, SERVER_PORT))
        
        while True:
            # 擷取指令
            command = input("請輸入指令(left_click, move_mouse <x> <y>):")
            
            # 發送指令給伺服器
            client_socket.send(command.encode('utf-8'))
    except:
        print("無法連接配接到伺服器")

start_client()           

以上是一些使用Python的socket子產品實作的線上聊天室、多人協同編輯、線上遊戲、實時監控系統、遠端控制系統的簡單示例代碼。這些示例代碼僅供參考,具體應用中可能需要根據需求進行相應的改進和優化。