天天看點

Python學習九:使用程序和線程

文章目錄

  • ​​一、什麼是程序​​
  • ​​二、建立程序的常用方法​​
  • ​​2.1使用multiprocessing子產品建立程序​​
  • ​​2.2 使用Process子類建立程序​​
  • ​​2.3 使用程序池Pool建立程序​​
  • ​​三、程序間通訊​​
  • ​​3.1 隊列簡介​​
  • ​​3.2 多程序隊列的使用​​
  • ​​四、什麼是線程​​
  • ​​五、建立線程​​
  • ​​5.1 使用threading子產品建立線程​​
  • ​​5.2 使用Thread子類建立線程​​
  • ​​六、線程間通信​​
  • ​​6.1 什麼是互斥鎖​​
  • ​​6.2 使用互斥鎖​​
  • ​​6.3 使用隊列線上程間通信​​

一、什麼是程序

​就是指作業系統能執行多個任務​

eg:使用Windows作業系統可以同時聽音樂、學習python課程等多個任務。每個任務就是一個程序。

​程序四計算機已運作城西的實體​

​ 程序和程式不同,程式本身隻有指令、資料、及其組織形式的描述,程序才是程式(那些指令和資料)的運作執行個體。

二、建立程序的常用方法

在python中有多個子產品可以建立程序,比較常用的有​

​os.fork()函數​

​​、​

​multiprocessing​

​​子產品和​

​Pool​

​程序池

2.1使用multiprocessing子產品建立程序

文法:

​​

​Process([group[,target[,name[,args[,kwargs]]]]])​

  1. group:參數未使用,值始終為None
  2. target:表示目前程序啟動時執行的可調用對象
  3. name:為目前程序執行個體的别名
  4. args:表示傳遞給target函數的參數元組
  5. kwargs:表示傳遞給target函數的參數字典
from multiprocessing import Process

# 執行子程序代碼

def test(interval):
    print('我是子程序')

def main():
    print('主程序開始')
    p=Process(target=test,args=(1,))
    p.start()
    print('主程序結束')

if __name__ == '__main__':
    main()
>>>
主程序開始
主程序結束
我是子程序      

上述代碼中p執行個體除了常用的方法start()外,還有如下方法:

  • is_alive(): 判斷程序執行個體是否還在執行
  • join([timeout]):是否等待程序執行個體執行結束,或者等待多少秒
  • start():啟動程序執行個體(建立子程序)
  • run():如果沒有給定target參數,對這個對象調用start()方法時,就将執行對象中的run()方法;
  • terminate():不管任務是否完成,立即終止
  • name:目前執行個體别名,預設為Process-N,N為從1開始遞增的整數
  • pid:目前程序執行個體的PID值
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import os

#兩個子程序将會調用的兩個方法
def  child_1(interval):
    print("子程序(%s)開始執行,父程序為(%s)" % (os.getpid(), os.getppid()))
    t_start = time.time()   # 計時開始
    time.sleep(interval)    # 程式将會被挂起interval秒
    t_end = time.time()     # 計時結束
    print("子程序(%s)執行時間為'%0.2f'秒"%(os.getpid(),t_end - t_start))

def  child_2(interval):
    print("子程序(%s)開始執行,父程序為(%s)" % (os.getpid(), os.getppid()))
    t_start = time.time()   # 計時開始
    time.sleep(interval)    # 程式将會被挂起interval秒
    t_end = time.time()     # 計時結束
    print("子程序(%s)執行時間為'%0.2f'秒"%(os.getpid(),t_end - t_start))

if __name__ == '__main__':
    print("------父程序開始執行-------")
    print("父程序PID:%s" % os.getpid())                  # 輸出目前程式的ID
    p1=Process(target=child_1,args=(1,))                   # 執行個體化程序p1
    p2=Process(target=child_2,name="mrsoft",args=(2,))     # 執行個體化程序p2
    p1.start()  # 啟動程序p1
    p2.start()  # 啟動程序p2
    #同時父程序仍然往下執行,如果p2程序還在執行,将會傳回True
    print("p1.is_alive=%s"%p1.is_alive())
    print("p2.is_alive=%s"%p2.is_alive())
    #輸出p1和p2程序的别名和pid
    print("p1.name=%s"%p1.name)
    print("p1.pid=%s"%p1.pid)
    print("p2.name=%s"%p2.name)
    print("p2.pid=%s"%p2.pid)
    print("------等待子程序-------")
    p1.join() # 等待p1程序結束
    p2.join() # 等待p2程序結束
    print("------父程序執行結束-------")

