天天看點

python多核多線程程式設計_Python自動化開發學習9-多線程、隊列

threading 子產品

先了解一下程序與線程的概念和差別,然後通過threading子產品來學習了解線程。程序要下次講了。

之後看一下兩種調用線程的方式,效果和實作都一樣。貌似也沒有什麼時候用哪種,反正愛用哪種用哪種。一般的話直接調用就好了。

線程與程序

線程,是作業系統能夠進行運算排程的最小機關。

程序,是對各種資源管理的集合。

程序就是一個執行中的程式。程式并不能單獨運作,隻有将程式裝載到記憶體中,系統為它配置設定資源才能運作,而這種執行的程式就稱之為程序。程式和程序的差別就在于:程式是指令的集合,它是程序運作的靜态描述文本;程序是程式的一次執行活動,屬于動态概念。

程序要進行運算,必須要先建立一個線程。因為程序不具備執行的動作,但是他包含線程,通過線程來進行運算。所有在同一個程序裡的線程,是共享同一塊記憶體空間的。

直接調用

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

t1 = threading.Thread(target=task, args=(1,)) # 生成一個線程執行個體

t2 = threading.Thread(target=task, args=(2,)) # 再生成一個執行個體

t1.start() # 啟動線程

t2.start() # 再啟動一個

print(t1.getName()) # 擷取線程名

print(t2.getName())

參數注意:函數名和參數要分開寫。并且參數要寫成元組的形式,這裡隻有一個參數,是以也必須用括号括起來後面加個逗号,表示這是一個元組。

繼承式調用

上面是直接執行個體化了 threading.Thread這個類,我們也可以像下面這樣先繼承這個類,然後重構它的run方法。

import threading

import time

class MyThread(threading.Thread):

def __init__(self, num):

super(MyThread, self).__init__() # 繼承父類的構造函數

self.num = num

def run(self):

"每個線程要運作的函數,必須寫到run方法裡"

print("running on task", self.num)

time.sleep(3)

print("task over", self.num)

if __name__ == '__main__':

t1 = MyThread(1)

t2 = MyThread(2)

t1.start() # 這裡就會執行run方法

t2.start()

print(t1.getName())

print(t2.getName())

在直接調用中,就是将你的函數名和運作參數,在執行個體化的時候,通過類的構造函數傳遞給了threading.Thread類的run方法。而這裡我們是重構了這個run方法

使用for循環調用多個線程

如果需要一次調用多個線程,就不能再像上面那樣一個一個寫了。可以用一個for,來循環調用啟動

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

for i in range(50):

t = threading.Thread(target=task, args=('t%s' % i,))

t.start()

print("全部運作結束?") # 注意這句print執行的時間

程式主函數的線程

在上面的例子中,最後的print并沒有等待之前的sleep運作結束,而是直接執行了。這裡主函數是一個線程,其他使用threading啟動的都是這個主線程啟動的子線程。所有線程都是獨立執行的,主線程啟動了子線程之後兩者就互相獨立了,互相獨立并行執行。

我們可以通過 threading.current_thread() 擷取到目前的線程名:

import threading

import time

def task(num):

print("running on task", num, threading.current_thread())

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

print("running on Main", threading.current_thread())

for i in range(50):

t = threading.Thread(target=task, args=('t%s' % i,))

t.start()

print("全部運作結束?") # 注意這句print執行的時間

可以看到,主函數是MainThread。每一個線程都有線程名和線程号。

join方法

如果我們希望所有的子線程都是并行的,但是主函數需要等待所有子線程都執行完畢後再統一繼續執行,就需要join方法。

join方法,是等待這個線程執行完畢後才會繼續執行之後的語句。每一個線程都要有一個join,否則就不會等待這個線程執行完畢。

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

t1 = threading.Thread(target=task, args=(1,)) # 生成一個線程執行個體

t2 = threading.Thread(target=task, args=(2,)) # 再生成一個執行個體

t1.start() # 啟動線程

t2.start() # 再啟動一個

