socket基于tcp協定的多程序聊天(異步)
server端代碼
import socket
from multiprocessing import Process
def chat(conn): while True: # 接收消息,列印消息 msg = conn.recv(1024).decode('utf-8') print(msg) conn.send((msg + '_sb').encode('utf-8')) conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1', 9595)) sk.listen() while True: # 建立多程序 conn, addr = sk.accept() Process(target=chat, args=[conn,]).start() sk.close()
client端代碼
import socket
sk = socket.socket() sk.connect(('127.0.0.1', 9595)) while True: inp = input('>>>').encode('utf-8') sk.send(inp) msg = sk.recv(1024).decode('utf-8') print(msg) sk.close()
先運作server程式,在運作client程式,執行結果為

鎖 —— multiprocess.Lock
通過之前的學習,實作了程式的異步,讓多個任務可以同時在幾個程序中并發處理,他們之間的運作沒有順序,一旦開啟也不受我們控制。盡管并發程式設計讓我們能更加充分的利用IO資源,但是也給我們帶來了新的問題。
當多個程序使用同一份資料資源的時候,就會引發資料安全或順序混亂問題
多程序搶占輸出資源,代碼如下
import os
import time
import random
from multiprocessing import Process def work(n): print('{} : {} is running'.format(n, os.getpid())) time.sleep(random.random()) print('{} : {} is done'.format(n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i, )) p.start()
執行結果為:
0 : 2840 is running
1 : 13824 is running
2 : 16980 is running
1 : 13824 is done
0 : 2840 is done
2 : 16980 is done
上面的輸出的結果是随機的,如果想有序的執行,就需要用到鎖,代碼如下:
由并發變成了串行,犧牲了運作效率,但避免了資料混亂
import os
import time
import random
from multiprocessing import Process from multiprocessing import Lock def work(n, lock): lock.acquire() #給程序上鎖 print('{} : {} is running'.format(n, os.getpid())) time.sleep(random.random()) print('{} : {} is done'.format(n, os.getpid())) lock.release() #給程序下鎖 if __name__ == '__main__': lock = Lock() for i in range(3): p = Process(target=work, args=(i, lock)) p.start()
執行結果
0 : 16160 is running
0 : 16160 is done
1 : 12368 is running
1 : 12368 is done
2 : 13824 is running
2 : 13824 is done
鎖的概念
如果鎖加在程序的開始和結束,那麼它和join,實作的效果是一樣的(同步)
join的效果,代碼如下
import os
import time
import random
from multiprocessing import Process def work(n): print('{} : {} is running'.format(n, os.getpid())) time.sleep(random.random()) print('{} : {} is done'.format(n, os.getpid())) if __name__ == '__main__': for i in range(3): p = Process(target=work, args=(i,)) p.start() p.join()
執行結果:
0 : 10544 is running
0 : 10544 is done
1 : 8332 is running
1 : 8332 is done
2 : 11280 is running
2 : 11280 is done
總結:同步控制
隻要用到了鎖,鎖之内的代碼就變成同步的了
鎖:控制一段代碼,同一時間,隻能被一個程序執行,資料更加安全
鎖的應用場景(12306搶票的例子)
模拟資料庫,建立一個檔案ticket,内容如下:
{"count":3} #計數(剩餘多少張票) 要先建立這個檔案
代碼如下:
執行結果
#加鎖可以保證多個程序修改同一塊資料時,同一時間隻能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但
犧牲了速度卻保證了資料安全。
雖然可以用檔案共享資料實作程序間通信,但問題是:
1.效率低(共享資料基于檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理
#是以我們最好找尋一種解決方案能夠兼顧:1、效率高(多個程序共享一塊記憶體的資料)2、幫我們處理好鎖問題。這就是 mutiprocessing子產品為我們提供的基于消息的IPC通信機制:隊列和管道。 隊列和管道都是将資料存放于記憶體中 隊列又是基于(管道+鎖)實作的,可以讓我們從複雜的鎖問題中解脫出來, 我們應該盡量避免使用共享資料,盡可能使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,而且在程序數目增多時,往 往可以獲得更好的可獲展性。
信号量 —— multiprocess.Semaphore(了解)
互斥鎖同時隻允許一個線程更改資料,而信号量Semaphore是同時允許一定數量的線程更改資料 。
假設商場裡有4個迷你唱吧,是以同時可以進去4個人,如果來了第五個人就要在外面等待,等到有人出來才能再進去玩。
實作:
信号量同步基于内部計數器,每調用一次acquire(),計數器減1;每調用一次release(),計數器加1.當計數器為0時, acquire()調用被阻塞。這是迪科斯徹(Dijkstra)信号量概念P()和V()的Python實作。信号量同步機制适用于通路像伺服器 這樣的有限資源。 信号量與程序池的概念很像,但是要區分開,信号量涉及到加鎖的概念
模拟ktv的房間數
示例代碼如下:在同一時間,最多進去4批人,acquire()是一個阻塞行為
import time
import random
from multiprocessing import Process, Semaphore def ktv(i,sem): sem.acquire() # 鎖住程序 print('person {} 進來唱歌了'.format(i)) time.sleep(random.randint(1, 5)) # 模拟唱歌時間 print('person {} 從ktv出去了'.format(i)) sem.release() # 解鎖程序 if __name__ == '__main__': sem = Semaphore(4) # 模拟隻有4個房間 for i in range(5): # 模拟5批人需要唱歌 Process(target=ktv, args=(i, sem)).start()
執行結果
總結:信号量和鎖有點類似,它們之間的差別,信号量,相當于電腦
信号量:
鎖 + 計數器
.acquire() 計數器-1
計數器減為0 = 阻塞
.release() 計數器+1
事件 —— multiprocess.Event(了解)
事件介紹
python線程的事件用于主線程控制其他線程的執行,事件主要提供了三個方法 set、wait、clear。
事件處理的機制:全局定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞,
如果“Flag”值為True,那麼event.wait 方法時便不再阻塞。
clear:将“Flag”設定為False
set:将“Flag”設定為True
事情的使用方法
示例1:
from multiprocessing import Event
# 建立一個事件的對象
e = Event() print(e.is_set()) # 在事件的創始之初,狀态為False
執行結果
False
例2:
from multiprocessing import Event
# 建立一個事件的對象
e = Event() print(e.is_set()) # 在事件的創始之初,狀态為False e.wait() # 阻塞,等待is_set()的值變成True,如果不為True,會一直等待 # 後面的代碼不會執行 print(1111)
執行結果
False # 程式沒有執行完畢,會一直阻塞
例3
from multiprocessing import Event
# 建立一個事件的對象
e = Event() print(e.is_set()) # 列印is_set()的狀态 # 在事件的創始之初,狀态為False e.set() # 将is_set()的值改為True e.wait() # 阻塞,等待is_set()的值變成True,如果不為True,會一直等待 # 後面的代碼不會執行 print(e.is_set()) # 列印is_set()的狀态
執行結果
False
True
例4:
from multiprocessing import Event
# 建立一個事件的對象
e = Event() print(e.is_set()) # 列印is_set()的狀态 # 在事件的創始之初,狀态為False e.set() # 将is_set()的值改為True e.wait() # 阻塞,等待is_set()的值變成True,如果不為True,會一直等待 # 後面的代碼不會執行 print(e.is_set()) # 列印is_set()的狀态 e.clear() # 将is_set()的值改為False print(e.is_set()) # 列印is_set()的值 e.wait() # 阻塞,等待is_set()的值變成True print(111) # 代碼不會被執行,上面被阻塞了
執行結果
False
True
False
模拟紅綠燈
紅綠燈:
車 比喻是一個程序,wait()等紅燈
根據狀态變化,wait遇到true信号,就非阻塞
遇到False,就阻塞
交通燈 也是一個程序 紅燈->False 綠燈->True
示例代碼如下:
import time
import random
from multiprocessing import Process, Event def car(i, e): # 感覺狀态的變化 if not e.is_set(): # 目前這個事件的狀态如果是False print('car{}正在等待'.format(i)) # 這輛車正在等待通過路口 e.wait() # 阻塞,直到有一個e.set行為(True) # 了解為等紅燈 print('car{}通過路口'.format(i)) def traffic_light(e): # 修改事件的狀态 print('\033[1;31m紅燈亮\033[0m') # 事件在創始之初的狀态是False,相當于程式中的紅燈 time.sleep(2) # 紅燈亮2秒 while True: if not e.is_set(): # False print('\033[1;32m綠燈亮\033[0m') e.set() # 修改is_set()的值為 True elif e.is_set(): print('\033[1;31m紅燈亮\033[0m') e.clear() # 修改is_set()的值為 False time.sleep(2) # 等待2秒 if __name__ == '__main__': e = Event() Process(target=traffic_light, args=(e,)).start() for i in range(10): time.sleep(random.randrange(0, 5, 2)) Process(target=car, args=(i, e)).start()
執行結果:
程序間通信——隊列和管道(multiprocess.Queue、multiprocess.Pipe)
程序間通信
IPC(Inter-Process Communication)
隊列
概念介紹
建立共享的程序隊列,Queue是多程序安全的隊列,可以使用Queue實作多程序之間的資料傳遞
Queue([maxsize])
建立共享的程序隊列。
參數 :maxsize是隊列中允許的最大項數。如果省略此參數,則無大小限制。
底層隊列使用管道和鎖定實作
方法介紹
其它方法了解
q.close()
關閉隊列,防止隊列中加入更多資料。調用此方法時,背景線程将繼續寫入那些已入隊列但尚未寫入的資料,但将在此方法
完成時馬上關閉。如果q被垃圾收集,将自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的資料結束信号或異常
例如,如果某個使用者正被阻塞在get()操作上,關閉生産者中的隊列不會導緻get()方法傳回錯誤。 q.cancel_join_thread() 不會再程序退出時自動連接配接背景線程。這可以防止join_thread()方法阻塞。 q.join_thread() 連接配接隊列的背景線程。此方法用于在調用q.close()方法後,等待所有隊列項被消耗。預設情況下,此方法由不是q的原始創 建者的所有程序調用。調用q.cancel_join_thread()方法可以禁止這種行為。
Queue與之前學的queue之間的差別
import queue
它能維護一個先進先出的次序 它不能進行IPC(程序間的通信)
from multiprocessing import Queue
它能維護一個先進先出的次序 也能進行IPC(程序間的通信)
程序間的通信
示例代碼:
from multiprocessing import Queue, Process
# 能維護一個先進先出的次序,也能進行IPC
def haha(q): # 傳回q中的一個項目。如果q為空,此方法将阻塞,直到隊列中有項目可用為止 print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=haha, args=(q, )) p.start() # 将[1, 2, 3]放入隊列。如果隊列已滿,此方法将阻塞至有空間可用為止 q.put([1, 2, 3])
執行結果
[1, 2, 3]
雙向通信
import time
from multiprocessing import Queue, Process
# 能維護一個先進先出的次序,也能進行IPC
def haha(q): # 傳回q中的一個項目。如果q為空,此方法将阻塞,直到隊列中有項目可用為止 print(q.get()) # 将'你好'放入隊列。如果隊列已滿,此方法将阻塞至有空間可用為止 q.put('你好') if __name__ == '__main__': q = Queue() p = Process(target=haha, args=(q, )) p.start() # 将'hi'放入隊列。如果隊列已滿,此方法将阻塞至有空間可用為止 q.put('hi') time.sleep(0.5) # 傳回q中的一個項目。如果q為空,此方法将阻塞,直到隊列中有項目可用為止 print(q.get())
執行結果
hi
你好
生産者消費者模型
在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。
為什麼要使用生産者和消費者模式
線上程世界裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這個問題于是引入了生産者和消費者模式。
什麼是生産者消費者模式
生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,是以生産者生産完資料之後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生産者要資料,而是直接從阻塞隊列裡取,阻塞隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力。
基于隊列實作生産者消費者模型
示例代碼:
from multiprocessing import Process, Queue
import time, random, os def consumer(q): while True: res = q.get() time.sleep(random.randint(1, 3)) print('\033[45m%s 吃 %s\033[0m' % (os.getpid(), res)) def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('\033[44m%s 生産了 %s\033[0m' % (os.getpid(), res)) if __name__ == '__main__': q = Queue() # 生産者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
執行結果:
此時的問題是主程序永遠不會結束,原因是:生産者p在生産完後就結束了,但是消費者c在取空了q之後,則一直處于死循環中且卡在q.get()這一步。
解決方式無非是讓生産者在生産完畢後,往隊列中再發一個結束信号,這樣消費者在接收到結束信号後就可以break出死循環。
改良版--生産者消費模型
from multiprocessing import Process, Queue
import time, random, os def consumer(q): while True: res = q.get() if res is None: break # 收到結束信号則結束 time.sleep(random.randint(1, 3)) print('\033[1;32m%s 吃 %s\033[0m' % (os.getpid(), res)) def producer(q): for i in range(10): time.sleep(random.randint(1, 3)) res = '包子%s' % i q.put(res) print('\033[1;31m%s 生産了 %s\033[0m' % (os.getpid(), res)) q.put(None) # 發送結束信号 if __name__ == '__main__': q = Queue() # 生産者們:即廚師們 p1 = Process(target=producer, args=(q,)) # 消費者們:即吃貨們 c1 = Process(target=consumer, args=(q,)) # 開始 p1.start() c1.start() print('主')
執行結果
注意:結束信号None,不一定要由生産者發,主程序裡同樣可以發,但主程序需要等生産者結束後才應該發送該信号
有幾個消費者最後put多好個None
轉載于:https://www.cnblogs.com/ellisonzhang/p/10443957.html