>>>第一次執行

------父程序開始執行-------
父程序PID:1980
p1.is_alive=True
p2.is_alive=True
p1.name=Process-1
p1.pid=3304
p2.name=mrsoft
p2.pid=12300
------等待子程序-------
子程序(3304)開始執行,父程序為(1980)
子程序(12300)開始執行,父程序為(1980)
子程序(3304)執行時間為'1.01'秒
子程序(12300)執行時間為'2.01'秒
------父程序執行結束-------

Process finished with exit code 0

>>>第二次執行
------父程序開始執行-------
父程序PID:10104
p1.is_alive=True
p2.is_alive=True
p1.name=Process-1
p1.pid=6004
p2.name=mrsoft
p2.pid=2852
------等待子程序-------
子程序(6004)開始執行,父程序為(10104)
子程序(2852)開始執行,父程序為(10104)
子程序(6004)執行時間為'1.00'秒
子程序(2852)執行時間為'2.00'秒
------父程序執行結束-------

Process finished with exit code 0      

2.2 使用Process子類建立程序

對于一些簡單的小任務,使用Process(target=test)方式實作多程序。但是使用複雜的程序,通常定義成一個類,使其​

​繼承Process類​

​每次執行個體化這個類的時候,就等同執行個體化一個程序對象。

# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
import os

#繼承Process類
class SubProcess(Process):
    # 由于Process類本身也有__init__初識化方法,這個子類相當于重寫了父類的這個方法
    def __init__(self,interval,name=''):
        Process.__init__(self)      # 調用Process父類的初始化方法
        self.interval = interval    # 接收參數interval
        if name:                    # 判斷傳遞的參數name是否存在
            self.name = name        # 如果傳遞參數name,則為子程序建立name屬性,否則使用預設屬性
    #重寫了Process類的run()方法
    def run(self):
        print("子程序(%s) 開始執行,父程序為(%s)"%(os.getpid(),os.getppid()))
        t_start = time.time()
        time.sleep(self.interval)
        t_stop = time.time()
        print("子程序(%s)執行結束,耗時%0.2f秒"%(os.getpid(),t_stop-t_start))

if __name__=="__main__":
    print("------父程序開始執行-------")
    print("父程序PID:%s" % os.getpid())                  # 輸出目前程式的ID
    p1 = SubProcess(interval=1,name='mrsoft')
    p2 = SubProcess(interval=2)
    #對一個不包含target屬性的Process類執行start()方法,就會運作這個類中的run()方法,是以這裡會執行p1.run()
    p1.start()  # 啟動程序p1
    p2.start()  # 啟動程序p2
    # 輸出p1和p2程序的執行狀态,如果真正進行,傳回True,否則傳回False
    print("p1.is_alive=%s"%p1.is_alive())
    print("p2.is_alive=%s"%p2.is_alive())
    #輸出p1和p2程序的别名和pid
    print("p1.name=%s"%p1.name)
    print("p1.pid=%s"%p1.pid)
    print("p2.name=%s"%p2.name)
    print("p2.pid=%s"%p2.pid)
    print("------等待子程序-------")
    p1.join() # 等待p1程序結束
    p2.join() # 等待p2程序結束
    print("------父程序執行結束-------")


>>>
------父程序開始執行-------
父程序PID:19576
p1.is_alive=True
p2.is_alive=True
p1.name=mrsoft
p1.pid=20424
p2.name=SubProcess-2
p2.pid=19304
------等待子程序-------
子程序(20424) 開始執行,父程序為(19576)
子程序(19304) 開始執行,父程序為(19576)
子程序(20424)執行結束,耗時1.01秒
子程序(19304)執行結束,耗時2.00秒
------父程序執行結束-------      