t1.join() # 為每個線程加一個join

t2.join()

print(t1.getName())

print(t2.getName())

上面是2個子線程的情況,如果是之前那樣的50個子線程,那麼還需要再寫一個for循環來執行join。這次可以來計算一下程式的運作時間。

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

t_objs = [] # 先定義一個空清單

start_time = time.time()

for i in range(50):

t = threading.Thread(target=task, args=('t%s' % i,))

t.start()

t_objs.append(t) # 儲存每一個執行個體,否則跳出目前for循環後無法調用

for j in t_objs:

j.join()

print("總共運作時間:", time.time()-start_time) # 計算總共的運作時間

print("全部運作結束?") # 注意這句print執行的時間

活動的線程個數

通過 threading.active_count() 可以擷取到活動的線程個數

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3)

print("task over", num)

if __name__ == '__main__':

print(threading.active_count()) # 運作前是1,因為有一個主函數的線程

t_objs = [] # 先定義一個空清單

start_time = time.time()

for i in range(50):

t = threading.Thread(target=task, args=('t%s' % i,))

t.start()

t_objs.append(t) # 儲存沒一個執行個體,否則跳出目前for循環後無法調用

print(threading.active_count()) # 所有子線程都起來了,一個50個,再加上一個主函數

for j in t_objs:

j.join()

print(threading.active_count()) # 所有子線程都結束了,又隻剩一個主函數了

print("總共運作時間:", time.time()-start_time) # 計算總共的運作時間

守護線程

之前的程序互相之間都是獨立的,雖然有父線程建立子線程,但是建立完之後,這2個線程也就互相獨立了。這個是預設的設定。

守護線程,在建立完線程還沒有運作之前,可以将線程設定為守護線程。守護線程依賴主線程而存在,主線程一旦運作完畢,守護線程無論是什麼情況,都會停止。

兩者的差别就是,之前的情況,所有線程都是獨立運作的。如果沒有使用join,所有的線程包括主線程都是獨立運作,當所有線程全部運作結束後,我們的程式才會結束。如果将線程設為守護線程後,那麼當主線程和其他線程運作完之後,不會等待守護線程運作結束,程式會直接結束。

import threading

import time

def task(num):

print("running on task", num)

time.sleep(3) # 設為守護線程後,不會等待守護線程運作結束了

print("task over", num) # 是以這裡也不會列印了

if __name__ == '__main__':

for i in range(50):

t = threading.Thread(target=task, args=('t%s' % i,))

# 将線程設定為守護線程,當主線程退出時,守護線程也會退出

# 并且由這個守護線程啟動的其它子線程也是守護線程,也會會同時退出,不管是否執行完任務

t.setDaemon(True) # 必須在建立線程後,但是在運作前才能将線程設定為守護線程

t.start()

print("運作結束,程序數量", threading.active_count()) # 這裡不會等待子線程運作完畢

小結

線程類的方法:

start :線程準備就緒,等待CPU排程

setName :為線程設定名稱

getName :擷取線程名稱

setDaemon:設定守護線程,設為True就是守護線程。預設False

join :逐個執行每個線程,執行完畢後繼續往下執行

run :線程被cpu排程後自動執行線程對象的run方法

threading.current_thread : 檢視目前的線程名

threading.active_count : 檢視活動線程的數量

Python GIL

Python GIL(Global Interpreter Lock),全程解釋器鎖。無論你啟多少個線程,你的cpu是多少核,Python在執行的時候隻能是單核運作。這個是使用Python解析器(CPython)時會有的情況。Python除了(CPython)還可以通過CPython,PyPy,Psyco等不同的Python執行環境來執行,但是CPython是大部分環境下預設的Python執行環境。

是以我們用CPython就會有GIL,有GIL就不是真正的多線程,隻能單核運作。我們還是會繼續在CPython下學習和運作Python,GIL還是會繼續存在。目前隻要知道這麼多就行了,怎麼利用多核是下次的内容。

線程鎖

