天天看點

Python線程指南

線程有5種狀态,狀态轉換的過程如下圖所示:

多線程的優勢在于可以同時運作多個任務(至少感覺起來是這樣)。但是當線程需要共享資料時,可能存在資料不同步的問題。考慮這樣一種情況:一個清單裡所有元素都是0,線程"set"從後向前把所有元素改成1,而線程"print"負責從前往後讀取清單并列印。那麼,可能線程"set"開始改的時候,線程"print"便來列印清單了,輸出就成了一半0一半1,這就是資料的不同步。為了避免這種情況,引入了鎖的概念。

鎖有兩種狀态——鎖定和未鎖定。每當一個線程比如"set"要通路共享資料時,必須先獲得鎖定;如果已經有别的線程比如"print"獲得鎖定了,那麼就讓線程"set"暫停,也就是同步阻塞;等到線程"print"通路完畢,釋放鎖以後,再讓線程"set"繼續。經過這樣的處理,列印清單時要麼全部輸出0,要麼全部輸出1,不會再出現一半0一半1的尴尬場面。

線程與鎖的互動如下圖所示:

然而還有另外一種尴尬的情況:清單并不是一開始就有的;而是通過線程"create"建立的。如果"set"或者"print" 在"create"還沒有運作的時候就通路清單,将會出現一個異常。使用鎖可以解決這個問題,但是"set"和"print"将需要一個無限循環——他們不知道"create"什麼時候會運作,讓"create"在運作後通知"set"和"print"顯然是一個更好的解決方案。于是,引入了條件變量。

條件變量允許線程比如"set"和"print"在條件不滿足的時候(清單為None時)等待,等到條件滿足的時候(清單已經建立)發出一個通知,告訴"set" 和"print"條件已經有了,你們該起床幹活了;然後"set"和"print"才繼續運作。

線程與條件變量的互動如下圖所示:

最後看看線程運作和阻塞狀态的轉換。

阻塞有三種情況:    

同步阻塞是指處于競争鎖定的狀态,線程請求鎖定時将進入這個狀态,一旦成功獲得鎖定又恢複到運作狀态;     

等待阻塞是指等待其他線程通知的狀态,線程獲得條件鎖定後,調用“等待”将進入這個狀态,一旦其他線程發出通知,線程将進入同步阻塞狀态,再次競争條件鎖定;     

而其他阻塞是指調用time.sleep()、anotherthread.join()或等待IO時的阻塞,這個狀态下線程不會釋放已獲得的鎖定。

tips: 如果能了解這些内容,接下來的主題将是非常輕松的;并且,這些内容在大部分流行的程式設計語言裡都是一樣的。(意思就是非看懂不可 >_< 嫌作者水準低找别人的教程也要看懂)

Python通過兩個标準庫thread和threading提供對線程的支援。thread提供了低級别的、原始的線程以及一個簡單的鎖。

<a href="http://www.cnblogs.com/huxi/archive/2010/06/26/1765808.html#">?</a>

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

<code># encoding: UTF-8</code>

<code>import</code><code>thread</code>

<code>import</code><code>time</code>

<code># 一個用于線上程中執行的函數</code>

<code>def</code><code>func():</code>

<code></code><code>for</code><code>i </code><code>in</code><code>range</code><code>(</code><code>5</code><code>):</code>

<code></code><code>print</code><code>'func'</code>

<code></code><code>time.sleep(</code><code>1</code><code>)</code>

<code></code>

<code></code><code># 結束目前線程</code>

<code></code><code># 這個方法與thread.exit_thread()等價</code>

<code></code><code>thread.exit() </code><code># 當func傳回時,線程同樣會結束</code>

<code># 啟動一個線程,線程立即開始運作</code>

<code># 這個方法與thread.start_new_thread()等價</code>

<code># 第一個參數是方法,第二個參數是方法的參數</code>

<code>thread.start_new(func, ()) </code><code># 方法沒有參數時需要傳入空tuple</code>

<code># 建立一個鎖(LockType,不能直接執行個體化)</code>