2.3 使用程序池Pool建立程序

當需要建立的⼦程序數量不多時, 可以直接利⽤multiprocessing.Process動态生成多個程序, 但如果要建立很多程序時,⼿動建立的話⼯作量會非常大,此時就可以⽤到​

​multiprocessing​

​子產品提供的Pool去建立一個程序池。

multiprocessing.Pool常⽤函數:

  • apply_async(func, args, kwds):使⽤⾮阻塞⽅式調⽤func(任務并⾏執⾏),args為傳遞給func的參數清單,kwds為傳遞給func的關鍵字參數清單
  • apply(func, args, kwds):使⽤阻塞⽅式調⽤func,必須等待上⼀個程序執行完任務後才能執⾏下⼀個程序,了解即可,幾乎不用
  • close():關閉Pool,使其不再接受新的任務
  • terminate():不管任務是否完成,⽴即終⽌
  • join():主程序阻塞,等待⼦程序的退出,必須在close或terminate之後使⽤

初始化Pool時,可以指定⼀個最⼤程序數,當有新的任務送出到Pool中時,如果程序池還沒有滿,那麼就會建立⼀個新的程序⽤來執⾏該任務,但如果程序池已滿(池中的程序數已經達到指定的最⼤值),那麼該任務就會等待,直到池中有程序結束才會建立新的程序來執⾏。

# -*- coding=utf-8 -*-
from multiprocessing import Pool
import os, time

def task(name):
    print('子程序(%s)執行task %s ...' % ( os.getpid() ,name))
    time.sleep(1)       # 休眠1秒

if __name__=='__main__':
    print('父程序(%s).' % os.getpid())
    p = Pool(3)
    for i in range(10):                 # 從0開始循環10次    
        p.apply_async(task, args=(i,))  # 使用非阻塞方式調用task()函數   
    print('等待所有子程序結束...')
    p.close()   # 關閉程序池,關閉後p不再接收新的請求
    p.join()    # 等待子程序結束
    print('所有子程序結束.')
>>>
父程序(22220).
等待所有子程序結束...
子程序(24124)執行task 0 ...
子程序(20972)執行task 1 ...
子程序(17992)執行task 2 ...
子程序(24124)執行task 3 ...
子程序(17992)執行task 4 ...
子程序(20972)執行task 5 ...
子程序(17992)執行task 6 ...
子程序(24124)執行task 8 ...
子程序(20972)執行task 7 ...
子程序(20972)執行task 9 ...
所有子程序結束.      

這裡看到的是不一定是順序的

三、程序間通訊

3.1 隊列簡介

​隊列就是模仿現實中的排隊​

3.2 多程序隊列的使用

可以使用 ​

​multiprocessing​

​ 子產品的Queue實作多程序之間的資料傳遞。

初始化Queue()對象(例如:q=Quere(num)),若括号中沒有指定最大可接受的資訊數量,或數量為負值,那麼久代表可接受的消息數量沒有上線(直到記憶體的盡頭)

  • Queue. qsize():傳回目前隊列包含的消息數量
  • Queue. empty():如果隊列為空,傳回Ture,反之傳回False
  • Queue. full():如果隊列滿了,傳回Ture,反之傳回False
  • Queue. get([block[,timeout]]):擷取隊列中的一條消息,然後将其從隊列中移除,block預設值為Ture ​

    ​1.如果block使用預設值,且沒有設定timeout(機關,秒),消息隊列為空,此時程式将被阻塞(停在讀取狀态),直到從消息隊列讀到資訊為止,如果設定了timeout,則會等待timeout秒,若還沒有讀到任何資訊,則抛出Queue.Empty異常 2.如果block值為False,消息隊列為空,則會立即抛出Queue.Empty異常​

  • Queue. get_nowait():相當于Queue.get(False)
  • Queue.put(item,[block[,timeout]]):将item消息寫入隊列,block預設值為Ture​

    ​1.如果block使用預設值,且沒有設定timeout(機關,秒),消息隊列如果已經沒有空間寫入,此時程式将被阻塞(停在寫入狀态),直到從消息隊列騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒有空間,則抛出Queue.Full異常 2.如果block值為False,消息隊列沒有空間寫入,則會立即抛出Queue.Full異常​

  • Queue.put_nowait(item):相當于Queue.put(item,False)
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue

# 向隊列中寫入資料
def write_task(q):
    if not q.full():
        for i in range(5):
            message = "消息" + str(i)
            q.put(message)
            print("寫入:%s"%message)
# 從隊列讀取資料
def read_task(q):
    while not q.empty():
        print("讀取:%s" % q.get(True,2)) # 等待2秒,如果還沒讀取到任何消息,則抛出"Queue.Empty"異常

if __name__ == "__main__":
    print("-----父程序開始-----")
    q = Queue()  # 父程序建立Queue,并傳給各個子程序
    pw = Process(target=write_task, args=(q,))      # 執行個體化寫入隊列的子程序,并且傳遞隊列
    pr = Process(target=read_task, args=(q,))       # 執行個體化讀取隊列的子程序,并且傳遞隊列
    pw.start()   # 啟動子程序 pw,寫入
    pr.start()   # 啟動子程序 pr,讀取
    pw.join()    # 等待 pw 結束
    pr.join()    # 等待 pr 結束
    print("-----父程序結束-----")
>>>
-----父程序開始-----
寫入:消息0
寫入:消息1
寫入:消息2
寫入:消息3
寫入:消息4
-----父程序結束-----      

四、什麼是線程

一個任務比作一個程序,一個程序有多個線程組成

​線程是作業系統能夠進行運算排程的最小機關​

五、建立線程

由于線程是作業系統直接支援的執行機關,是以進階語言(python、java)通常内置了對線程的支援。python的标準庫提供了兩個子產品:_thread和threading,_thread是低級子產品,threading是進階子產品,對_thread進行了封裝。絕大多數我們隻要用threading這個進階子產品

5.1 使用threading子產品建立線程

threading子產品提供了一個Thread類來代表一個線程對象,文法如下

​​

​Thread([group[,target[,name[,args[,kwargs]]]]])​

  1. group:值為none,為以後版本而保留
  2. target:表示一個可調用對象,線程啟動的時候,run()方法将調用此對象,預設值為None,表示不調用任何内容
  3. name:表示目前線程的名稱,預設建立一個Thread-N格式的唯一名稱
  4. args:表示傳遞給target函數的參數元組
  5. kwargs:表示傳遞給target函數的參數字典
# -*- coding: utf-8 -*-
import threading,time
def process():
    for i in range(3):
        time.sleep(1)
        msg = "子線程"+threading.current_thread().name+'執行,i='+str(i) #name屬性中儲存的是目前線程的名字
        print(msg)