這段上課演砸了,不過沒關系,我大概搞明白了。

一個程序下可以啟動多個線程,多個線程共享父程序的記憶體空間,也就意味着每個線程可以通路同一份資料,此時,如果2個線程同時要修改同一份資料,這是資料就混亂了。下面模拟沒有線程鎖造成資料混亂的情況。

import threading

import time

gl_num = 0

def show():

global gl_num # 聲明全局變量

gl_num += 1 # 先執行自增1,然後停頓一會

# 把下面的sleep注釋掉,可能是資料處理到輸出之間幾乎沒有間隔,看不到資料混亂的情況

time.sleep(0.1) # sleep的這段時間,其他線程也會操作這個變量

print(gl_num) # 最後輸出的時候,就是所有線程操作後的結果

for i in range(10):

t = threading.Thread(target=show)

t.start()

print("運作結束")

是以,出現了線程鎖,同一時刻隻允許一個線程修改資料。

import threading

import time

gl_num = 0

lock = threading.Lock() # 申請一把鎖,生成一個執行個體

def show():

global gl_num

lock.acquire() # 修改資料前加鎖,此時别的線程就無法操作了。

gl_num += 1

time.sleep(0.1)

print(gl_num)

lock.release() # 上面列印出結果了,釋放鎖允許别的線程繼續操作

for i in range(10):

t = threading.Thread(target=show)

t.start()

print("運作結束")

上面加了鎖之間的内容其實就變成了串行執行了。

遞歸鎖

在使用線程鎖的時候,如果你需要用到多把鎖嵌套使用,可能會導緻程式鎖死,永遠無法release。下面示範一個會出現鎖死的情況。說白了,就是大鎖中還要再包含子鎖。

import threading

def run1():

print("grab the first part data")

lock.acquire()

global num

num += 1

lock.release()

return num

def run2():

print("grab the second part data")

lock.acquire()

global num2

num2 += 1

lock.release()

return num2

def run3():

lock.acquire()

res = run1()

print('--------between run1 and run2-----')

res2 = run2()

lock.release()

print(res, res2)

if __name__ == '__main__':

num, num2 = 0, 0

# 下面使用了遞歸鎖RLock,可以正常執行,如果換成之前的Lock,就會出現鎖死的情況

lock = threading.RLock()

# 其實這裡都不是線程數量的問題,起一個子線程就會鎖死了

for i in range(10):

t = threading.Thread(target=run3)

t.start()

while threading.active_count() != 1: # 這裡沒用join,而是通過判斷活躍線程數來确認子線程是否執行完畢

print("活動線程數量:", threading.active_count())

else:

print('----all threads done---')

print(num, num2)

在上面的代碼中,程式會先進入run3,run3中有第一道鎖。然後在run3中會分别去執行run1和run2,而這裡面又都會有第二道鎖。此時可能是程式吧兩道鎖搞混了,是以導緻再也release不出來了。使用RLock來代替Lock就避免了這種情況。

也沒講RLock和Lock的其他差別,既然RLock沒問題,貌似不用Lock就好了。簡單的場景都OK,複雜的場景下一定要用遞歸鎖避免程式被鎖死。

信号量

線程鎖,同時隻允許一個線程更改資料。

信号量(Semaphore),是同時允許一定數量的線程更改資料。比如廁所有3個坑,那最多隻允許3個人上廁所,後面的人隻能等裡面有人出來了才能再進去。

信号量的使用就和線程鎖一樣,執行個體化的時候類名變了,多一個int參數。之後都是一樣的acquire上鎖和release釋放。

import threading,time

# 一般應該是寫到if裡的,不過這裡示範,從上到下按順序執行邏輯比較清晰

semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程同時運作

def run(n):

semaphore.acquire()

time.sleep(1)

print("run the thread: %s" % n)

semaphore.release()

if __name__ == '__main__':

for i in range(30):

t = threading.Thread(target=run, args=(i,))

t.start()

while threading.active_count() != 1:

pass # 繼續用活動程序數量等待子程式結束