<code># 這個方法與thread.allocate_lock()等價</code>

<code>lock </code><code>=</code><code>thread.allocate()</code>

<code># 判斷鎖是鎖定狀态還是釋放狀态</code>

<code>print</code><code>lock.locked()</code>

<code># 鎖通常用于控制對共享資源的通路</code>

<code>count </code><code>=</code><code>0</code>

<code># 獲得鎖,成功獲得鎖定後傳回True</code>

<code># 可選的timeout參數不填時将一直阻塞直到獲得鎖定</code>

<code># 否則逾時後将傳回False</code>

<code>if</code><code>lock.acquire():</code>

<code></code><code>count </code><code>+</code><code>=</code><code>1</code>

<code></code><code># 釋放鎖</code>

<code></code><code>lock.release()</code>

<code># thread子產品提供的線程都将在主線程結束後同時結束</code>

<code>time.sleep(</code><code>6</code><code>)</code>

thread 子產品提供的其他方法:     

thread.interrupt_main(): 在其他線程中終止主線程。   

thread.get_ident(): 獲得一個代表目前線程的魔法數字,常用于從一個字典中獲得線程相關的資料。這個數字本身沒有任何含義,并且當線程結束後會被新線程複用。

thread還提供了一個ThreadLocal類用于管理線程相關的資料,名為 thread._local,threading中引用了這個類。

由于thread提供的線程功能不多,無法在主線程結束後繼續運作,不提供條件變量等等原因,一般不使用thread子產品,這裡就不多介紹了。

threading基于Java的線程模型設計。鎖(Lock)和條件變量(Condition)在Java中是對象的基本行為(每一個對象都自帶了鎖和條件變量),而在Python中則是獨立的對象。Python Thread提供了Java Thread的行為的子集;沒有優先級、線程組,線程也不能被停止、暫停、恢複、中斷。Java Thread中的部分被Python實作了的靜态方法在threading中以子產品方法的形式提供。

threading 子產品提供的常用方法:

threading.currentThread(): 傳回目前的線程變量。   

threading.enumerate(): 傳回一個包含正在運作的線程的list。正在運作指線程啟動後、結束前,不包括啟動前和終止後的線程。   

threading.activeCount(): 傳回正在運作的線程數量,與len(threading.enumerate())有相同的結果。

threading子產品提供的類:  

Thread, Lock, Rlock, Condition, [Bounded]Semaphore, Event, Timer, local.

Thread是線程類,與Java類似,有兩種使用方法,直接傳入要運作的方法或從Thread繼承并覆寫run():

<code>import</code><code>threading</code>

<code># 方法1:将要執行的方法作為參數傳給Thread的構造方法</code>

<code></code><code>print</code><code>'func() passed to Thread'</code>

<code>t </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>func)</code>

<code>t.start()</code>

<code># 方法2:從Thread繼承,并重寫run()</code>

<code>class</code><code>MyThread(threading.Thread):</code>

<code></code><code>def</code><code>run(</code><code>self</code><code>):</code>

<code></code><code>print</code><code>'MyThread extended from Thread'</code>

<code>t </code><code>=</code><code>MyThread()</code>

構造方法:     

Thread(group=None, target=None, name=None, args=(), kwargs={})   

group: 線程組,目前還沒有實作,庫引用中提示必須是None;   

target: 要執行的方法;   

name: 線程名;   

args/kwargs: 要傳入方法的參數。

執行個體方法:     

isAlive(): 傳回線程是否在運作。正在運作指啟動後、終止前。   

get/setName(name): 擷取/設定線程名。   

is/setDaemon(bool): 擷取/設定是否守護線程。初始值從建立該線程的線程繼承。當沒有非守護線程仍在運作時,程式将終止。   

start(): 啟動線程。   

join([timeout]): 阻塞目前上下文環境的線程,直到調用此方法的線程終止或到達指定的timeout(可選參數)。

一個使用join()的例子:

<code>def</code><code>context(tJoin):</code>

<code></code><code>print</code><code>'in threadContext.'</code>

