天天看點

Python多線程(threading子產品)

線程(thread)是作業系統能夠進行運算排程的最小機關。它被包含在程序之中,是程序中的實際運作機關。一條線程指的是程序中一個單一順序的控制流,一個程序中可以并發多個線程,每條線程并行執行不同的任務。

由于GIL(Global Interpreter Lock)的存在,python并不能真的實作并行,隻能同時允許一個程序運作。GIL是CPython解釋器的概念,并不是python的缺陷。

threading子產品

python中通過threading子產品實作多線程

線程的2種調用方式

直接調用

threading.Thread(target=sayhi, args=(1, )) target的值是函數名,args傳入的參數,元組形式

1 import threading
 2 import time
 3  
 4 def sayhi(num): #定義每個線程要運作的函數
 5  
 6     print("running on number:%s" %num)
 7  
 8     time.sleep(3)
 9  
10 if __name__ == '__main__':
11  
12     t1 = threading.Thread(target=sayhi,args=(1,)) #生成一個線程執行個體
13     t2 = threading.Thread(target=sayhi,args=(2,)) #生成另一個線程執行個體
14  
15     t1.start() #啟動線程
16     t2.start() #啟動另一個線程
17  
18     print(t1.getName()) #擷取線程名
19     print(t2.getName())
20 ####### 結果 ###########
21 running on numbei: 1
22 running on numbei: 2
23 Thread-1
24 Thread-2      

繼承式調用

1 import threading
 2 import time
 3  
 4  
 5 class MyThread(threading.Thread):
 6     def __init__(self,num):
 7         threading.Thread.__init__(self)
 8         self.num = num
 9  
10     def run(self):#定義每個線程要運作的函數
11  
12         print("running on number:%s" %self.num)
13  
14         time.sleep(3)
15  
16 if __name__ == '__main__':
17  
18     t1 = MyThread(1)
19     t2 = MyThread(2)
20     t1.start()
21     t2.start()      

join & Daemon

1 import threading
 2 from time import ctime, sleep
 3 
 4 
 5 def music(func):
 6     for i in range(2):
 7         print("Begin listening to %s. %s" % (func, ctime()))
 8         sleep(2)
 9         print("end listening %s" % ctime())
10 
11 
12 def move(func):
13     for i in range(2):
14         print("Begin watching at the %s! %s" % (func, ctime()))
15         sleep(3)
16         print('end watching %s' % ctime())
17 
18 threads = []
19 t1 = threading.Thread(target=music, args=('七裡香',))
20 threads.append(t1)
21 t2 = threading.Thread(target=move, args=('阿甘正傳',))
22 threads.append(t2)
23 
24 if __name__ == '__main__':
25 
26     for t in threads:
27         # t.setDaemon(True)
28         t.start()
29         # t.join()
30     # t1.join()
31     t2.join()  # 考慮這三種join位置下的結果?
32     print("all over %s" % ctime())      

join():

  在子線程完成之前,這個子線程的父線程将一直被阻塞。

setDaemon(True):

  将線程聲明為守護線程,必須在start() 方法調用之前設定, 如果不設定為守護線程程式會被無限挂起。這個方法基本和join是相反的。當我們 在程式運作中,執行一個主線程,如果主線程又建立一個子線程,主線程和子線程 就分兵兩路,分别運作,那麼當主線程完成想退出時,會檢驗子線程是否完成。如 果子線程未完成,則主線程會等待子線程完成後再退出。但是有時候我們需要的是 隻要主線程完成了,不管子線程是否完成,都要和主線程一起退出,這時就可以 用setDaemon方法啦 

join以子線程為主判斷,setDaemon(True)以主線程為主判斷

thread 子產品提供的其他方法:
# threading.currentThread(): 傳回目前的線程變量。
# threading.enumerate(): 傳回一個包含正在運作的線程的list。正在運作指線程啟動後、結束前,不包括啟動前和終止後的線程。
# threading.activeCount(): 傳回正在運作的線程數量,與len(threading.enumerate())有相同的結果。
# 除了使用方法外,線程子產品同樣提供了Thread類來處理線程,Thread類提供了以下方法:
# run(): 用以表示線程活動的方法。
# start():啟動線程活動。
# join([time]): 等待至線程中止。這阻塞調用線程直至線程的join() 方法被調用中止-正常退出或者抛出未處理的異常-或者是可選的逾時發生。
# isAlive(): 傳回線程是否活動的。
# getName(): 傳回線程名。
# setName(): 設定線程名。      

 同步鎖

1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每個線程中都擷取這個全局變量
 6     # num-=1
 7 
 8     temp=num
 9     print('--get num:',num )
10     #time.sleep(0.1)
11     num =temp-1 #對此公共變量進行-1操作
12 
13 
14 num = 100  #設定一個共享變量
15 thread_list = []
16 for i in range(100):
17     t = threading.Thread(target=addNum)
18     t.start()
19     thread_list.append(t)
20 
21 for t in thread_list: #等待所有線程執行完畢
22     t.join()
23 
24 print('final num:', num )      
Python多線程(threading子產品)