else:

print("所有程序執行完畢")

上面代碼執行後的效果就是,每次隻蹦5條結果出來。因為通過信号量限制了同一時間隻允許一定數量的線程操作資料。

這裡的代碼比較簡單,而且都是一樣的,是以看上去是每次執行5個。實際是這裡面每一個都是獨立的。一旦有一個執行完釋放了之後,就會讓下一個繼續執行。就是線程都是一個一個放行的,一旦一個執行完畢就放行下一個,而不是一批一批放行的。

定時器 timer

從線程被start方法調用開始,定時器開始計時。計時完畢後才會開始執行

import threading

def hello(name):

print("你好,%s" % name)

t = threading.Timer(2, hello, args=("世界",))

t.start() # 需要等待上面指定的秒數後才會真正執行

事件 event

事件(event),用于線程之間資料同步的。通過 event 來實作兩個或多個線程間的互動。

下面把用于控制的線程稱為服務端,被控制的線程稱為用戶端。

服務端,設定 event 的狀态,隻有 set 和 clear 兩個方法

用戶端,檢查 event 的狀态,如果是 set 就繼續執行,否則就阻塞等待 set

event = threading.Event() :使用前,先生成一個執行個體

event.set() :服務端線程,将 event 設為 set ,用戶端就可以繼續執行

event.clear() :服務端線程,将 event 的 set 清除,用戶端會阻塞直到再次設為 set

event.wait() :用戶端線程,等待 event 變成 set ,如果 set 就繼續,否則就阻塞直到 set

event.is_set() :布爾值,目前event的狀态。用戶端線程也可以用它來做控制。但是如果不是 set 和 clear時都需要執行的話,還是用 wait 來控制比較好。wait是用阻塞來控制的,而這裡是每次都要做一下判斷。

課上舉例了一個紅綠燈的例子,起一個紅綠燈的線程,如果綠燈就 set ,紅燈就 clear。然後可以起幾個車的線程,判斷event,隻在set的時候執行:

import threading, time

event = threading.Event() # 這句應該寫到if __name__裡面,先放這裡看看清楚

def light():

"模拟紅綠燈"

count = 0

while True:

if count < 10:

event.set() # 綠燈将标志位設為 set

print("\033[42;1m**\033[0m綠燈", 10-count, 'event狀态:', event.is_set())

elif count < 20:

event.clear() # 紅燈清除标志位

print("\033[41;1m**\033[0m紅燈", 20-count, 'event狀态:', event.is_set())

else:

count = 0

continue

time.sleep(1)

count += 1

def car(name):

"在紅燈的時候wait,綠燈的時候執行"

while True:

event.wait() # 隻有在綠燈的時候才會繼續執行

print(name, '正在行駛...')

time.sleep(1)

if __name__ == '__main__':

light1 = threading.Thread(target=light)

light1.start()

# 你也可以用for循環多起幾個車

car1 = threading.Thread(target=car, args=("特斯拉",))

car1.start()

car2 = threading.Thread(target=car, args=("保時捷",))

car2.start()

雖然不用 event 也可以通過檢查某個變量的狀态來實作控制。但是由于變量是程序間所有線程共享的,用戶端直接通路控制變量也可以修改它,雖然你程式裡可能不會這麼寫,但不是不可以。這裡使用了 event 将這個過程封裝了,避免用戶端直接通路這個變量。

queue子產品-隊列

隊列,可以了解為一個有順序的容器,裡面存放資料。資料來的時候先将資料存入,使用資料的時候再按一定順序從容器中取出。

其實清單也能實作,用pop方法。簡單的話,還真的用清單就好了,不過子產品封裝了更多進階的設定。

有3種隊列:

import queue

# 使用之前也是要先生成一個執行個體

q1 = queue.Queue() # 先進先出

q2 = queue.LifoQueue() # 後進先出,就是堆棧。last in first out (LIFO)

q3 = queue.PriorityQueue() # 可以設定優先級的隊列

