1、程序基本操作
os.getpid擷取程序id:
import os
import time
print('start')
time.sleep(40) #配合time.sleep()
print(os.getpid(),os.getppid(),'end') #擷取程序id os.getpid() os.getppid() 父程序id
------------結果:
start
5028 1960 end # 5028 程序id 目前檔案執行的程序id,程式結束,程序停止;1960 父程序pycharm id
# pid process id
# ppid parent process id
# 子程序
# 父程序 在父程序中建立子程序
# 在pycharm中啟動的所有py程式都是pycharm的子程序
程序建立,執行本檔案内的函數:
import os
import time
from multiprocessing import Process #[1]從多程序ing導入程序P Process類
def func():
print('start',os.getpid()) #[3]開啟的程序函數做的操作:擷取程序id,因為這是子程序執行的程式内容
time.sleep(1) #是以這裡的os.getpid()表示的是這個開啟的程序的pid
print('end',os.getpid())
if __name__ == '__main__':
p = Process(target=func) #[2]建立程序對象 Process(目标=函數名) #目标=xx#檢視源碼可知 Process類init方法中有傳參 target
p.start() # 異步 調用開啟程序的方法 但是并不等待這個程序真的開啟,
print('main :',os.getpid())
----------------結果:
main : 3012 #[4]先執行start,但是先列印main。因為是異步的,調用開啟程序的方法,p.start 執行之後就會建立程序,将傳進去取的target加括号()執行。無需等待這個子程序開啟和執行,目前程式繼續往下執行。
start 5748 #[5]這兩個擷取的id 都是子程序程式運作裡的,擷取的都是子程序pid,值是一樣的。
end 5748
>tasklist|findstr "py" #Windows檢視程序的指令
程序是并發的:
import os
import time
from multiprocessing import Process
def eat():
print('start eating',os.getpid())
time.sleep(1)
print('end eating',os.getpid())
def sleep():
print('start sleeping',os.getpid())
time.sleep(1)
prinst('end sleeping',os.getpid())
if __name__ == '__main__':
p1 = Process(target=eat) # [1]建立一個即将要執行eat函數的程序對象
p1.start() # [2]開啟一個程序
p2 = Process(target=sleep) # [3]建立一個即将要執行sleep函數的程序對象
p2.start() # [4]開啟程序
print('main :',os.getpid())
--------------------------結果:
main : 6216 #[5]異步,父程序沒有等到子程序結束再往下執行,由于程序建立需要花費時間,所有在子程序建立的過程中,主程序已經往下執行并列印出主程序的pid.
start eating 6468 #[6]由此處可知,代碼先建立并執行p1程序,再建立并執行p2程序,但是二者并沒有說先執行完p1
start sleeping 7120 #程序,然後才執行p2程序。也就是說建立多個程序,程序運作是異步的,是并發執行。
end eating 6468
end sleeping 7120
if __name__
== ' __main__
':的作用:以及作業系統建立程序的方式差別
__name__
__main__
import os
import time
from multiprocessing import Process
def func():
print('start',os.getpid())
time.sleep(1)
print('end',os.getpid())
if __name__ == '__main__':
p = Process(target=func)
p.start() # 異步 調用開啟程序的方法 但是并不等待這個程序真的開啟
print('main :',os.getpid())
-------------結果:
main : 6420
start 6736
end 6736
# 作業系統建立程序的方式不同
# windows作業系統執行開啟程序的代碼
# 實際上新的子程序需要通過import父程序的代碼來完成資料的導入工作
# 是以有一些内容我們隻希望在父程序中完成,就寫在if __name__ == '__main__':下面
# ios linux作業系統建立程序 fork
錯誤一:
如果 在windows裡面沒有使用if __name__ == '__main__':,那麼會報錯:
if __name__ == '__main__':
freeze_support()
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
-
__name__
':的作用:__main__
- 兩個檔案列印
,分别執行檔案之後列印__name__
__main__
的作用:
1)執行檔案中列印本檔案中的
,結果顯示__name__
;執行檔案中import導入其它檔案,其它檔案中有個列印__main__
即print(__name__
__name__
),在本檔案中顯示那個檔案的檔案名,即子產品名字。
2)是以,當本程式執行中存在if
__name__
':,由于等式成立,是以if判斷下的程式會執行。而導入的其它檔案中存在if__main__
__name__
':,由于在本檔案導入時,__main__
不等于 '__name__
':,而是等于那個的子產品名,所有其它檔案中的if__main__
__name__
__main__
':下的内容不會執行
3)由此可知:如果想讓本檔案中的程式在其它檔案導入時執行,就不要寫在if
__name__
':下;反之,如果想讓本檔案中的程式在其它檔案導入時不執行,就寫在if__main__
__name__
__main__
':下;
4)在以上基礎上:在Windows中,在檔案中開啟一個子程序以導入的方式在子程序記憶體空間中執行一遍。如果開啟程序的指令沒有寫在if
__name__
__main__
':下,那麼開啟一個子程序後,子程序再開啟一個子程序,這樣就會出現遞歸建立子程序,所有會報錯;
如果是在Linux或者mac作業系統下,是fork建立程序,不需要寫if
__name__
':也不會報錯。建立程序的方式不同,他們是将父程序記憶體中的資料拷貝一份子程序中。__main__
-
print([__name__]) if __name__ == '__main__': # 控制當這個py檔案被當作腳本直接執行的時候,就執行這裡面的代碼 # 當這個py檔案被當作子產品導入的時候,就不執行這裡面的代碼 print('hello hello') # __name__ == '__main__' # 執行的檔案就是__name__所在的檔案 # __name__ == '檔案名' # __name__所在的檔案被導入執行的時候
主程序和子程序之間的關系
# 主程序和子程序之間的關系
import os
import time
from multiprocessing import Process
def func():
print('start',os.getpid())
time.sleep(10)
print('end',os.getpid())
if __name__ == '__main__':
p = Process(target=func)
p.start() # 異步 調用開啟程序的方法 但是并不等待這個程序真的開啟
print('main :',os.getpid())
-------------------結果:
main : 6636
start 6752
end 6752
# 主程序沒結束 :等待子程序結束
# 主程序負責回收子程序的資源
# 如果子程序執行結束,父程序沒有回收資源,那麼這個子程序會變成一個僵屍程序
# 主程序的結束邏輯
# 主程序的代碼結束
# 所有的子程序結束
# 給子程序回收資源
# 主程序結束
import os
import time
from multiprocessing import Process
def func():
print('start',os.getpid())
time.sleep(1)
print('end',os.getpid())
print("mcw")
if __name__ == '__main__':
p = Process(target=func)
p.start() # 異步 調用開啟程序的方法 但是并不等待這個程序真的開啟
print('main :',os.getpid())
----------------結果:
mcw
main : 5244
mcw
main : 3392
start 3392
end 3392
#1)建立一個程序對象,并開啟程序。對子程序的建立開啟都是在父程序程式中執行的。子程序首先将父程序所有的執行程式導入并執行一次,因為if __name__ == '__main__':在子程序中不滿足條件,是以沒有執行。
2)在父程序的程式中先列印"mcw",建立好程序後,再列印“main”此處列印父程序pid。子程序導入這個程式的内容,先列印"mcw",再列印“main”此處列印子程序pid,因程式是在子程序中運作。然後将父程序中建立子程序時傳進去的函數執行一下。列印start ,end。建立對象,對象.start()相當于給函數名加個括号,讓函數執行。因為子程序的記憶體空間中已經有func函數了,是以程序對象.start(),在這個空間中找到func函數并執行。
3)疑問?建立的子程序是先導入内容,再執行傳進來的這個函數嗎?
主程序怎麼知道子程序結束了的呢?
#[1] 主程序怎麼知道子程序結束了的呢?
# 基于網絡、檔案
# [2]join方法 :阻塞,直到子程序結束就結束
import time
from multiprocessing import Process
def send_mail():
time.sleep(3)
print('發送了一封郵件')
if __name__ == '__main__':
p = Process(target=send_mail)
p.start() # 異步 非阻塞
# time.sleep(5)
print('join start')
p.join() #[3] 同步 阻塞 直到p對應的程序結束之後才結束阻塞
print('5000封郵件已發送完畢')
-----------------結果:
join start
發送了一封郵件
5000封郵件已發送完畢
#[4]如果沒有對象.join,由于沒有阻塞,父程序和子程序是異步執行,而且建立程序花時間長些。在程式列印'join start'之後,會先列印'5000封郵件已發送完畢',由于程式中遇到對象.join(),是以會阻塞,等待子程序結束,父程序才繼續執行這個程式。
# 開啟5個程序,給公司的5000個人發郵件,發送完郵件之後,列印一個消息“5000封郵件已發送完畢”
import time
import random
from multiprocessing import Process
def send_mail(a):
time.sleep(random.random())
print('發送了一封郵件',a)
if __name__ == '__main__':
l = []
for i in range(5):
p = Process(target=send_mail,args=(i,))
p.start()
l.append(p)
print(l)
for p in l:p.join() #[1]join起到阻塞,将之前的異步非阻塞變成同步阻塞
# 阻塞 直到上面的五個程序都結束
print('5000封郵件已發送完畢')
-----------------------結果:
[<Process(Process-1, started)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]
發送了一封郵件 3
發送了一封郵件 4
發送了一封郵件 0
發送了一封郵件 1
發送了一封郵件 2
5000封郵件已發送完畢
分解分析上面if
__name__
__main__
':後面的 代碼:
需求應為:同時發送多個郵件,直到所有郵件發送完畢才發送'5000封郵件已發送完畢'
(1)情況一:
建立五個程序,沒有join阻塞。還沒發郵件就已經列印發送完畢,有問題。
if __name__ == '__main__':
for i in range(5):
p = Process(target=send_mail,args=(i,))
p.start()
print('5000封郵件已發送完畢')
------------結果:
5000封郵件已發送完畢
發送了一封郵件 0
發送了一封郵件 1
發送了一封郵件 3
發送了一封郵件 2
發送了一封郵件 4
(2)情況二:
因為每個都join阻塞,發送五次郵件,每次都是上一封郵件發送完畢才能發送下一封郵件,串行 效率低,不是想要的需求。
if __name__ == '__main__':
for i in range(5):
p = Process(target=send_mail,args=(i,))
p.start()
p.join()
print('5000封郵件已發送完畢')
-------------結果:
發送了一封郵件 0
發送了一封郵件 1
發送了一封郵件 2
發送了一封郵件 3
發送了一封郵件 4
5000封郵件已發送完畢
(3)情況三:
并發執行五個程序,但是隻阻塞最後一個了。因為for循環五次之後p代表最後一個,p.join()隻阻塞最後一個。是以,列印5000封郵件已發送完畢 每次都是最後建立的那個程序運作完畢才列印,但是其它程序如果有比較慢才執行完畢的,那麼就是還沒有發送完所有郵件,就已經列印發送所有郵件完畢。這樣有問題。
if __name__ == '__main__':
for i in range(5):
p = Process(target=send_mail,args=(i,))
p.start()
p.join()
print('5000封郵件已發送完畢')
-------------結果:
發送了一封郵件 2
發送了一封郵件 1
發送了一封郵件 4
5000封郵件已發送完畢
發送了一封郵件 3
發送了一封郵件 0
(4)情況四:
[1]将每次建立并開啟程序的程序對象追加到清單,循環清單對每一個程序執行 對象.join()阻塞。
[2]對已經結束的p.join就相當于pass,沒影響。
[3]沒有結束的就會阻塞住,主程式會等待沒有結束的子程序結束。子程序是并發執行,并且每個都有阻塞
[4]完成的阻塞一下沒問題,沒完成的阻塞一下,所有阻塞結束,程式往下執行,即完成所有子程序結束,父程序重新從阻塞後面開始執行。所有子程序結束時間總長是以最長子程序時間為所有子程序結束時間的
假設運作時間是: 2 4 1 3 2
所有子程序結束所需時間4s,時間花費是最大時長的那個子s程序所花費的時長。
if __name__ == '__main__':
l = []
for i in range(5):
p = Process(target=send_mail,args=(i,))
p.start()
l.append(p)
for p in l:p.join() #join起到阻塞,将之前的異步非阻塞變成同步阻塞
# 阻塞 直到上面的五個程序都結束
print('5000封郵件已發送完畢')
#綜上:想要實作并發并且還要等到所有的子程序結束之後,父程序才結束。那麼就将每個程序追加到清單,循環清單對每個清單元素進行阻塞。
本程式中的建立子程序對象,也可以是傳進導入的其它子產品中的函數
xx.py----------------------
import os,time
def func():
print('start',os.getpid())
time.sleep(1)
print('end',os.getpid())
test.py------------------
import os
import xx
from multiprocessing import Process
if __name__ == '__main__':
p = Process(target=xx.func) #此處可以是導入的函數,也可以用子產品.函數執行,估計類中方法也是可以的。
p.start() #
print('main :',os.getpid())
-----------------------test.py列印結果:
main : 5768
start 5896
end 5896
往子程序中傳參(思考,傳參字典等特殊傳參呢?):
from multiprocessing import Process
def func(arg1,arg2):
print(arg1,arg2)
if __name__ == '__main__':
p = Process(target=func,args=(1,2))
p.start()
-------------結果:
1 2
#給程序要執行的函數傳參,在建立程序對象的時候添加args=(1,)。假如是單個參數,那麼加個逗号才是元組。元組内元素與形參位置一一對應
from multiprocessing import Process
def func(arg1):
print(arg1)
if __name__ == '__main__':
p = Process(target=func,args=(1,))
p.start()
------------結果:
1
傳參總結:
[1]給程序要執行的函數傳參,在建立程序對象的時候添加args=(1,)。假如是單個參數,那麼加個逗号才是元組。元組内元素與形參位置一一對應
# 總結
# 1.開啟一個程序
# 函數名(參數1,參數2)
# from multiprocessing import Process
# p = Process(target=函數名,args=(參數1,參數2))
# p.start()
#注意:函數名可以是從其它子產品導入過來的。
# 2.父程序 和 子程序
# 3.父程序會等待着所有的子程序結束之後才結束
# 為了回收資源
# 4.程序開啟的過程中windows和 linux/ios之間的差別
# 開啟程序的過程需要放在if __name__ == '__main__'下
# windows中 相當于在子程序中把主程序檔案又從頭到尾執行了一遍
# 除了放在if __name__ == '__main__'下的代碼
# linux中 不執行代碼,直接執行調用的func函數
# 5.join方法
# 把一個程序的結束事件封裝成一個join方法
# 執行join方法的效果就是 阻塞直到這個子程序執行結束就結束阻塞
# 在多個子程序中使用join
# p_l= []
# for i in range(10):
# p = Process(target=函數名,args=(參數1,參數2))
# p.start()
# p_l.append(p)
# for p in p_l:p.join()
# 所有的子程序都結束之後要執行的代碼寫在這裡
#注意:[1]發起建立子程序,子程序不是立即執行,有一點點延遲,并且父程序和子程序是異步的,資料等資源互相隔離,執行互不幹擾。
[2]父子各自列印這個func位址是不一樣的,表示子程序中的資源是獨立存在的。是隔離開來的。
[3]主程序程式執行完後,主程序還沒結束,在等待子程序結束;主程序要負責回收子程序的資源;如果子程序執行結束,父程序沒有回收資源,那麼這個子程序會成為一個僵屍程序
[4]主程序結束邏輯:
守護程序:
# 有一個參數可以把一個子程序設定為一個守護程序
import time
from multiprocessing import Process
def son1(a,b):
while True:
print('shou hu')
time.sleep(0.5)
def son2():
for i in range(5):
print('qi ta jin cheng')
time.sleep(1)
if __name__ == '__main__':
p = Process(target=son1,args=(1,2))
p.daemon = True ##一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,并且父程序代碼執行結束,p即終止運作
p.start() # 把p子程序設定成了一個守護程序
p2 = Process(target=son2)
p2.start()
print("================")
time.sleep(2)
print("-----------------------") ##隻要終端列印出這一行内容,那麼守護程序p也就跟着結束掉了
-----------------結果:
================
qi ta jin cheng
shou hu
shou hu
qi ta jin cheng
shou hu
shou hu
-----------------------
qi ta jin cheng
qi ta jin cheng
qi ta jin cheng
#守護程序建立:[1]對象.daemon=True變守護程序
[2]一定要在p.start()前設定,設定p為守護程序,禁止p建立子程序,并且父程序代碼執行結束,p即終止運作
# 守護程序是随着主程序的代碼結束而結束的
# 生産者消費者模型的時候
# 和守護線程做對比的時候
# 所有的子程序都必須在主程序結束之前結束,由主程序來負責回收資源
關于守護程序需要強調兩點:
其一:守護程序會在主程序代碼執行結束後就終止
其二:守護程序内無法再開啟子程序,否則抛出異常:AssertionError: daemonic processes are not allowed to have children
如果我們有兩個任務需要并發執行,那麼開一個主程序和一個子程序分别去執行就ok了,如果子程序的任務在主程序任務結束後就沒有存在的必要了,那麼該子程序應該在開啟前就被設定成守護程序。主程序代碼運作結束,守護程序随即終止;
使用場景:随着主程序的代碼結束而結束就用守護程序
程序其它方法is_alive() terminate():
import time
from multiprocessing import Process
def son1():
while True:
print('is alive')
time.sleep(0.5)
if __name__ == '__main__':
p = Process(target=son1)
p.start() # 異步 非阻塞
print(p.is_alive())
time.sleep(1)
p.terminate() # 異步的 非阻塞
print(p.is_alive()) # 程序還活着 因為作業系統還沒來得及關閉程序
time.sleep(0.01)
print(p.is_alive()) # 作業系統已經響應了我們要關閉程序的需求,再去檢測的時候,得到的結果是程序已經結束了
--------------結果:
True
is alive
is alive
True
False
# 什麼是異步非阻塞?
# terminate
[1]檢視程序是否運作狀态 對象.is_alive
[2]終止程序操作 對象.終止(terminate()) 終止之後判斷是否還活着,結果是還活着一段時間,即終止也是需要時間終止的
終止程序操作 對象.終止 終止之後判斷是否還活着,結果是還活着一段時間。異步,不等他 非阻塞
start異步非阻塞
面向對象建立程序
import os
import time
from multiprocessing import Process
class MyProcecss2(Process):
def run(self):
while True:
print('is alive')
time.sleep(0.5)
class MyProcecss1(Process):
def __init__(self,x,y):
self.x = x
self.y = y
super().__init__()
def run(self):
print(self.x,self.y,os.getpid())
for i in range(5):
print('in son2')
time.sleep(1)
if __name__ == '__main__':
mp = MyProcecss1(1,2)
mp.daemon = True
mp.start()
print(mp.is_alive())
mp.terminate()
mp2 = MyProcecss2()
mp2.start()
print('main :',os.getpid())
time.sleep(1)
----------------結果:
True
main : 5808
is alive
is alive
面向對象程式設計簡寫:
class MyProcecss1(Process): #
def run(self):
for i in range(5):
print('jincheng1')
time.sleep(1)
if __name__ == '__main__':
mp = MyProcecss1()
mp.start()
----------------結果:
jincheng1
jincheng1
jincheng1
jincheng1
jincheng1
class MyProcecss1(Process): #1)建立一個類,繼承Process。
def __init__(self,x,y): #4)建立init初始化方法,往裡面傳參,然後run裡面就可以使用傳進去的參數了。怎麼傳進去?建立執行個體的時候傳入參數。
self.x = x
self.y = y
super().__init__()#5)如果子程序不需要傳參,init方法可以不寫。因為父類中init中有很多建立程序的步驟自定義沒有,無法建立程序,是以super().無法建立時報錯如下結果:
def run(self): #2)必須寫一個run方法,run方法裡面寫子程序的運作代碼
for i in range(5):
print('jincheng%s',self.x,self.y)
time.sleep(1)
if __name__ == '__main__':
mp = MyProcecss1(1,2) # #3)建立這個類的對象,往對象中傳參執行個體變量。不能往run方法裡面傳參,因為沒有調用run,是start開啟程序類中封裝了的運作的run。那麼可以将參數放到init
mp.start()
-----------------結果:
# assert self._popen is None, 'cannot start a process twice'
#AttributeError: 'MyProcecss1' object has no attribute '_popen'
jincheng%s 1 2
jincheng%s 1 2
jincheng%s 1 2
jincheng%s 1 2
jincheng%s 1 2
面向對象建立程序。
1)建立一個類,繼承Process。
2)必須寫一個run方法,run方法裡面寫子程序的運作代碼
3)建立這個類的對象,往對象中傳參執行個體變量。不能忘run方法裡面傳參,因為沒有調用run,是start開啟程序類中封裝了的運作的run。那麼可以放到init
4)建立init初始化方法,往裡面傳參,然後run裡面就可以使用傳進去的參數了。怎麼傳進去?建立執行個體的時候傳入參數。
5)如果子程序不需要傳參,init方法可以不寫。因為父類中init中有很多建立程序的步驟自定義沒有,無法建立程序,是以super()
同步阻塞 比如p.join 等待子程序結束父程序才往下執行
# Process類總結:
# 開啟程序的方式
# 面向函數
# def 函數名:要在子程序中執行的代碼
# p = Process(target= 函數名,args=(參數1,))
# 面向對象
# class 類名(Process):
# def __init__(self,參數1,參數2): # 如果子程序不需要參數可以不寫
# self.a = 參數1
# self.b = 參數2
# super().__init__()
# def run(self):
# 要在子程序中執行的代碼
# p = 類名(參數1,參數2)
# Process提供的操作程序的方法
# p.start() 開啟程序 異步非阻塞
# p.terminate() 結束程序 異步非阻塞
# p.join() 同步阻塞
# p.isalive() 擷取目前程序的狀态
# daemon = True 設定為守護程序,守護程序永遠在主程序的代碼結束之後自動結束
程序直接通信:
并發程式設計 買票系統 鎖的概念:
并發程序做什麼事。
并發程式設計結合網絡程式設計
一:并發程式設計結合網絡程式設計
1)并發程式設計建立子程序。子程序有自己的獨立空間
2)網絡程式設計tcp程式設計,多個用戶端連接配接服務端,一個用戶端在服務端收發資訊的部分就建立一個程序,這樣每個用戶端對應服務端一個程序,實作多個用戶端連接配接服務端。
二:車票購票系統:
購票多個人買票,一個一個排隊購買,多人同時購買(建立子程序,實作并發)
使用者太多,一台伺服器處理并發子程序數量有限,再來一台伺服器響應處理。多台伺服器同時響應請求就要出現排程的伺服器,負載均衡伺服器。
買票資訊存在資料庫。沒台處理伺服器一個資料庫,資訊不一緻。公用一台資料庫
處理伺服器,處理一條買票資訊,資料庫就要減少一個票,二者通信,網絡通信。通信有延遲。
處理伺服器和資料庫的通信:
10個人搶一張票
-
車票購票系統分析:
1)查票一個函數
2)10個人搶就是10個程序來執行這個函數,誰查詢的解決,誰要傳參進去,傳參 args,傳的是元組
3)買票了:買票一個函數 time模拟網絡延遲
4)這時,1張票結果多個人都買到了,這時是有問題的,
5)問題分析,因為多個人是有并發的,一個人讀取到一張票,網絡延遲,寫入減少一張票需要0.1秒,在0.1秒内多個人又來并發讀到一張票,做同樣的操作。于是多個人在0.1秒内成功執行了操作.一張票多人都讀取到買到了
6)多個人來資料庫讀取請求。有個鎖,隻有一個人成功執行之後才能下一個人進來操作。隻有一個程序對同一個檔案操作完之後别的程序才能進來操作這個檔案,保證資料一緻性
7)lock.acquire()加鎖,沒有搶到鎖的程序被阻塞了,等待釋放鎖。代碼前加鎖,一個程序執行完釋放鎖,suo.釋放()
8)查票不加鎖,買票必須加鎖,不加鎖資料會出錯,買票是串行,非并行。
是以:
1、如果在一個并發的場景下涉及到某部分内容時需要修改一些所有程序共享的資料資源 需要加鎖來維護資料的安全
2、在資料安全的基礎上,才考慮效率問題。
這裡查票函數并發,買票函數涉及到修改共享資料是以需要加鎖串行,保證資料一緻性。并發是多程序實作,一個網絡連接配接(網絡程式設計)建立一個子程序。
3、同步存在的意義:
10)程序與程序間資料隔離,任何對共享資料資源修改都存在資料安全問題,加鎖解決
11)加鎖還可以上下文管理,用with封裝了異常處理的,不會出現release不能解鎖的問題 release是放在finamly的
12)with加鎖可以放在買票函數裡,也可以放在task任務裡。在task任務裡,加鎖然後執行買票函數
13)在子程序中 對需要加鎖的代碼 進行with lock:
14)查票函數 買票函數 任務函數(檢視不需加鎖 并發,加鎖買票 串行 目的 保證資料一緻性,不會混亂(如果修改共享資料也并發,每個人都讀到一張票,然後做買票,再修改共享資料減去一張票,就是多個人買到票了))
# 并發 能夠做的事兒
# 1.實作能夠響應多個client端的server
# 2.搶票系統
#'ticket_count.txt'文本内容:
{"count": 0}
import time
import json
from multiprocessing import Process,Lock
def search_ticket(user):
with open('ticket_count.txt') as f:
dic = json.load(f)
print('%s查詢結果 : %s張餘票'%(user,dic['count']))
def buy_ticket(user,lock):
# with lock:
# lock.acquire() # 給這段代碼加上一把鎖
time.sleep(0.02)
with open('ticket_count.txt') as f:
dic = json.load(f)
if dic['count'] > 0:
print('%s買到票了'%(user))
dic['count'] -= 1
else:
print('%s沒買到票' % (user))
time.sleep(0.02)
with open('ticket_count.txt','w') as f:
json.dump(dic,f)
# lock.release() # 給這段代碼解鎖
def task(user, lock):
search_ticket(user)
with lock:
buy_ticket(user, lock)
if __name__ == '__main__':
lock = Lock()
for i in range(10):
p = Process(target=task,args=('user%s'%i,lock))
p.start()
-------------結果:
user0查詢結果 : 3張餘票
user0買到票了
user1查詢結果 : 3張餘票
user1買到票了
user3查詢結果 : 1張餘票
user3買到票了
user6查詢結果 : 0張餘票
--------------------------
# 1.如果在一個并發的場景下,涉及到某部分内容
# 是需要修改一些所有程序共享資料資源
# 需要加鎖來維護資料的安全
# 2.在資料安全的基礎上,才考慮效率問題
# 3.同步存在的意義
# 資料的安全性
# 在主程序中執行個體化 lock = Lock()
# 把這把鎖傳遞給子程序
# 在子程序中 對需要加鎖的代碼 進行 with lock:
# with lock相當于lock.acquire()和lock.release()
# 在程序中需要加鎖的場景
# 共享的資料資源(檔案、資料庫)
# 對資源進行修改、删除操作
# 加鎖之後能夠保證資料的安全性 但是也降低了程式的執行效率
加鎖的場景:
加鎖的優缺點:
加鎖方法:
加鎖的原因:
什麼是鎖:
多把鎖:
鎖的簡單了解:
場景:修改共享資料資源
方法:
從多程序子產品導入Lock類
執行個體化類
with lock:
函數執行(傳參lock進入)
def 函數(lock)不需要操作對lock
lock = Lock()
with lock:
需要加鎖解鎖的函數執行,将鎖對象傳進去
程序之間的資料隔離*
from multiprocessing import Process
n = 100
def func():
global n
n -= 1
if __name__ == '__main__':
p_l = []
for i in range(10):
p = Process(target=func)
p.start()
p_l.append(p)
for p in p_l:p.join()
print(n)
-------------結果:
100
隊列的使用:
-
兩個隔離的程序間進行通信:
一個父程序中多個子程序,一個子程序一個任務。子程序完成任務程序間需要通信。假設是計算求和,就需要傳回計算結果。
程序間通信方式: 隊列 檔案 管道 (資料表似乎也是檔案)
-
隊列:
1)導入子產品
2)建立隊列,
3)隊列傳到函數
4)函數内調用,将資料put
5)外 get
- 隊列原理:
1)相當于第三個檔案,在記憶體中的
2)子程序put 父程序get
3)先進先出
-
隊列:基于
檔案家族的sockct實作,不是基于網絡的。存取資料基于piclkle,lock
兩個程序往同一個隊列裡面存取資料,有可能兩個程序往同一個地方存取到資料,所有要保證資料安全,隊列加鎖了,串行
-
隊列 sockct piclkle,lock
管道 sockct piclkle
隊列基于管道+鎖實作
管道基于sockct piclkle實作
(重寫隊列方法,會報錯缺少東西,說明内部使用了)
-
管道 :一個發一個收 效率高,多個的話那麼資料不安全,它本身沒有鎖,所有效率高
隊列:多個收發
-
q.get在隊列為空時發生阻塞。這樣就會等待有值再往下執行
建立隊列傳一個參數 為隊列大小 。 q = Queue(5)
put多個,滿了阻塞
put_nowait 當隊列為滿的時候再向隊列中放資料 會報錯并且會丢失資料
get_nowait # 在隊列為空的時候 直接報錯
get 阻塞 get_nowait非阻塞
-
三個方法不準确,有隐患
原因:在并發中擷取三個值的路途中就已經被别的程序修改了,
比如:擷取的到是空的,值在傳回的途中,有個程序已經放入值,這時空的結果就是有問題的。
q.empty
q.qsize
q.full
初始化裡面初始化方法
from multiprocessing import Queue,Process
# 先進先出
def func(exp,q):
ret = eval(exp)
q.put({ret,2,3})
q.put(ret*2)
q.put(ret*4)
if __name__ == '__main__':
q = Queue()
Process(target=func,args=('1+2+3',q)).start()
print(q.get())
print(q.get())
print(q.get())
-----------------結果:
{2, 3, 6}
12
24
# Queue基于 天生就是資料安全的
# 檔案家族的socket pickle lock
# pipe 管道(不安全的) = 檔案家族的socket pickle
# 隊列 = 管道 + 鎖
# from multiprocessing import Pipe
# pip = Pipe()
# pip.send()
# pip.recv()
import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5) # 當隊列為滿的時候再向隊列中放資料 隊列會阻塞
print('5555555')
try:
q.put_nowait(6) # 當隊列為滿的時候再向隊列中放資料 會報錯并且會丢失資料
except queue.Full:
pass
print('6666666')
--------------結果:
5555555
6666666import queue
from multiprocessing import Queue
q = Queue(5)
q.put(1)
q.put(2)
q.put(3)
q.put(4)
q.put(5) # 當隊列為滿的時候再向隊列中放資料 隊列會阻塞
print('5555555')
try:
q.put_nowait(6) # 當隊列為滿的時候再向隊列中放資料 會報錯并且會丢失資料
except queue.Full:
pass
print('6666666')
print(q.get())
print(q.get())
print(q.get()) # 在隊列為空的時候會發生阻塞
print(q.get()) # 在隊列為空的時候會發生阻塞
print(q.get()) # 在隊列為空的時候會發生阻塞
try:
print(q.get_nowait()) # 在隊列為空的時候 直接報錯
except queue.Empty:pass
-----------------結果:
5555555
6666666
1
2
3
4
5
from multiprocessing import Queue
q = Queue(5) #從多程序導入Queue 隊列, Queue(n)隊列建立一個程序隊列對象。最大存放n個資料
q.put("mcw") #put将資料放入程序隊列
q.put("xiao")
aa=q.get() #get 将資料拿出程序隊列 mcw 先進先出
bb=q.get() #xiao
print(aa)
print(bb)
----------------結果:
mcw
xiao