常見模型分類
循環伺服器模型 :循環接收用戶端請求,處理請求。同一時刻隻能處理一個請求,處理完畢後再
處理下一個。
- 優點:實作簡單,占用資源少
- 缺點:無法同時處理多個用戶端請求
- 适用情況:處理的任務可以很快完成,用戶端無需長期占用服務端程式。udp比tcp更适合循環。
IO并發模型:利用IO多路複用,異步IO等技術,同時處理多個用戶端IO請求。
- 優點 : 資源消耗少,能同時高效處理多個IO行為
- 缺點 : 隻能處理并發産生的IO事件,無法處理cpu計算
- 适用情況:HTTP請求,網絡傳輸等都是IO行為。
多程序/線程網絡并發模型:每當一個用戶端連接配接伺服器,就建立一個新的程序/線程為該用戶端服務,用戶端退出時再銷毀該程序/線程。
- 優點:能同時滿足多個用戶端長期占有服務端需求,可以處理各種請求。
- 缺點: 資源消耗較大
- 适用情況:用戶端同時連接配接量較少,需要處理行為較複雜情況。
基于fork的多程序網絡并發模型
- 建立監聽套接字
- 等待接收用戶端請求
- 用戶端連接配接建立新的程序處理用戶端請求
- 原程序繼續等待其他用戶端連接配接
- 如果用戶端退出,則銷毀對應的程序
from socket import *
import os
import signal
# 建立監聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# 用戶端服務函數
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
s = socket() # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # 設定套接字端口重用
s.bind(ADDR)
s.listen(3)
signal.signal(signal.SIGCHLD,signal.SIG_IGN) # 處理僵屍程序
print("Listen the port %d..." % PORT)
# 循環等待用戶端連接配接
while True:
try:
c,addr = s.accept()
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# 建立子程序處理這個用戶端
pid = os.fork()
if pid == 0: # 處理用戶端請求
s.close()
handle(c)
os._exit(0) # handle處理完用戶端請求子程序也退出
# 無論出錯或者父程序都要循環回去接受請求
# c對于父程序沒用
c.close()
基于threading的多線程網絡并發
- 建立監聽套接字
- 循環接收用戶端連接配接請求
- 當有新的用戶端連接配接建立線程處理用戶端請求
- 主線程繼續等待其他用戶端連接配接
- 當用戶端退出,則對應分支線程退出
from socket import *
from threading import Thread
import sys
# 建立監聽套接字
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# 處理用戶端請求
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
s = socket() # tcp套接字
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(3)
print("Listen the port %d..."%PORT)
# 循環等待用戶端連接配接
while True:
try:
c,addr = s.accept()
except KeyboardInterrupt:
sys.exit("伺服器退出")
except Exception as e:
print(e)
continue
# 建立線程處理用戶端請求
t = Thread(target=handle, args=(c,))
t.setDaemon(True) # 父程序結束則所有程序終止
t.start()
ftp 檔案伺服器
項目功能 :
* 用戶端有簡單的頁面指令提示: 功能包含:
- 檢視伺服器檔案庫中的檔案清單(普通檔案)
- 可以下載下傳其中的某個檔案到本地
- 可以上傳用戶端檔案到伺服器檔案庫
* 伺服器需求 :
- 允許多個用戶端同時操作
- 每個用戶端可能回連續發送指令
技術分析:
- tcp套接字更适合檔案傳輸
- 并發方案 ---》 fork 多程序并發
- 對檔案的讀寫操作
- 擷取檔案清單 ----》 os.listdir()
粘包的處理
整體結構設計
- 伺服器功能封裝在類中(上傳,下載下傳,檢視清單)
- 建立套接字,流程函數調用 main()
- 用戶端負責發起請求,接受回複,展示
服務端負責接受請求,邏輯處理
from socket import *
from threading import Thread
import os
import time
# 全局變量
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/tarena/FTP/" # 檔案庫位置
# 建立檔案伺服器服務端功能類
class FTPServer(Thread):
def __init__(self,connfd):
self.connfd = connfd
super().__init__()
def do_list(self):
# 擷取檔案清單
files = os.listdir(FTP)
if not files:
self.connfd.send("檔案庫為空".encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1) # 防止和後面發送内容粘包
# 拼接檔案清單
files_ = ""
for file in files:
if file[0] != '.' and \
os.path.isfile(FTP+file):
files_ += file + '\n'
self.connfd.send(files_.encode())
def do_get(self,filename):
try:
fd = open(FTP+filename,'rb')
except Exception:
self.connfd.send("檔案不存在".encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1)
# 檔案發送
while True:
data = fd.read(1024)
if not data:
time.sleep(0.1)
self.connfd.send(b'##')
break
self.connfd.send(data)
# 循環接收用戶端請求
def run(self):
while True:
data = self.connfd.recv(1024).decode()
if not data or data == 'Q':
return
elif data == 'L':
self.do_list()
elif data[0] == 'G': # G filename
filename = data.split(' ')[-1]
self.do_get(filename)
# 網絡搭建
def main():
# 建立套接字
sockfd = socket()
sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sockfd.bind(ADDR)
sockfd.listen(3)
print("Listen the port %d..."%PORT)
while True:
try:
connfd,addr = sockfd.accept()
print("Connect from",addr)
except KeyboardInterrupt:
print("伺服器程式退出")
return
except Exception as e:
print(e)
continue
# 建立新的線程處理用戶端
client = FTPServer(connfd)
client.setDaemon(True)
client.start() # 運作run方法
if __name__ == "__main__":
main()
ftp_sever
from socket import *
import sys
ADDR = ('127.0.0.1',8080) # 伺服器位址
# 用戶端功能處理類
class FTPClient:
def __init__(self,sockfd):
self.sockfd = sockfd
def do_list(self):
self.sockfd.send(b'L') # 發送請求
# 等待回複
data = self.sockfd.recv(128).decode()
if data == 'OK':
# 一次接收檔案清單字元串
data = self.sockfd.recv(4096)
print(data.decode())
else:
print(data)
def do_get(self,filename):
# 發送請求
self.sockfd.send(('G '+filename).encode())
# 等待回複
data = self.sockfd.recv(128).decode()
if data == 'OK':
fd = open(filename,'wb')
# 接收檔案
while True:
data = self.sockfd.recv(1024)
if data == b'##':
break
fd.write(data)
fd.close()
else:
print(data)
def do_quit(self):
self.sockfd.send(b'Q')
self.sockfd.close()
sys.exit("謝謝使用")
# 建立用戶端網絡
def main():
sockfd = socket()
try:
sockfd.connect(ADDR)
except Exception as e:
print(e)
return
ftp = FTPClient(sockfd) # 執行個體化對象
# 循環發送請求
while True:
print("\n=========指令選項==========")
print("**** list ****")
print("**** get file ****")
print("**** put file ****")
print("**** quit ****")
print("=============================")
cmd = input("輸入指令:")
if cmd.strip() == 'list':
ftp.do_list()
elif cmd[:3] == 'get':
# get filename
filename = cmd.strip().split(' ')[-1]
ftp.do_get(filename)
elif cmd[:3] == 'put':
# put ../filename
filename = cmd.strip().split(' ')[-1]
ftp.do_put(filename)
elif cmd.strip() == 'quit':
ftp.do_quit()
else:
print("請輸入正确指令")
if __name__ == "__main__":
main()
ftp_client
IO并發
定義:在記憶體中資料交換的操作被定義為IO操作,IO------輸入輸出
記憶體和磁盤進行資料交換: 檔案的讀寫 資料庫更新 記憶體和終端資料交換 : input print sys.stdin sys.stdout sys.stderr 記憶體和網絡資料的交換: 網絡連接配接 recv send recvfrom
IO密集型程式 : 程式執行中有大量的IO操作,而較少的cpu運算操作。消耗cpu較少,IO運作時間長
CPU(計算)密集型程式:程式中存在大量的cpu運算,IO操作相對較少,消耗cpu大。
IO分類
IO分為:阻塞IO、非阻塞IO、IO多路複用、事件驅動IO、異步IO
阻塞IO
- 定義: 在執行IO操作時如果執行條件不滿足則阻塞。阻塞IO是IO的預設形态。
- 效率: 阻塞IO是效率很低的一種IO。但是由于邏輯簡單是以是預設IO行為。
阻塞情況:
- 因為某種執行條件沒有滿足造成的函數阻塞 e.g. accept input recv
- 處理IO的時間較長産生的阻塞狀态 e.g. 網絡傳輸, 大檔案讀寫
非阻塞IO
定義 : 通過修改IO屬性行為, 使原本阻塞的IO變為非阻塞的狀态。
- 設定套接字為非阻塞IO
- sockfd.setblocking(bool)
- 功能: 設定套接字為非阻塞IO
- 參數: 預設為True,表示套接字IO阻塞;設定為False則套接字IO變為非阻塞
- 逾時檢測 :設定一個最長阻塞時間,超過該時間後則不再阻塞等待。
- sockfd.settimeout(sec)
- 功能:設定套接字的逾時時間
- 參數:設定的時間
IO多路複用
定義 :通過一個監測,可以同時監控多個IO事件的行為。當哪個IO事件可以執行,即讓這個IO事件發生。
rs, ws, xs = select(rlist, wlist, xlist[, timeout]) 監控IO事件,阻塞等待監控的IO時間發生
參數 :
- rlist 清單,存放(被動)等待處理的IO (接收)
- wlist 清單,存放主動處理的IO(發送)
- xlist 清單,存放出錯,希望去處理的IO(異常)
- timeout 逾時檢測
傳回值:
- rs 清單 rlist中準備就緒的IO
- ws 清單 wlist中準備就緒的IO
- xs 清單 xlist中準備就緒的IO
select 實作tcp服務
- 将關注的IO放入對應的監控類别清單
- 通過select函數進行監控
- 周遊select傳回值清單,确定就緒IO事件
- 處理發生的IO事件
from socket import *
from select import select
# 建立一個監聽套接字作為關注的IO
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 設定關注清單
rlist = [s]
wlist = []
xlist = [s]
# 循環監控IO
while True:
rs,ws,xs = select(rlist,wlist,xlist)
# 周遊三個傳回清單,處理IO
for r in rs:
# 根據周遊到IO的不同使用if分情況處理
if r is s:
c,addr = r.accept()
print("Connect from",addr)
rlist.append(c) # 增加新的IO事件
# else為用戶端套接字就緒情況
else:
data = r.recv(1024)
# 用戶端退出
if not data:
rlist.remove(r) # 從關注清單移除
r.close()
continue # 繼續處理其他就緒IO
print("Receive:",data.decode())
# r.send(b'OK')
# 我們希望主動處理這個IO對象
wlist.append(r)
for w in ws:
w.send(b'OK')
wlist.remove(w) # 使用後移除
for x in xs:
pass
注意:
- wlist中如果存在IO事件,則select立即傳回給ws
- 處理IO過程中不要出現死循環占有服務端的情況
- IO多路複用消耗資源較少,效率較高
擴充: 位運算
将整數轉換為二進制, 按照二進制位進行運算符操作 & 按位與 | 按位或 ^ 按位異或 << 左移 >> 右移 11 1011 14 1110 (11 & 14 1010) (11 | 14 1111) (11 ^ 14 0101 ) 11 << 2 ===> 44 右側補0 14 >> 2 ===> 3 擠掉右側的數字 使用 : 1. 在做底層硬體時操作寄存器 2. 做标志位的過濾
poll方法實作IO多路複用
p = select.poll() 建立poll對象
p.register(fd,event) 注冊關注的IO事件
- fd 要關注的IO
- event 要關注的IO事件類型
常用類型:
-
- POLLIN 讀IO事件(rlist)
- POLLOUT 寫IO事件 (wlist)
- POLLERR 異常IO (xlist)
- POLLHUP 斷開連接配接
e.g. p.register(sockfd,POLLIN|POLLERR)
p.unregister(fd) 取消對IO的關注
- 參數: IO對象或者IO對象的fileno
events = p.poll()
- 功能: 阻塞等待監控的IO事件發生
- 傳回值: 傳回發生的IO事件
events 是一個清單 [(fileno,evnet),(),()....]
每個元組為一個就緒IO,元組第一項是該IO的fileno,第二項為該IO就緒的事件類型
poll_server 步驟
- 建立套接字
- 将套接字register
- 建立查找字典,并維護
- 循環監控IO發生
- 處理發生的IO
from socket import *
from select import *
# 建立套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 建立poll對象關注s
p = poll()
# 建立查找字典,用于通過fileno查找IO對象
fdmap = {s.fileno():s}
# 關注s
p.register(s,POLLIN|POLLERR)
# 循環監控
while True:
events = p.poll()
# 循環周遊發生的事件 fd-->fileno
for fd,event in events:
# 區分事件進行處理
if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# 添加新的關注IO
p.register(c,POLLIN|POLLERR)
fdmap[c.fileno()] = c # 維護字典
# 按位與判定是POLLIN就緒
elif event & POLLIN:
data = fdmap[fd].recv(1024)
if not data:
p.unregister(fd) # 取消關注
fdmap[fd].close()
del fdmap[fd] # 從字典中删除
continue
print("Receive:",data.decode())
fdmap[fd].send(b'OK')
epoll方法
1. 使用方法 : 基本與poll相同
- 生成對象改為 epoll()
- 将所有事件類型改為EPOLL類型
2. epoll特點
- epoll 效率比select poll要高
- epoll 監控IO數量比select要多
- epoll 的觸發方式比poll要多 (EPOLLET邊緣觸發)
from socket import *
from select import *
# 建立套接字
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# 建立epoll對象關注s
ep = epoll()
# 建立查找字典,用于通過fileno查找IO對象
fdmap = {s.fileno():s}
# 關注s
ep.register(s,EPOLLIN|EPOLLERR)
# 循環監控
while True:
events = ep.poll()
# 循環周遊發生的事件 fd-->fileno
for fd,event in events:
print("親,你有IO需要處理哦")
# 區分事件進行處理
if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# 添加新的關注IO
# 将觸發方式變為邊緣觸發
ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
fdmap[c.fileno()] = c # 維護字典
# 按位與判定是EPOLLIN就緒
# elif event & EPOLLIN:
# data = fdmap[fd].recv(1024)
# if not data:
# ep.unregister(fd) # 取消關注
# fdmap[fd].close()
# del fdmap[fd] # 從字典中删除
# continue
# print("Receive:",data.decode())
# fdmap[fd].send(b'OK')