<code></code><code>tJoin.start()</code>

<code></code><code># 将阻塞tContext直到threadJoin終止。</code>

<code></code><code>tJoin.join()</code>

<code></code><code># tJoin終止後繼續執行。</code>

<code></code><code>print</code><code>'out threadContext.'</code>

<code>def</code><code>join():</code>

<code></code><code>print</code><code>'in threadJoin.'</code>

<code></code><code>print</code><code>'out threadJoin.'</code>

<code>tJoin </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>join)</code>

<code>tContext </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>context, args</code><code>=</code><code>(tJoin,))</code>

<code>tContext.start()</code>

運作結果:

in threadContext.      in threadJoin.      out threadJoin.      out threadContext.

Lock(指令鎖)是可用的最低級的同步指令。Lock處于鎖定狀态時,不被特定的線程擁有。Lock包含兩種狀态——鎖定和非鎖定,以及兩個基本的方法。

可以認為Lock有一個鎖定池,當線程請求鎖定時,将線程至于池中,直到獲得鎖定後出池。池中的線程處于狀态圖中的同步阻塞狀态。

Lock()

acquire([timeout]): 使線程進入同步阻塞狀态,嘗試獲得鎖定。   

release(): 釋放鎖。使用前線程必須已獲得鎖定,否則将抛出異常。

<code>data </code><code>=</code><code>0</code>

<code>lock </code><code>=</code><code>threading.Lock()</code>

<code></code><code>global</code><code>data</code>

<code></code><code>print</code><code>'%s acquire lock...'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code># 調用acquire([timeout])時,線程将一直阻塞,</code>

<code></code><code># 直到獲得鎖定或者直到timeout秒後(timeout參數可選)。</code>

<code></code><code># 傳回是否獲得鎖。</code>

<code></code><code>if</code><code>lock.acquire():</code>

<code></code><code>print</code><code>'%s get the lock.'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code>data </code><code>+</code><code>=</code><code>1</code>

<code></code><code>time.sleep(</code><code>2</code><code>)</code>

<code></code><code>print</code><code>'%s release lock...'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code># 調用release()将釋放鎖。</code>

<code>t1 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>func)</code>

<code>t2 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>func)</code>

<code>t3 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>func)</code>

<code>t1.start()</code>

<code>t2.start()</code>

<code>t3.start()</code>

RLock(可重入鎖)是一個可以被同一個線程請求多次的同步指令。RLock使用了“擁有的線程”和“遞歸等級”的概念,處于鎖定狀态時,RLock被某個線程擁有。擁有RLock的線程可以再次調用acquire(),釋放鎖時需要調用release()相同次數。

可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功調用 acquire()/release(),計數器将+1/-1,為0時鎖處于未鎖定狀态。

RLock()

acquire([timeout])/release(): 跟Lock差不多。

<code>rlock </code><code>=</code><code>threading.RLock()</code>

<code></code><code># 第一次請求鎖定</code>

<code></code><code>if</code><code>rlock.acquire():</code>

<code></code><code># 第二次請求鎖定</code>

<code></code><code>print</code><code>'%s acquire lock again...'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code># 第一次釋放鎖</code>

<code></code><code>rlock.release()</code>

<code></code><code># 第二次釋放鎖</code>

Condition(條件變量)通常與一個鎖關聯。需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock執行個體給構造方法,否則它将自己生成一個RLock執行個體。

可以認為,除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的線程處于狀态圖中的等待阻塞狀态,直到另一個線程調用notify()/notifyAll()通知;得到通知後線程進入鎖定池等待鎖定。

Condition([lock/rlock])

acquire([timeout])/release(): 調用關聯的鎖的相應方法。   

wait([timeout]): 調用這個方法将使線程進入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則将抛出異常。   

notify(): 調用這個方法将從等待池挑選一個線程并通知,收到通知的線程将自動調用acquire()嘗試獲得鎖定(進入鎖定池);其他線程仍然在等待池中。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則将抛出異常。   