上面再執行個體化的時候,都沒有參數。有一個參數 maxsize=0 ,預設的隊列大小是沒有限制。可以設定 maxsize 來設定隊列大小。

存取資料

put(item) :存入一個資料

get(item) :取出一個資料

put_nowait(item) :存入資料的另外一個方法,不等待,直接抛出異常

get_nowait(item) :取出資料的另外一個方法,不等待,直接抛出異常

可選的參數,用于 put 和 get 方法:

block=False :預設為True,就是阻塞模式。設為False,則直接抛出異常,下面的 timeout 也就無效了。

timeout=1 :預設為None,就是一直等着。設定後為阻塞多少秒,如果這段時間内可以繼續了,就馬上繼續。否則還是抛出異常。

put在存入資料的時候,如果隊列設定了大小,并且隊列已滿,就會阻塞。直到隊列裡有資料被取出空出了位置,那麼再将這個資料存入繼續。可以使用 put_nowait 存入,那麼滿的時候,就不直接抛出一個錯誤。

也可以設定參數 block=False 也是一樣直接抛出錯誤,預設是True。

還可以設定參數 timeout=1 這個是等待的時間,比如這裡等待1秒,如果1秒内隊列空出來,就存入,否則還是抛出異常。如果 block 已經設定了 False 這個 timeout 就沒有用了,會直接抛出錯誤。

get也是一樣,用上面的兩個參數控制,在隊列為空的情況下繼續要取資料,是阻塞,還是抛出異常,或者阻塞多久後再抛出異常。

下面的列子,生成2個線程,一個存入資料,一個使用者控制來取出資料,示範put和get的用法:

import threading, queue

q1 = queue.Queue(maxsize=2) # 隊列大小隻有2

def put_item():

"存入資料的線程"

for i in range(10):

q1.put(i, timeout=2) # 如果隊列已滿,最長等待2秒鐘,否則抛出異常

print("存入資料:", i) # 存入資料的時候,你會看到

print("資料已經全部存入...")

def control():

"使用者控制取出資料"

while True:

input("按回車取出一個資料,如果等待超過2秒會抛出異常... ")

print("取出資料:", q1.get_nowait()) # 如果隊列空了,再要取就抛出異常

if __name__ == '__main__':

put1 = threading.Thread(target=put_item)

put1.start()

control1 = threading.Thread(target=control)

control1.start()

其他方法

qsize() :隊列中元素的個數,0就是空隊列,等于maxsize就是隊列滿了

empty() :布爾值,隊列是否為空。空的話就get不到資料了

full() :布爾值,隊列是否已滿。滿的話就put不進資料了

import queue

q = queue.LifoQueue(3) # 來一個堆棧,大小隻有3

def show():

"每次列印這3個資料"

print('隊列中的資料數量:', q.qsize(),

'隊列是否為空:', q.empty(),

'隊列是否滿了', q.full())

show() # 什麼都還沒有存入,大小是0,并且是空的

q.put(1)

show() # 存入資料後,大小就變了,已經不是空堆棧了,但是還沒滿

q.put(2)

show()

q.put(3)

show() # 現在已經滿了

print(q.get()) # 看看取出資料的順序,是不是最後存入的最先取出

print(q.get())

print(q.get())

show() # 都取完了,現在又是一個空堆棧了

優先級隊列

優先級隊列例子1:

import queue

q1 = queue.PriorityQueue()

q1.put('Jack')

q1.put('Perter')

q1.put('Alice')

q1.put('1Zoey')

while not q1.empty(): # empty隊列是否為空

print(q1.get())

從上面可以看到,最後資料的結果給按ASCII順序取出來的。一般可能不這麼用,而是會單獨設定一個優先級,那麼就把優先級和資料已元組的形式存入

優先級隊列例子2:

import queue

q1 = queue.PriorityQueue()

q1.put((10, 'Jack')) # 注意資料隻占1個參數位置,是以這裡得2層括号,存入一個元組

q1.put((-1, 'Zoey'))

