文章目錄
- 一、什麼是程序
- 二、建立程序的常用方法
- 2.1使用multiprocessing子產品建立程序
- 2.2 使用Process子類建立程序
- 2.3 使用程序池Pool建立程序
- 三、程序間通訊
- 3.1 隊列簡介
- 3.2 多程序隊列的使用
- 四、什麼是線程
- 五、建立線程
- 5.1 使用threading子產品建立線程
- 5.2 使用Thread子類建立線程
- 六、線程間通信
- 6.1 什麼是互斥鎖
- 6.2 使用互斥鎖
- 6.3 使用隊列線上程間通信
一、什麼是程序
就是指作業系統能執行多個任務
eg:使用Windows作業系統可以同時聽音樂、學習python課程等多個任務。每個任務就是一個程序。
程序四計算機已運作城西的實體
程序和程式不同,程式本身隻有指令、資料、及其組織形式的描述,程序才是程式(那些指令和資料)的運作執行個體。
二、建立程序的常用方法
在python中有多個子產品可以建立程序,比較常用的有
os.fork()函數
、
multiprocessing
子產品和
Pool
程序池
2.1使用multiprocessing子產品建立程序
文法:
Process([group[,target[,name[,args[,kwargs]]]]])
- group:參數未使用,值始終為None
- target:表示目前程序啟動時執行的可調用對象
- name:為目前程序執行個體的别名
- args:表示傳遞給target函數的參數元組
- kwargs:表示傳遞給target函數的參數字典
from multiprocessing import Process
# 執行子程序代碼
def test(interval):
print('我是子程序')
def main():
print('主程序開始')
p=Process(target=test,args=(1,))
p.start()
print('主程序結束')
if __name__ == '__main__':
main()
>>>
主程序開始
主程序結束
我是子程序
上述代碼中p執行個體除了常用的方法start()外,還有如下方法:
- is_alive(): 判斷程序執行個體是否還在執行
- join([timeout]):是否等待程序執行個體執行結束,或者等待多少秒
- start():啟動程序執行個體(建立子程序)
- run():如果沒有給定target參數,對這個對象調用start()方法時,就将執行對象中的run()方法;
- terminate():不管任務是否完成,立即終止
- name:目前執行個體别名,預設為Process-N,N為從1開始遞增的整數
- pid:目前程序執行個體的PID值
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import os
#兩個子程序将會調用的兩個方法
def child_1(interval):
print("子程序(%s)開始執行,父程序為(%s)" % (os.getpid(), os.getppid()))
t_start = time.time() # 計時開始
time.sleep(interval) # 程式将會被挂起interval秒
t_end = time.time() # 計時結束
print("子程序(%s)執行時間為'%0.2f'秒"%(os.getpid(),t_end - t_start))
def child_2(interval):
print("子程序(%s)開始執行,父程序為(%s)" % (os.getpid(), os.getppid()))
t_start = time.time() # 計時開始
time.sleep(interval) # 程式将會被挂起interval秒
t_end = time.time() # 計時結束
print("子程序(%s)執行時間為'%0.2f'秒"%(os.getpid(),t_end - t_start))
if __name__ == '__main__':
print("------父程序開始執行-------")
print("父程序PID:%s" % os.getpid()) # 輸出目前程式的ID
p1=Process(target=child_1,args=(1,)) # 執行個體化程序p1
p2=Process(target=child_2,name="mrsoft",args=(2,)) # 執行個體化程序p2
p1.start() # 啟動程序p1
p2.start() # 啟動程序p2
#同時父程序仍然往下執行,如果p2程序還在執行,将會傳回True
print("p1.is_alive=%s"%p1.is_alive())
print("p2.is_alive=%s"%p2.is_alive())
#輸出p1和p2程序的别名和pid
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
print("------等待子程序-------")
p1.join() # 等待p1程序結束
p2.join() # 等待p2程序結束
print("------父程序執行結束-------")
>>>第一次執行
------父程序開始執行-------
父程序PID:1980
p1.is_alive=True
p2.is_alive=True
p1.name=Process-1
p1.pid=3304
p2.name=mrsoft
p2.pid=12300
------等待子程序-------
子程序(3304)開始執行,父程序為(1980)
子程序(12300)開始執行,父程序為(1980)
子程序(3304)執行時間為'1.01'秒
子程序(12300)執行時間為'2.01'秒
------父程序執行結束-------
Process finished with exit code 0
>>>第二次執行
------父程序開始執行-------
父程序PID:10104
p1.is_alive=True
p2.is_alive=True
p1.name=Process-1
p1.pid=6004
p2.name=mrsoft
p2.pid=2852
------等待子程序-------
子程序(6004)開始執行,父程序為(10104)
子程序(2852)開始執行,父程序為(10104)
子程序(6004)執行時間為'1.00'秒
子程序(2852)執行時間為'2.00'秒
------父程序執行結束-------
Process finished with exit code 0
2.2 使用Process子類建立程序
對于一些簡單的小任務,使用Process(target=test)方式實作多程序。但是使用複雜的程序,通常定義成一個類,使其
繼承Process類
每次執行個體化這個類的時候,就等同執行個體化一個程序對象。
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import os
#繼承Process類
class SubProcess(Process):
# 由于Process類本身也有__init__初識化方法,這個子類相當于重寫了父類的這個方法
def __init__(self,interval,name=''):
Process.__init__(self) # 調用Process父類的初始化方法
self.interval = interval # 接收參數interval
if name: # 判斷傳遞的參數name是否存在
self.name = name # 如果傳遞參數name,則為子程序建立name屬性,否則使用預設屬性
#重寫了Process類的run()方法
def run(self):
print("子程序(%s) 開始執行,父程序為(%s)"%(os.getpid(),os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print("子程序(%s)執行結束,耗時%0.2f秒"%(os.getpid(),t_stop-t_start))
if __name__=="__main__":
print("------父程序開始執行-------")
print("父程序PID:%s" % os.getpid()) # 輸出目前程式的ID
p1 = SubProcess(interval=1,name='mrsoft')
p2 = SubProcess(interval=2)
#對一個不包含target屬性的Process類執行start()方法,就會運作這個類中的run()方法,是以這裡會執行p1.run()
p1.start() # 啟動程序p1
p2.start() # 啟動程序p2
# 輸出p1和p2程序的執行狀态,如果真正進行,傳回True,否則傳回False
print("p1.is_alive=%s"%p1.is_alive())
print("p2.is_alive=%s"%p2.is_alive())
#輸出p1和p2程序的别名和pid
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
print("------等待子程序-------")
p1.join() # 等待p1程序結束
p2.join() # 等待p2程序結束
print("------父程序執行結束-------")
>>>
------父程序開始執行-------
父程序PID:19576
p1.is_alive=True
p2.is_alive=True
p1.name=mrsoft
p1.pid=20424
p2.name=SubProcess-2
p2.pid=19304
------等待子程序-------
子程序(20424) 開始執行,父程序為(19576)
子程序(19304) 開始執行,父程序為(19576)
子程序(20424)執行結束,耗時1.01秒
子程序(19304)執行結束,耗時2.00秒
------父程序執行結束-------
2.3 使用程序池Pool建立程序
當需要建立的⼦程序數量不多時, 可以直接利⽤multiprocessing.Process動态生成多個程序, 但如果要建立很多程序時,⼿動建立的話⼯作量會非常大,此時就可以⽤到
multiprocessing
子產品提供的Pool去建立一個程序池。
multiprocessing.Pool常⽤函數:
- apply_async(func, args, kwds):使⽤⾮阻塞⽅式調⽤func(任務并⾏執⾏),args為傳遞給func的參數清單,kwds為傳遞給func的關鍵字參數清單
- apply(func, args, kwds):使⽤阻塞⽅式調⽤func,必須等待上⼀個程序執行完任務後才能執⾏下⼀個程序,了解即可,幾乎不用
- close():關閉Pool,使其不再接受新的任務
- terminate():不管任務是否完成,⽴即終⽌
- join():主程序阻塞,等待⼦程序的退出,必須在close或terminate之後使⽤
初始化Pool時,可以指定⼀個最⼤程序數,當有新的任務送出到Pool中時,如果程序池還沒有滿,那麼就會建立⼀個新的程序⽤來執⾏該任務,但如果程序池已滿(池中的程序數已經達到指定的最⼤值),那麼該任務就會等待,直到池中有程序結束才會建立新的程序來執⾏。
# -*- coding=utf-8 -*-
from multiprocessing import Pool
import os, time
def task(name):
print('子程序(%s)執行task %s ...' % ( os.getpid() ,name))
time.sleep(1) # 休眠1秒
if __name__=='__main__':
print('父程序(%s).' % os.getpid())
p = Pool(3)
for i in range(10): # 從0開始循環10次
p.apply_async(task, args=(i,)) # 使用非阻塞方式調用task()函數
print('等待所有子程序結束...')
p.close() # 關閉程序池,關閉後p不再接收新的請求
p.join() # 等待子程序結束
print('所有子程序結束.')
>>>
父程序(22220).
等待所有子程序結束...
子程序(24124)執行task 0 ...
子程序(20972)執行task 1 ...
子程序(17992)執行task 2 ...
子程序(24124)執行task 3 ...
子程序(17992)執行task 4 ...
子程序(20972)執行task 5 ...
子程序(17992)執行task 6 ...
子程序(24124)執行task 8 ...
子程序(20972)執行task 7 ...
子程序(20972)執行task 9 ...
所有子程序結束.
這裡看到的是不一定是順序的
三、程序間通訊
3.1 隊列簡介
隊列就是模仿現實中的排隊
3.2 多程序隊列的使用
可以使用
multiprocessing
子產品的Queue實作多程序之間的資料傳遞。
初始化Queue()對象(例如:q=Quere(num)),若括号中沒有指定最大可接受的資訊數量,或數量為負值,那麼久代表可接受的消息數量沒有上線(直到記憶體的盡頭)
- Queue. qsize():傳回目前隊列包含的消息數量
- Queue. empty():如果隊列為空,傳回Ture,反之傳回False
- Queue. full():如果隊列滿了,傳回Ture,反之傳回False
- Queue. get([block[,timeout]]):擷取隊列中的一條消息,然後将其從隊列中移除,block預設值為Ture
1.如果block使用預設值,且沒有設定timeout(機關,秒),消息隊列為空,此時程式将被阻塞(停在讀取狀态),直到從消息隊列讀到資訊為止,如果設定了timeout,則會等待timeout秒,若還沒有讀到任何資訊,則抛出Queue.Empty異常 2.如果block值為False,消息隊列為空,則會立即抛出Queue.Empty異常
- Queue. get_nowait():相當于Queue.get(False)
- Queue.put(item,[block[,timeout]]):将item消息寫入隊列,block預設值為Ture
1.如果block使用預設值,且沒有設定timeout(機關,秒),消息隊列如果已經沒有空間寫入,此時程式将被阻塞(停在寫入狀态),直到從消息隊列騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒有空間,則抛出Queue.Full異常 2.如果block值為False,消息隊列沒有空間寫入,則會立即抛出Queue.Full異常
- Queue.put_nowait(item):相當于Queue.put(item,False)
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
# 向隊列中寫入資料
def write_task(q):
if not q.full():
for i in range(5):
message = "消息" + str(i)
q.put(message)
print("寫入:%s"%message)
# 從隊列讀取資料
def read_task(q):
while not q.empty():
print("讀取:%s" % q.get(True,2)) # 等待2秒,如果還沒讀取到任何消息,則抛出"Queue.Empty"異常
if __name__ == "__main__":
print("-----父程序開始-----")
q = Queue() # 父程序建立Queue,并傳給各個子程序
pw = Process(target=write_task, args=(q,)) # 執行個體化寫入隊列的子程序,并且傳遞隊列
pr = Process(target=read_task, args=(q,)) # 執行個體化讀取隊列的子程序,并且傳遞隊列
pw.start() # 啟動子程序 pw,寫入
pr.start() # 啟動子程序 pr,讀取
pw.join() # 等待 pw 結束
pr.join() # 等待 pr 結束
print("-----父程序結束-----")
>>>
-----父程序開始-----
寫入:消息0
寫入:消息1
寫入:消息2
寫入:消息3
寫入:消息4
-----父程序結束-----
四、什麼是線程
一個任務比作一個程序,一個程序有多個線程組成
線程是作業系統能夠進行運算排程的最小機關
五、建立線程
由于線程是作業系統直接支援的執行機關,是以進階語言(python、java)通常内置了對線程的支援。python的标準庫提供了兩個子產品:_thread和threading,_thread是低級子產品,threading是進階子產品,對_thread進行了封裝。絕大多數我們隻要用threading這個進階子產品
5.1 使用threading子產品建立線程
threading子產品提供了一個Thread類來代表一個線程對象,文法如下
Thread([group[,target[,name[,args[,kwargs]]]]])
- group:值為none,為以後版本而保留
- target:表示一個可調用對象,線程啟動的時候,run()方法将調用此對象,預設值為None,表示不調用任何内容
- name:表示目前線程的名稱,預設建立一個Thread-N格式的唯一名稱
- args:表示傳遞給target函數的參數元組
- kwargs:表示傳遞給target函數的參數字典
# -*- coding: utf-8 -*-
import threading,time
def process():
for i in range(3):
time.sleep(1)
msg = "子線程"+threading.current_thread().name+'執行,i='+str(i) #name屬性中儲存的是目前線程的名字
print(msg)
if __name__ == '__main__':
print('-----主線程開始-----')
threads=[threading.Thread(target=process) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()
print('-----主線程結束-----')
》》》
-----主線程開始-----
子線程Thread-3執行,i=0
子線程Thread-4執行,i=0
子線程Thread-2執行,i=0
子線程Thread-1執行,i=0
子線程Thread-3執行,i=1
子線程Thread-4執行,i=1
子線程Thread-1執行,i=1
子線程Thread-2執行,i=1
子線程Thread-4執行,i=2
子線程Thread-3執行,i=2
子線程Thread-1執行,i=2
子線程Thread-2執行,i=2
-----主線程結束-----
5.2 使用Thread子類建立線程
Thread 線程類和Process程序類的使用方式費城相似,也可以定義一個子類,使其繼承Thread線程類來建立線程。
# -*- coding: utf-8 -*-
import threading
import time
class SubThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "子線程"+self.name+'執行,i='+str(i) #name屬性中儲存的是目前線程的名字
print(msg)
if __name__ == '__main__':
print('-----主線程開始-----')
t1 = SubThread() # 建立子線程t1
t2 = SubThread() # 建立子線程t2
t1.start() # 啟動子線程t1
t2.start() # 啟動子線程t2
t1.join() # 等待子線程t1
t2.join() # 等待子線程t2
print('-----主線程結束-----')
>>>
-----主線程開始-----
子線程Thread-2執行,i=0
子線程Thread-1執行,i=0
子線程Thread-2執行,i=1
子線程Thread-1執行,i=1
子線程Thread-2執行,i=2
子線程Thread-1執行,i=2
-----主線程結束-----
六、線程間通信
線程之間可以通行,可以定義一個全局變量,讓後不同線程調用這變量,可以共享資料
6.1 什麼是互斥鎖
就是防止多個線程同時讀寫某一塊記憶體區域。互斥鎖為資源引入一個狀态:鎖定和非鎖定
6.2 使用互斥鎖
mutex=threading.Lock() # 建立鎖
mutex.acquire([blocking]) # 鎖定
mutex.release() # 釋放鎖
from threading import Thread,Lock
import time
n=100 # 共100張票
def task():
global n
mutex.acquire() # 上鎖
temp=n # 指派給臨時變量
time.sleep(0.1) # 休眠0.1秒
n=temp-1 # 數量減1
print('購買成功,剩餘%d張電影票'%n)
mutex.release() # 釋放鎖
if __name__ == '__main__':
mutex=Lock() # 執行個體化Lock類
t_l=[] # 初始化一個清單
for i in range(10):
t=Thread(target=task) # 執行個體化線程類
t_l.append(t) # 将線程執行個體存入清單中
t.start() # 建立線程
for t in t_l:
t.join() # 等待子線程結束
>>>
購買成功,剩餘99張電影票
購買成功,剩餘98張電影票
購買成功,剩餘97張電影票
購買成功,剩餘96張電影票
購買成功,剩餘95張電影票
購買成功,剩餘94張電影票
購買成功,剩餘93張電影票
購買成功,剩餘92張電影票
購買成功,剩餘91張電影票
購買成功,剩餘90張電影票
6.3 使用隊列線上程間通信
# -*- coding: utf-8 -*-
from queue import Queue
import random,threading,time
#生産者類
class Producer(threading.Thread):
def __init__(self, name,queue):
threading.Thread.__init__(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("生産者%s将産品%d加入隊列!" % (self.getName(), i))
self.data.put(i)
time.sleep(random.random())
print("生産者%s完成!" % self.getName())
#消費者類
class Consumer(threading.Thread):
def __init__(self,name,queue):
threading.Thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("消費者%s将産品%d從隊列中取出!" % (self.getName(),val))
time.sleep(random.random())
print("消費者%s完成!" % self.getName())
if __name__ == '__main__':
print('-----主線程開始-----')
queue = Queue() # 執行個體化隊列
producer = Producer('Producer',queue) # 執行個體化線程Producer,并傳入隊列作為參數
consumer = Consumer('Consumer',queue) # 執行個體化線程Consumer,并傳入隊列作為參數
producer.start() # 啟動線程Producer
consumer.start() # 啟動線程Consumer
producer.join() # 等待線程Producer結束
consumer.join() # 等待線程Consumer結束
print('-----主線程結束-----')
>>>
-----主線程開始-----
生産者Producer将産品0加入隊列!
消費者Consumer将産品0從隊列中取出!
生産者Producer将産品1加入隊列!
消費者Consumer将産品1從隊列中取出!
生産者Producer将産品2加入隊列!
消費者Consumer将産品2從隊列中取出!
生産者Producer将産品3加入隊列!
生産者Producer将産品4加入隊列!
生産者Producer完成!
消費者Consumer将産品3從隊列中取出!
消費者Consumer将産品4從隊列中取出!
消費者Consumer完成!
-----主線程結束-----
Process finished with exit code 0