notifyAll(): 調用這個方法将通知等待池中所有的線程,這些線程都将進入鎖定池嘗試獲得鎖定。調用這個方法不會釋放鎖定。使用前線程必須已獲得鎖定,否則将抛出異常。

例子是很常見的生産者/消費者模式:

41

42

43

44

45

46

47

<code># 商品</code>

<code>product </code><code>=</code><code>None</code>

<code># 條件變量</code>

<code>con </code><code>=</code><code>threading.Condition()</code>

<code># 生産者方法</code>

<code>def</code><code>produce():</code>

<code></code><code>global</code><code>product</code>

<code></code><code>if</code><code>con.acquire():</code>

<code></code><code>while</code><code>True</code><code>:</code>

<code></code><code>if</code><code>product </code><code>is</code><code>None</code><code>:</code>

<code></code><code>print</code><code>'produce...'</code>

<code></code><code>product </code><code>=</code><code>'anything'</code>

<code></code><code># 通知消費者,商品已經生産</code>

<code></code><code>con.notify()</code>

<code></code><code># 等待通知</code>

<code></code><code>con.wait()</code>

<code># 消費者方法</code>

<code>def</code><code>consume():</code>

<code></code><code>if</code><code>product </code><code>is</code><code>not</code><code>None</code><code>:</code>

<code></code><code>print</code><code>'consume...'</code>

<code></code><code>product </code><code>=</code><code>None</code>

<code></code><code># 通知生産者,商品已經沒了</code>

<code>t1 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>produce)</code>

<code>t2 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>consume)</code>

Semaphore(信号量)是計算機科學史上最古老的同步指令之一。Semaphore管理一個内置的計數器,每當調用acquire()時-1,調用release() 時+1。計數器不能小于0;當計數器為0時,acquire()将阻塞線程至同步鎖定狀态,直到其他線程調用release()。

基于這個特點,Semaphore經常用來同步一些有“訪客上限”的對象,比如連接配接池。

BoundedSemaphore 與Semaphore的唯一差別在于前者将在調用release()時檢查計數器的值是否超過了計數器的初始值,如果超過了将抛出一個異常。

構造方法:

Semaphore(value=1): value是計數器的初始值。

acquire([timeout]): 請求Semaphore。如果計數器為0,将阻塞線程至同步阻塞狀态;否則将計數器-1并立即傳回。   

release(): 釋放Semaphore,将計數器+1,如果使用BoundedSemaphore,還将進行釋放次數檢查。release()方法不檢查線程是否已獲得 Semaphore。

<code># 計數器初值為2</code>

<code>semaphore </code><code>=</code><code>threading.Semaphore(</code><code>2</code><code>)</code>

<code></code><code># 請求Semaphore,成功後計數器-1;計數器為0時阻塞</code>

<code></code><code>print</code><code>'%s acquire semaphore...'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code>if</code><code>semaphore.acquire():</code>

<code></code><code>print</code><code>'%s get semaphore'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code>time.sleep(</code><code>4</code><code>)</code>

<code></code><code># 釋放Semaphore,計數器+1</code>

<code></code><code>print</code><code>'%s release semaphore'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code>semaphore.release()</code>

<code>t4 </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>func)</code>

<code>t4.start()</code>

<code>time.sleep(</code><code>2</code><code>)</code>

<code># 沒有獲得semaphore的主線程也可以調用release</code>

<code># 若使用BoundedSemaphore,t4釋放semaphore時将抛出異常</code>

<code>print</code><code>'MainThread release semaphore without acquire'</code>

<code>semaphore.release()</code>

Event(事件)是最簡單的線程通信機制之一:一個線程通知事件,其他線程等待事件。Event内置了一個初始為False的标志,當調用set()時設為True,調用clear()時重置為 False。wait()将阻塞線程至等待阻塞狀态。

Event其實就是一個簡化版的 Condition。Event沒有鎖,無法使線程進入同步阻塞狀态。

Event()

isSet(): 當内置标志為True時傳回True。   