q1.put((10, 'Perter'))

q1.put((-1, 'Alice'))

q1.put((-1, 'Bob'))

while not q1.empty():

print(q1.get())

重構put方法,優化排序的規則

這裡還有一個問題,雖然将資料寫成了元組,但是其實排序的時候是按整個元組的内容來排序的。但是我們需要的是按照元組的第一個元素來排序,而同樣優先級的資料,仍然按照進入的順序輸出。貌似子產品本身并沒有提供這樣的方法。我們重構一個自己的優先級排序。

源碼中調用put方法,最終會調用_put方法将資料導入;

然後在_put方法中調用了heapq子產品(import queque的時候導入的)的heappush方法将資料傳遞給_siftdown方法;

最終在_siftdown方法中重新排列存儲資料的清單。問題就是這個方法,直接從清單中取出元素進行比較,而我們需要的是取出的元素是個元組,用元組的第一個元素進行比較。

是以隻要修改上面3個方法就好了。

import queue

# 繼承優先級排序的類,隻重構我們需要修改的方法

class MyPriorityQueue(queue.PriorityQueue):

"根據元組的第一個元素進行優先級排序"

def _put(self, item):

"源碼是調用heapq子產品裡的方法,這裡我們就在下面重構了,調用自己的類裡的同名方法"

self.heappush(self.queue, item)

# 下面的2個方法是heapq子產品裡的,抄過來小改一下就好了

def heappush(self, heap, item):

"這裡不用改實作的邏輯,讓它繼續調用我們下面的_siftdown方法就好了"

heap.append(item)

self._siftdown(heap, 0, len(heap) - 1)

def _siftdown(self, heap, startpos, pos):

"""主要就是修改這裡

源碼是從heap裡取出資料進行比較:if newitem < parent:

這裡改成比較資料裡的第一個元素:if newitem[0] < parent[0]:

這裡寫的比較簡單。如果存入的不是元素應該就會報錯了,不過這不是重點了,需要可以再改

"""

newitem = heap[pos]

while pos > startpos:

parentpos = (pos - 1) >> 1

parent = heap[parentpos]

if newitem[0] < parent[0]:

heap[pos] = parent

pos = parentpos

continue

break

heap[pos] = newitem

q1 = MyPriorityQueue()

q1.put((10, 'Jack')) # 注意資料隻占1個參數位置,是以這裡得2層括号,存入一個元組

q1.put((-1, 'Zoey'))

q1.put((10, 'Perter'))

q1.put((-1, 'Alice'))

q1.put((-1, 'Bob'))

while not q1.empty():

print(q1.get())

上面好長一段,其實我隻需要改一行,該怎麼做最好呢?

task_done 方法和 join 方法

task_done :每次從隊列中取出資料并且處理好之後,調用一下這個方法,來提示 join 方法,是否停止阻塞。

join :阻塞,直到隊列清空,再繼續向往後執行。

具體例子看下面的生産者消費者模型,舉例一

生産者消費者模型

回顧,之前講疊代器的時候,通過yield生成器實作過一個單線程下的有并行效果的吃包子的函數。看之前 的筆記:Python自動化開發學習4-2

現在,學習了多線程和隊列之後,我們可以用多線程來實作了,把資料存放到隊列之中。

概念

在并發程式設計中使用生産者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生産線程和消費線程的工作能力來提高程式的整體處理資料的速度。

線上程裡,生産者就是生産資料的線程,消費者就是消費資料的線程。在多線程開發當中,如果生産者處理速度很快,而消費者處理速度很慢,那麼生産者就必須等待消費者處理完,才能繼續生産資料。同樣的道理,如果消費者的處理能力大于生産者,那麼消費者就必須等待生産者。為了解決這個問題于是引入了生産者和消費者模式。

生産者消費者模式是通過一個容器來解決生産者和消費者的強耦合問題。生産者和消費者之間不直接通信,而是通過隊列來傳遞資料。是以生産者生産完資料之後不用等待消費者處理,直接放入隊列;消費者不找生産者要資料,而是直接從隊列裡取,隊列就相當于一個緩沖區,平衡了生産者和消費者的處理能力,這樣就實作了解耦。