if __name__ == '__main__':
    print('-----主線程開始-----')
    threads=[threading.Thread(target=process) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print('-----主線程結束-----')
》》》

-----主線程開始-----
子線程Thread-3執行,i=0
子線程Thread-4執行,i=0
子線程Thread-2執行,i=0
子線程Thread-1執行,i=0
子線程Thread-3執行,i=1
子線程Thread-4執行,i=1
子線程Thread-1執行,i=1
子線程Thread-2執行,i=1
子線程Thread-4執行,i=2
子線程Thread-3執行,i=2
子線程Thread-1執行,i=2
子線程Thread-2執行,i=2
-----主線程結束-----      

5.2 使用Thread子類建立線程

Thread 線程類和Process程序類的使用方式費城相似,也可以定義一個子類,使其繼承Thread線程類來建立線程。

# -*- coding: utf-8 -*-
import threading
import time
class SubThread(threading.Thread):
    def run(self):
        for i in range(3):
            time.sleep(1)
            msg = "子線程"+self.name+'執行,i='+str(i) #name屬性中儲存的是目前線程的名字
            print(msg)
if __name__ == '__main__':
    print('-----主線程開始-----')
    t1 = SubThread() # 建立子線程t1
    t2 = SubThread() # 建立子線程t2
    t1.start()      # 啟動子線程t1
    t2.start()      # 啟動子線程t2
    t1.join()       # 等待子線程t1
    t2.join()       # 等待子線程t2
    print('-----主線程結束-----')
>>>
-----主線程開始-----
子線程Thread-2執行,i=0
子線程Thread-1執行,i=0
子線程Thread-2執行,i=1
子線程Thread-1執行,i=1
子線程Thread-2執行,i=2
子線程Thread-1執行,i=2
-----主線程結束-----      

六、線程間通信

線程之間可以通行,可以定義一個全局變量,讓後不同線程調用這變量,可以共享資料

6.1 什麼是互斥鎖

就是防止多個線程同時讀寫某一塊記憶體區域。互斥鎖為資源引入一個狀态:鎖定和非鎖定

6.2 使用互斥鎖

mutex=threading.Lock() # 建立鎖
mutex.acquire([blocking]) # 鎖定
mutex.release() # 釋放鎖      
from threading import Thread,Lock
import time
n=100 # 共100張票                      

def task():
    global n
    mutex.acquire()             # 上鎖
    temp=n                      # 指派給臨時變量
    time.sleep(0.1)             # 休眠0.1秒
    n=temp-1                    # 數量減1 
    print('購買成功,剩餘%d張電影票'%n)
    mutex.release()             # 釋放鎖

if __name__ == '__main__':
    mutex=Lock()                # 執行個體化Lock類
    t_l=[]                      # 初始化一個清單
    for i in range(10):         
        t=Thread(target=task)   # 執行個體化線程類    
        t_l.append(t)           # 将線程執行個體存入清單中
        t.start()               # 建立線程
    for t in t_l:               
        t.join()                # 等待子線程結束    
>>>
購買成功,剩餘99張電影票
購買成功,剩餘98張電影票
購買成功,剩餘97張電影票
購買成功,剩餘96張電影票
購買成功,剩餘95張電影票
購買成功,剩餘94張電影票
購買成功,剩餘93張電影票
購買成功,剩餘92張電影票
購買成功,剩餘91張電影票
購買成功,剩餘90張電影票      

6.3 使用隊列線上程間通信

# -*- coding: utf-8 -*-
from queue import Queue
import random,threading,time

#生産者類
class Producer(threading.Thread):
    def __init__(self, name,queue):
        threading.Thread.__init__(self, name=name)
        self.data=queue
    def run(self):
        for i in range(5):
            print("生産者%s将産品%d加入隊列!" % (self.getName(), i))
            self.data.put(i)
            time.sleep(random.random())
        print("生産者%s完成!" % self.getName())

#消費者類
class Consumer(threading.Thread):
    def __init__(self,name,queue):
        threading.Thread.__init__(self,name=name)
        self.data=queue
    def run(self):
        for i in range(5):
            val = self.data.get()
            print("消費者%s将産品%d從隊列中取出!" % (self.getName(),val))
            time.sleep(random.random())
        print("消費者%s完成!" % self.getName())

if __name__ == '__main__':
    print('-----主線程開始-----')
    queue = Queue()        # 執行個體化隊列
    producer = Producer('Producer',queue)   # 執行個體化線程Producer,并傳入隊列作為參數
    consumer = Consumer('Consumer',queue)   # 執行個體化線程Consumer,并傳入隊列作為參數
    producer.start()    # 啟動線程Producer
    consumer.start()    # 啟動線程Consumer
    producer.join()     # 等待線程Producer結束
    consumer.join()     # 等待線程Consumer結束
    print('-----主線程結束-----')
>>>
-----主線程開始-----
生産者Producer将産品0加入隊列!
消費者Consumer将産品0從隊列中取出!
生産者Producer将産品1加入隊列!
消費者Consumer将産品1從隊列中取出!
生産者Producer将産品2加入隊列!
消費者Consumer将産品2從隊列中取出!
生産者Producer将産品3加入隊列!
生産者Producer将産品4加入隊列!
生産者Producer完成!
消費者Consumer将産品3從隊列中取出!
消費者Consumer将産品4從隊列中取出!
消費者Consumer完成!
-----主線程結束-----

Process finished with exit code 0