set(): 将标志設為True,并通知所有處于等待阻塞狀态的線程恢複運作狀态。   

clear(): 将标志設為False。   

wait([timeout]): 如果标志為True将立即傳回,否則阻塞線程至等待阻塞狀态,等待其他線程調用set()。

<code>event </code><code>=</code><code>threading.Event()</code>

<code></code><code># 等待事件,進入等待阻塞狀态</code>

<code></code><code>print</code><code>'%s wait for event...'</code><code>%</code><code>threading.currentThread().getName()</code>

<code></code><code>event.wait()</code>

<code></code><code># 收到事件後進入運作狀态</code>

<code></code><code>print</code><code>'%s recv event.'</code><code>%</code><code>threading.currentThread().getName()</code>

<code># 發送事件通知</code>

<code>print</code><code>'MainThread set event.'</code>

<code>event.</code><code>set</code><code>()</code>

Timer(定時器)是Thread的派生類,用于在指定時間後調用一個方法。

Timer(interval, function, args=[], kwargs={})   

interval: 指定的時間   

function: 要執行的方法   

args/kwargs: 方法的參數

Timer從Thread派生,沒有增加執行個體方法。

<code></code><code>print</code><code>'hello timer!'</code>

<code>timer </code><code>=</code><code>threading.Timer(</code><code>5</code><code>, func)</code>

<code>timer.start()</code>

local是一個小寫字母開頭的類,用于管理 thread-local(線程局部的)資料。對于同一個local,線程無法通路其他線程設定的屬性;線程設定的屬性不會被其他線程設定的同名屬性替換。

可以把local看成是一個“線程-屬性字典”的字典,local封裝了從自身使用線程作為 key檢索對應的屬性字典、再使用屬性名作為key檢索屬性值的細節。

<code>local </code><code>=</code><code>threading.local()</code>

<code>local.tname </code><code>=</code><code>'main'</code>

<code></code><code>local.tname </code><code>=</code><code>'notmain'</code>

<code></code><code>print</code><code>local.tname</code>

<code>t1.join()</code>

<code>print</code><code>local.tname</code>

熟練掌握Thread、Lock、Condition就可以應對絕大多數需要使用線程的場合,某些情況下local也是非常有用的東西。本文的最後使用這幾個類展示線程基礎中提到的場景:

<code>alist </code><code>=</code><code>None</code>

<code>condition </code><code>=</code><code>threading.Condition()</code>

<code>def</code><code>doSet():</code>

<code></code><code>if</code><code>condition.acquire():</code>

<code></code><code>while</code><code>alist </code><code>is</code><code>None</code><code>:</code>

<code></code><code>condition.wait()</code>

<code></code><code>for</code><code>i </code><code>in</code><code>range</code><code>(</code><code>len</code><code>(alist))[::</code><code>-</code><code>1</code><code>]:</code>

<code></code><code>alist[i] </code><code>=</code><code>1</code>

<code></code><code>condition.release()</code>

<code>def</code><code>doPrint():</code>

<code></code><code>for</code><code>i </code><code>in</code><code>alist:</code>

<code></code><code>print</code><code>i,</code>

<code></code><code>print</code>

<code>def</code><code>doCreate():</code>

<code></code><code>global</code><code>alist</code>

<code></code><code>if</code><code>alist </code><code>is</code><code>None</code><code>:</code>

<code></code><code>alist </code><code>=</code><code>[</code><code>0</code><code>for</code><code>i </code><code>in</code><code>range</code><code>(</code><code>10</code><code>)]</code>

<code></code><code>condition.notifyAll()</code>

<code>tset </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>doSet,name</code><code>=</code><code>'tset'</code><code>)</code>

<code>tprint </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>doPrint,name</code><code>=</code><code>'tprint'</code><code>)</code>

<code>tcreate </code><code>=</code><code>threading.Thread(target</code><code>=</code><code>doCreate,name</code><code>=</code><code>'tcreate'</code><code>)</code>

<code>tset.start()</code>

<code>tprint.start()</code>

<code>tcreate.start()</code>