多個線程同時操作同一個共享資源,是以造成了資源破壞,使用join時會把整個線程停住,造成了串行,失去了多線程的意義,是以要把計算的時候串行

使用同步鎖

threading.Lock()

1 import time
 2 import threading
 3 
 4 def addNum():
 5     global num #在每個線程中都擷取這個全局變量
 6     # num-=1
 7     lock.acquire()    # 鎖開始
 8     temp=num
 9     print('--get num:',num )
10     #time.sleep(0.1)
11     num =temp-1 #對此公共變量進行-1操作
12     lock.release()   # 鎖結束
13 
14 num = 100  #設定一個共享變量
15 thread_list = []
16 lock=threading.Lock()   
17 
18 for i in range(100):
19     t = threading.Thread(target=addNum)
20     t.start()
21     thread_list.append(t)
22 
23 for t in thread_list: #等待所有線程執行完畢
24     t.join()
25 
26 print('final num:', num )      

鎖中的内容一次隻允許一個程序執行

線程死鎖和遞歸鎖

線上程間共享多個資源的時候,如果兩個線程分别占有一部分資源并且同時等待對方的資源,就會造成死鎖,因為系統判斷這部分資源都正在使用,所有這兩個線程在無外力作用下将一直等待下去。下面是一個死鎖的例子:

1 import threading,time
 2 
 3 class myThread(threading.Thread):
 4     def doA(self):
 5         lockA.acquire()
 6         print(self.name,"gotlockA",time.ctime())
 7         time.sleep(3)
 8         lockB.acquire()
 9         print(self.name,"gotlockB",time.ctime())
10         lockB.release()
11         lockA.release()
12 
13     def doB(self):
14         lockB.acquire()
15         print(self.name,"gotlockB",time.ctime())
16         time.sleep(2)
17         lockA.acquire()
18         print(self.name,"gotlockA",time.ctime())
19         lockA.release()
20         lockB.release()
21     def run(self):
22         self.doA()
23         self.doB()
24 if __name__=="__main__":
25 
26     lockA=threading.Lock()
27     lockB=threading.Lock()
28     threads=[]
29     for i in range(5):
30         threads.append(myThread())
31     for t in threads:
32         t.start()
33     for t in threads:
34         t.join()#等待線程結束,後面再講。      

為了支援在同一程序中多次請求同一資源,使用“可重入鎖”

threading.RLock()

1 import time
 2 
 3 import threading
 4 
 5 class Account:
 6     def __init__(self, _id, balance):
 7         self.id = _id
 8         self.balance = balance
 9         self.lock = threading.RLock()
10 
11     def withdraw(self, amount):
12 
13         with self.lock:
14             self.balance -= amount
15 
16     def deposit(self, amount):
17         with self.lock:
18             self.balance += amount
19 
20 
21     def drawcash(self, amount):  # lock.acquire中嵌套lock.acquire的場景
22 
23         with self.lock:
24             interest=0.05
25             count=amount+amount*interest
26 
27             self.withdraw(count)
28 
29 
30 def transfer(_from, to, amount):
31 
32     # 鎖不可以加在這裡 因為其他的其它線程執行的其它方法在不加鎖的情況下資料同樣是不安全的
33      _from.withdraw(amount)
34 
35      to.deposit(amount)
36 
37 
38 
39 alex = Account('alex',1000)
40 yuan = Account('yuan',1000)
41 
42 t1=threading.Thread(target = transfer, args = (alex,yuan, 100))
43 t1.start()
44 
45 t2=threading.Thread(target = transfer, args = (yuan,alex, 200))
46 t2.start()
47 
48 t1.join()
49 t2.join()
50 
51 print('>>>',alex.balance)
52 print('>>>',yuan.balance)      

條件變量同步(Condition)

有一些線程需要滿足條件後才能繼續執行python提供了threading.Condition對象用于條件變量線程的支援

lock_con = threading.Condition([Lock/Rlock]) 鎖是可選項,不穿入鎖,對象自動建立一個RLock

wait([timeout]): 調用這個方法将使線程進入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則将抛出異常。 
notify(): 調用這個方法将從等待池挑選一個線程并通知,收到通知的線程将自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則将抛出異常。激活時從鎖的acquire開始執行,而不是從wait開始 
notifyAll(): 調用這個方法将通知等待池中所有的線程,這些線程都将進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則将抛出異常。      

執行個體

1 import threading,time
 2 from random import randint
 3 class Producer(threading.Thread):
 4     def run(self):
 5         global L
 6         while True:
 7             val=randint(0,100)
 8             print('生産者',self.name,":Append"+str(val),L)
 9             if lock_con.acquire():
