天天看點

python的Socket網絡程式設計 使用模闆

相關說明

本文給出的是TCP協定的Socket程式設計。

其中用了一個dbmanager資料庫操作子產品,這個子產品是我自己定義的,可以在我的另一個文章中找到這個子產品的分享。

python操作mysql資料庫的精美實用子產品

服務段完整代碼

【如下代碼,完全複制,直接運作,即可使用】

import socket
import threading
import json
from tools import dbmanager    #這個子產品是我自定義的,可以在我的另一個文章中找到這個子產品的分享
###################################
####伺服器參數
####################################

HOST = '0.0.0.0'            #ip 0.0.0.0 表示本機所有ip位址
PORT = 9905                 #端口号
Max_Listen =10              #最大監聽數
BUFSIZ = 1024               #每次接收資料長度
ENDMARK = "messageover"     #資訊結束标記



####################################
####業務處理函數
####################################
def searchuser(sock, dict_data):
    the_searchstring = dict_data['par']
    #########構造sql語句
    sqlstring0 = "SELECT * FROM hr_user WHERE id>0 "

    if the_searchstring != None and the_searchstring != "":
        sqlstring0 = sqlstring0 + " AND (username LIKE '%" + the_searchstring + "%')"

    ########執行資料庫查詢
    data0 = dbmanager.executeSelectAllback(sqlstring0)
    if data0 == False:
        message = {'action': 'Error_SqlConnet'}
        sock.sendall(json.dumps(message).encode("utf-8"))    ####發送資料給用戶端
        return
    if data0 != None:
        message = {'action': 'SearchUser_Success'}
        message['data'] = data0
        sock.sendall(json.dumps(message).encode("utf-8"))    ####發送資料給用戶端
        return



####################################
####線程處理函數
####################################
def readRequest(sock, addr):
    print('Accept new connection from %s:%s...' % addr)
    allresponse = ""
    while True:
        ########接收資料
        data = sock.recv(BUFSIZ).decode('utf-8')
        if len(data):
            allresponse = allresponse + data
            if ENDMARK not in allresponse:
                continue
        if allresponse == "":
            break


        ########處理資料
        allresponse =allresponse[:-len(ENDMARK)]
        dict_data = json.loads(allresponse)
        action = dict_data['action']
        if action == "SearchUser":
            searchuser(sock, dict_data)  #業務處理
            break
        elif action == "SearchCompany":
            break                        #業務處理
        else:
            break


    ########關閉連接配接
    sock.close()
    print('Connection from %s:%s closed.' % addr)








####################################
#程式入口
####################################
if __name__ == "__main__":
    #####建立一個socket
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # AF_INET指定使用IPv4協定  #SOCK_STREAM指定使用面向流的TCP協定

    #####綁定端口
    the_socket.bind((HOST, PORT))

    #####監聽端口
    the_socket.listen(Max_Listen)

    print('Waiting for connection...')

    while True:
        # 接受一個新連接配接
        sock, addr = the_socket.accept()

        # 建立新線程來處理TCP連接配接
        the_thread = threading.Thread(target=readRequest, args=(sock, addr))
        the_thread.start()
    ####################################
           

用戶端完整代碼

import socket
import json

#############################################################################################
####參數
#############################################################################################
HOST = '127.0.0.1'
PORT = 9905
BUFSIZ = 1024
ENDMARK = "messageover"



#############################################################################################
#######請求處理函數
#############################################################################################
def issueRequest(action, par):
    # 建立一個socket
    the_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 建立連接配接:
    try:
        the_socket.connect((HOST, PORT))
    except:
        print("伺服器連接配接失敗!")
        return

    #############################
    try:
        ########發送資料:
        message = {}
        message['action'] = action
        message['par'] = par
        last_message = json.dumps(message) + ENDMARK  # son.dumps()将 Python 對象編碼成 JSON 字元串
        print(message)
        the_socket.sendall(last_message.encode("utf-8"))

        ########接收資料
        allresponse = ""
        while True:
            response = the_socket.recv(BUFSIZ).decode('utf-8')
            if len(response):
                allresponse = allresponse + response
                continue
            if allresponse == "":
                break

            #######處理資料
            dict_data = json.loads(allresponse)  # json.loads()将已編碼的 JSON 字元串解碼為 Python 對象
            action = dict_data['action']
            if action == "SearchUser_Success":
                data0 = dict_data['data']
                print(data0)
                break
            elif action == "Error_SqlConnet":
                print("資料查詢失敗!")
                break
            else:
                break
    except:
        print("伺服器連接配接異常,資料查詢失敗!")
    finally:
        #######關閉連接配接
        the_socket.close()



####################################
#程式入口
####################################
if __name__ == "__main__":
    the_searchstring ="張三"
    issueRequest("SearchUser", the_searchstring)
           

本文如有幫助,敬請留言鼓勵。

本文如有錯誤,敬請留言改進。