用之前學習的多線程和隊列的知識,就可以實作這樣的模型了。

舉例一

舉例說明。寫一個生産者的函數,生産包子。再寫一個消費者的函數,消費包子。用上面的 task_done 和 join 的方法。

生産者一次生産10個包子,放入隊列,然後執行 join 阻塞,等到10個包子全部被取完了,才會繼續。

消費者取包子,不是直接找生産者,而且從隊列裡去取。取了之後執行一下 task_done 通知以下生産者看看包子有沒有被取完。

import queue, threading, time

def producer():

while True:

for i in range(10):

q.put("包子 %s" % i)

print("已經放入了10個包子")

q.join() # 阻塞,知道包子被取完

print("包子已經被取完了...")

def consumer(name):

while True:

# tmp = q.get()

print("%s 吃了一個包子 %s" % (name, q.get()))

q.task_done() # 通知join方法,取了一個資料了

time.sleep(1)

if __name__ == '__main__':

q = queue.Queue()

p = threading.Thread(target=producer,)

p.start()

c1 = threading.Thread(target=consumer, args=('Eric',))

c1.start()

c2 = threading.Thread(target=consumer, args=('Lassie',))

c2.start()

c3 = threading.Thread(target=consumer, args=('Snoopy',))

c3.start()

上面的例子中生産資料爽哦速度遠遠高于消費資料的速度,我們用阻塞來控制,防止生産了過多的資料來不及消費

舉例二

隊列本身就有自己的阻塞模式,由于使用了生産者消費者模型實作了解耦。我們不必關心生産和消費資料的速度。如果資料消費的快,在取空資料的時候就會進入阻塞,直到生産者把資料加入隊列。資料生産過快的情況也是一樣,隊列滿了自然進入阻塞,直到消費者消費了。

import queue, threading, time

# 随意調整一次生産的數量,以及每次sleep的時間間隔

def producer():

while True:

for i in range(5):

q.put("包子 %s" % i)

print("已經放入了5個包子")

time.sleep(2)

def consumer(name):

while True:

print("%s 吃了一個包子 %s" % (name, q.get()))

time.sleep(1)

# 這裡也是可以多開幾個生産者和消費者

if __name__ == '__main__':

q = queue.Queue(10) # 這次設定一下隊列的大小,如果生産過快,也會阻塞等待消費

p = threading.Thread(target=producer,)

p.start()

c1 = threading.Thread(target=consumer, args=('Eric',))

c1.start()

c2 = threading.Thread(target=consumer, args=('Lassie',))

c2.start()

c3 = threading.Thread(target=consumer, args=('Snoopy',))

c3.start()

作業

類 Fabric 主機管理程式開發:

運作程式列出主機組或者主機清單

選擇指定主機或主機組

選擇讓主機或者主機組執行指令或者向其傳輸檔案(上傳/下載下傳)

充分使用多線程或多程序

不同主機的使用者名密碼、端口可以不同

補充-線程池-concurrent.futures子產品

上面講線程的内容裡隻有信号量,并沒有線程池。信号量隻是限制同僚運作的線程,但是所有線程應該是全部都建立好的。線程占用的資源比較少,這也沒太大問題。不過如果要限制同一時間建立的線程的數量,就需要線程池。

從Python3.2開始,标準庫為我們提供了concurrent.futures子產品,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實作了對threading和multiprocessing的更進階的抽象,對編寫線程池/程序池提供了直接的支援。

例子:

from concurrent.futures import ThreadPoolExecutor

def ssh_cmd(obj):

pass

# 執行者:建立線程池

executor = ThreadPoolExecutor(5)

for obj in objs:

executor.submit(ssh_cmd, obj) # 第一個參數是方法,之後的參數都是變量

executor.shutdown(wait=True)

項目裡隻用到這麼多。