10                 L.append(val)
11                 lock_con.notify()    # 通知wait()
12                 lock_con.release()
13             time.sleep(3)
14 class Consumer(threading.Thread):
15     def run(self):
16         global L
17         while True:
18                 lock_con.acquire()
19                 if len(L)==0:
20                     lock_con.wait()   # 等待notify() 通知 
21                 print('消費者',self.name,":Delete"+str(L[0]),L)
22                 del L[0]
23                 lock_con.release()
24                 time.sleep(0.25)
25 
26 if __name__=="__main__":
27 
28     L=[]
29     lock_con=threading.Condition()
30     threads=[]
31     for i in range(5):
32         threads.append(Producer())
33     threads.append(Consumer())
34     for t in threads:
35         t.start()
36     for t in threads:
37         t.join()      

多線程通信

同步條件(Event)

條件同步和條件變量同步意思差不多,隻是不能加鎖

event = threading.Event()    條件環境對象,初始值為false

event.isSet():傳回event的狀态值;

event.wait():如果 event.isSet()==False将阻塞線程;

event.set(): 設定event的狀态值為True,所有阻塞池的線程激活進入就緒狀态, 等待作業系統排程;

event.clear():恢複event的狀态值為False。      
1 import threading,time
 2 class Boss(threading.Thread):
 3     def run(self):
 4         print("BOSS:今晚大家都要加班到22:00。")
 5         event.isSet() or event.set()
 6         time.sleep(5)
 7         print("BOSS:<22:00>可以下班了。")
 8         event.isSet() or event.set()
 9 class Worker(threading.Thread):
10     def run(self):
11         event.wait()
12         print("Worker:哎……命苦啊!")
13         time.sleep(0.25)
14         event.clear()
15         event.wait()
16         print("Worker:OhYeah!")
17 if __name__=="__main__":
18     event=threading.Event()
19     threads=[]
20     for i in range(5):
21         threads.append(Worker())
22     threads.append(Boss())
23     for t in threads:
24         t.start()
25     for t in threads:
26         t.join()      

信号量(Semaphore)

 信号量用來控制線程并發數的,BoundedSemaphore或Semaphore管理一個内置的計數 器,每當調用acquire()時-1,調用release()時+1。

      計數器不能小于0,當計數器為 0時,acquire()将阻塞線程至同步鎖定狀态,直到其他線程調用release()。(類似于停車位的概念)

   BoundedSemaphore與Semaphore的唯一差別在于前者将在調用release()時檢查計數 器的值是否超過了計數器的初始值,如果超過了将抛出一個異常。

1 import threading,time
 2 class myThread(threading.Thread):
 3     def run(self):
 4         if semaphore.acquire():
 5             print(self.name)
 6             time.sleep(5)
 7             semaphore.release()
 8 if __name__=="__main__":
 9     semaphore=threading.Semaphore(5)
10     thrs=[]
11     for i in range(100):
12         thrs.append(myThread())
13     for t in thrs:
14         t.start()      

多線程利器(queue)

建立一個“隊列”對象
import queue
q = queue.Queue(maxsize = 10)
queue.Queue類即是一個隊列的同步實作。隊列長度可為無限或者有限。可通過Queue的構造函數的可選參數maxsize來設定隊列長度。如果maxsize小于1就表示隊列長度無限。

将一個值放入隊列中
q.put(10)
調用隊列對象的put()方法在隊尾插入一個項目。put()有兩個參數,第一個item為必需的,為插入項目的值;第二個block為可選參數,預設為
1。如果隊列目前為空且block為1,put()方法就使調用線程暫停,直到空出一個資料單元。如果block為0,put方法将引發Full異常。

将一個值從隊列中取出
q.get()
調用隊列對象的get()方法從隊頭删除并傳回一個項目。可選參數為block,預設為True。如果隊列為空且block為True,get()就使調用線程暫停,直至有項目可用。如果隊列為空且block為False,隊列将引發Empty異常。

Python Queue子產品有三種隊列及構造函數:
1、Python Queue子產品的FIFO隊列先進先出。  class queue.Queue(maxsize)
2、LIFO類似于堆,即先進後出。             class queue.LifoQueue(maxsize)
3、還有一種是優先級隊列級别越低越先出來。   class queue.PriorityQueue(maxsize)

此包中的常用方法(q = queue.Queue()):
q.qsize() 傳回隊列的大小
q.empty() 如果隊列為空,傳回True,反之False
q.full() 如果隊列滿了,傳回True,反之False
q.full 與 maxsize 大小對應
q.get([block[, timeout]]) 擷取隊列,timeout等待時間
q.get_nowait() 相當q.get(False)
非阻塞 q.put(item) 寫入隊列,timeout等待時間
q.put_nowait(item) 相當q.put(item, False)
q.task_done() 在完成一項工作之後,q.task_done() 函數向任務已經完成的隊列發送一個信号
q.join() 實際上意味着等到隊列為空,再執行别的操作      

類似清單,不過清單在多線程裡不安全

轉載于:https://www.cnblogs.com/bw13/p/5946141.html