導航
1、Thread類
2、線程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore 信号量
6、queue子產品,線程隊列
7、cpu密集型與IO密集型
8、線程池
線程是cpu運作的最小機關,沒有自己的記憶體空間,同一線程的多線程共享一個記憶體空間,同一線程下的多線程都可以通路全局變量,對全局變量進行操作時要注意同步問題。
threading子產品是建立在 _thread 子產品上的,對其進行封裝,包含較多的功能。
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon=None)
- group 線程組,預留的還沒有什麼用,應設定為None
- target 目的方法,線程要執行的函數入口,這是直接通過Thread建立多線程需要指定的函數入口;通過繼承Thread類建立的多線程,target應設定為None
- name 線程名
- args/kwargs target函數的參數
- daemon 是否設定為守護線程
1)建立多線程的方式
建立多線程主要有兩種方式,①通過Thread類直接建立子線程,并指定子線程需要執行的函數邏輯。②通過繼承Thread類建立執行個體對象,重寫run方法,開啟子線程

1 from threading import Thread
2
3
4 def fn(i):
5 # 子線程邏輯
6 print("--子線程--", i)
7
8
9 if __name__ == "__main__":
10 t = Thread(target=fn, args=(100,))
11 t.start()
12 print("--主線程--")
13
14
15 # 輸出結果
16 --子線程-- 100
17 --主線程--
Thread建立多線程

1 from threading import Thread
2
3
4 class MyThread(Thread):
5 def __init__(self, name):
6 super().__init__()
7 self.name = name
8
9 def run(self):
10 # 子線程邏輯
11 print("--子線程--", self.name)
12
13
14 if __name__ == "__main__":
15 t = MyThread("my_thread_01")
16 t.start()
17 print("--主線程--")
18
19
20 # 輸出結果
21 --子線程-- my_thread_01
22 --主線程--
繼承Thread建立多線程
2)下面來看一下使用多線程與不使用多線程的差別

1 from threading import Thread
2 import time
3
4
5 def fn(i):
6 # 子線程邏輯
7 print("+++", i)
8 time.sleep(1)
9
10
11 if __name__ == "__main__":
12 start_time = time.time()
13 t_l = []
14 for i in range(5):
15 t = Thread(target=fn, args=(i,))
16 t.start()
17 t_l.append(t)
18 for i in range(5):
19 t.join()
20 print("運作所需時間:", time.time() - start_time)
21
22
23 # 輸出結果
24 +++ 0
25 +++ 1
26 +++ 2
27 +++ 3
28 +++ 4
29 運作所需時間: 1.0024588108062744
多線程簡單例子

1 import time
2
3
4 def fn(i):
5 # 子線程邏輯
6 print("+++", i)
7 time.sleep(1)
8
9
10 if __name__ == "__main__":
11 start_time = time.time()
12 for i in range(5):
13 fn(i)
14 print("運作所需時間:", time.time() - start_time)
15
16
17 # 輸出結果
18 +++ 0
19 +++ 1
20 +++ 2
21 +++ 3
22 +++ 4
23 運作所需時間: 5.0021538734436035
未使用多線程
可以看到同樣的需要執行5次fn函數的邏輯,使用多線程的效率卻比單線程的要高
3)daemon守護線程的設定
設定守護線程有有兩種方法 ① t.daemon = True 直接設定屬性 ② t.setDaemon(True) 通過方法設定。
當子線程設定為守護線程是,主線程結束時程式将不再等待子線程是否執行完畢直接退出程式。如下邊子線程設定為守護線程後,主線程結束了,子線程也将跟着結束,并未輸出子線程的語句

1 from threading import Thread
2 import time
3
4
5 def fn(i):
6 # 子線程邏輯
7 time.sleep(1)
8 print("+++", i)
9
10
11 if __name__ == "__main__":
12 for i in range(5):
13 t = Thread(target=fn, args=(i,))
14 t.setDaemon(True)
15 t.start()
16 print("---end---")
17
18
19 # 輸出結果
20 ---end---
daemon設定守護線程
4)join(timeout=None)
join方法堵塞目前上下文環境的線程,直到調用join方法的子線程執行結束後目前線程才繼續執行。在哪裡調用就在哪堵塞,最多堵塞timeout秒

1 from threading import Thread
2 import time
3
4
5 def fn():
6 # 子線程邏輯
7 time.sleep(5)
8
9
10 if __name__ == "__main__":
11 start_time = time.time()
12 t = Thread(target=fn)
13 t.start()
14 t.join()
15 print("所需時間-->", time.time() - start_time)
16
17
18 # 輸出結果
19 所需時間--> 5.001126527786255
join方法
5)is_alive(),傳回子線程存活狀态
is_alive方法判斷指定對象線程的存活狀态。線程 start 後一直到該線程結束都傳回True。線程 start 前 或線程已經結束傳回False
當多個線程的同時對一個變量進行操作時便會存在着安全的隐患問題,
如下:有2個線程同時對全局變量進行 + 1,000,000 為什麼結果卻不是 2,000,000。這是因為當線程一執行完11行時還未開始執行12行,全局變量a沒有進行 +1,就已經切換了線程二執行;線程二從10行開始執行,取到的值依舊是還沒有 +1 的值,執行完12行,切換回線程一;線程一繼續上次的斷點執行12行,這時就相當于兩次線程總共才完成了一次 +1 操作。多次循環如此,那麼全局變量最後的結果就不可能是2,000,000。這裡為了看到明顯的效果,我将 a += 1 變成了三條語句來實作

1 from threading import Thread
2
3
4 a = 0
5
6
7 def fn():
8 global a
9 for i in range(1000000):
10 t = a
11 t += 1
12 a = t
13
14
15 if __name__ == "__main__":
16 t_l = []
17 for i in range(2):
18 t = Thread(target=fn)
19 t_l.append(t)
20 t.start()
21 for t in t_l:
22 t.join()
23 print("全局變量 a-->", a)
24
25
26 # 輸出結果
27 全局變量 a--> 1332605
多線程對全局變量的修改

1 from threading import Thread
2
3 def fn():
4 for i in range(5):
5 print("1234")
6
7
8 for i in range(2):
9 t = Thread(target=fn)
10 t.start()
11
12
13 # 輸出結果1234
14 1234
15 12341234
16
17 1234
18 1234
19 1234
20 12341234
21
22 1234
多線程同時對控制台列印
經過這個例子,就可以看出當多線程對共享的資源進行修改是就涉及到同步問題了,我們希望同一時刻隻有一個線程t執行某段邏輯,其它線程必須等待這個線程t執行完才能執行。
1)threading.Lock()
Lock線程鎖,在需要進行同步的語句塊加上可以確定線程間的同步問題

1 from threading import Thread, Lock
2
3
4 def fn():
5 global a
6 for i in range(1000000):
7 lock.acquire()
8 t = a
9 t += 1
10 a = t
11 lock.release()
12
13
14 a = 0
15 t_l = []
16 lock = Lock()
17 for i in range(2):
18 t = Thread(target=fn)
19 t_l.append(t)
20 t.start()
21 for t in t_l:
22 t.join()
23 print("全局變量 a-->", a)
24
25
26 # 全局變量
27 全局變量 a--> 2000000
Lock後
2)死鎖
關于死鎖的概念我就不再叙述,就簡單的說說下邊的代碼,同時開啟兩個線程,線程t1中執行到8行,線程t2執行到20行的時候,t1發現鎖lock2沒有被釋放便會在這一直等待,t2發現鎖lock1沒有被釋放也在這一直等待,這樣誰也不會先釋放鎖,就會造成死鎖的現象,程式一直卡着。

1 from threading import Thread, Lock
2 import time
3
4 def fn1():
5 lock1.acquire()
6 print("fn1-->lock1.acquire")
7 time.sleep(1) # 確定另一個線程已經執行fn2函數且lock2已經鎖上
8 lock2.acquire()
9 print("fn1-->lock2.acquire")
10 lock2.release()
11 print("fn1-->lock2.release")
12 lock1.release()
13 print("fn1-->lock1.release")
14
15
16 def fn2():
17 lock2.acquire()
18 print("fn1-->lock2.acquire")
19 time.sleep(1) # 確定另一個線程已經執行fn1函數且lock1已經鎖上
20 lock1.acquire()
21 print("fn1-->lock1.acquire")
22 lock1.release()
23 print("fn1-->lock1.release")
24 lock2.release()
25 print("fn1-->lock2.release")
26
27
28 lock1 = Lock()
29 lock2 = Lock()
30 t1 = Thread(target=fn1)
31 t2 = Thread(target=fn2)
32 t1.start()
33 t2.start()
34 t1.join()
35 t2.join()
36 print("----end----")
37
38
39 # 輸出結果
40 fn1-->lock1.acquire
41 fn1-->lock2.acquire
死鎖
- acquire() 擷取鎖
- release() 釋放鎖
- wait(timeout=None) 等待notify的通知,沒有等到則在此堵塞,并釋放鎖cdt.release();有notify則繼續往下執行,并釋放鎖。timeout:堵塞最長時間秒,預設一直堵塞。使用這個方法之前要已經acquire(),因為這個方法會釋放鎖
- notify(n=1) 通知一個正在 wait 堵塞 的線程,n:通知線程數,預設為1
- notify_all() 通知所有正在 wait 堵塞 的線程

1 from threading import Thread, Condition
2 import time
3
4
5 def producer():
6 global cdt
7 while True:
8 time.sleep(1)
9 cdt.acquire()
10 print("+++++ 1個包子")
11 cdt.notify()
12 cdt.release()
13
14
15 def consumer():
16 global cdt
17 while True:
18 cdt.acquire()
19 cdt.wait()
20 print("----- 1個包子, 時間:", time.strftime("%X"))
21 # cdt.release()
22
23
24 cdt = Condition()
25 prd = Thread(target=producer)
26 csm = Thread(target=consumer)
27 prd.start()
28 csm.start()
29 prd.join()
30 csm.join()
31 print("----end----")
32
33
34 # 輸出結果
35 +++++ 1個包子
36 ----- 1個包子, 時間: 10:05:30
37 +++++ 1個包子
38 ----- 1個包子, 時間: 10:05:31
39 +++++ 1個包子
40 ----- 1個包子, 時間: 10:05:32
41 +++++ 1個包子
42 ----- 1個包子, 時間: 10:05:33
43 +++++ 1個包子
44 ----- 1個包子, 時間: 10:05:34
Condition,1個生産者與1個消費者

1 from threading import Thread, Condition
2 import time
3
4
5 def producer():
6 global cdt
7 while True:
8 time.sleep(1)
9 cdt.acquire()
10 print("+++++ 1個包子")
11 cdt.notify()
12 cdt.release()
13
14
15 def consumer(name):
16 global cdt
17 while True:
18 cdt.acquire()
19 cdt.wait()
20 print("消費者:%s----- 1個包子, 時間:%s" % (name, time.strftime("%X")))
21 # cdt.release()
22
23
24 cdt = Condition()
25 prd = Thread(target=producer)
26 prd.start()
27 for i in range(5):
28 csm = Thread(target=consumer, args=(i,))
29 csm.start()
30
31
32 # 輸出結果
33 +++++ 1個包子
34 消費者:0----- 1個包子, 時間:10:7:07
35 +++++ 1個包子
36 消費者:1----- 1個包子, 時間:10:7:08
37 +++++ 1個包子
38 消費者:2----- 1個包子, 時間:10:7:09
39 +++++ 1個包子
40 消費者:3----- 1個包子, 時間:10:7:10
41 +++++ 1個包子
42 消費者:4----- 1個包子, 時間:10:7:11
43 +++++ 1個包子
44 消費者:0----- 1個包子, 時間:10:7:12
45 +++++ 1個包子
46 消費者:1----- 1個包子, 時間:10:7:13
47 +++++ 1個包子
48 消費者:2----- 1個包子, 時間:10:7:14
49 +++++ 1個包子
50 消費者:3----- 1個包子, 時間:10:7:15
51 +++++ 1個包子
52 消費者:4----- 1個包子, 時間:10:7:16
Condition,1個生産者與多個消費者
- set() 設定flag=True
- clear() 清除flag,即flag=False
- wait(timeout=None) 堵塞線程等待set()後繼續執行,timeout:堵塞最長時間秒,預設一直堵塞。
Event内部定義了一個flag,當flag=Flase時,wait()将會一直堵塞線程;flag=True時,wait()不會堵塞線程,繼續向下執行,這時的wait()相當與pass語句

1 from threading import Thread, Event
2 import time
3
4
5 def producer():
6 global eve
7 while True:
8 time.sleep(1)
9 print("+++++ 1個包子")
10 eve.set()
11
12
13 def consumer():
14 global eve
15 while True:
16 eve.wait()
17 print("----- 1個包子, 時間:%s" % (time.strftime("%X")))
18 eve.clear()
19
20
21 eve = Event()
22 prd = Thread(target=producer)
23 csm = Thread(target=consumer)
24 prd.start()
25 csm.start()
26
27
28 # 輸出結果
29 +++++ 1個包子
30 ----- 1個包子, 時間:10:12:00
31 +++++ 1個包子
32 ----- 1個包子, 時間:10:12:01
33 +++++ 1個包子
34 ----- 1個包子, 時間:10:12:02
35 +++++ 1個包子
36 ----- 1個包子, 時間:10:12:03
37 +++++ 1個包子
38 ----- 1個包子, 時間:10:12:04
Event,依舊是生産者消費者

1 from threading import Thread, Event
2 import time
3
4
5 def producer():
6 global eve
7 while True:
8 time.sleep(1)
9 print("+++++ 3個包子")
10 eve.set()
11
12
13 def consumer(name):
14 global eve
15 while True:
16 eve.wait() # 在此堵塞,直到eve.set()
17 print("消費者:%s----- 1個包子, 時間:%s" % (name, time.strftime("%X")))
18 eve.clear() # 将辨別清除,需重新eve.set()
19
20
21 eve = Event()
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(5):
25 csm = Thread(target=consumer, args=(i,))
26 csm.start()
27
28
29 # 輸出結果
30 +++++ 3個包子
31 消費者:2----- 1個包子, 時間:10:10:25
32 消費者:1----- 1個包子, 時間:10:10:25
33 消費者:0----- 1個包子, 時間:10:10:25
34 +++++ 3個包子
35 消費者:0----- 1個包子, 時間:10:10:26
36 消費者:2----- 1個包子, 時間:10:10:26
37 消費者:1----- 1個包子, 時間:10:10:26
38 +++++ 3個包子
39 消費者:0----- 1個包子, 時間:10:10:27
40 消費者:2----- 1個包子, 時間:10:10:27
41 消費者:1----- 1個包子, 時間:10:10:27
Event,1個生産者與多個消費者
- Semaphore(value=1) 預設最大并發數為1
- acquire(blocking=True, timeout=None) 擷取一個信号,内部計數器 -= 1,blocking當計數器為零時堵塞線程,timeout線程堵塞時間秒
- release() 釋放一個信号,内部計數器 += 1
Semaphore内部管理着一個計數器,acquire時計數器減1,release時計數器加1,當計數器 =0 時,将在acquire語句處堵塞線程。
信号量簡單來說,就是控制線程最大并發數的。将信号量比作一定數量的停車位,線程比作車,當有停車位空出來時車才能上去停泊,等停車位滿了時,剩下的車隻能等停車位上的車走了才能繼續上去停泊。

1 from threading import Thread, Lock, Semaphore
2 import time
3
4 def fn(num):
5 global sp
6 sp.acquire()
7 time.sleep(1)
8 print("--->", num, time.strftime("%X"))
9 sp.release()
10
11
12 sp = Semaphore(3)
13 for i in range(15):
14 prd = Thread(target=fn, args=(i,))
15 prd.start()
16
17
18 # 輸出結果
19 ---> 0 14:03:29
20 ---> 2 14:03:29
21 ---> 1 14:03:29
22 ---> 3 14:03:30
23 ---> 4 14:03:30
24 ---> 5 14:03:30
25 ---> 6 14:03:31
26 ---> 7 14:03:31
27 ---> 8 14:03:31
28 ---> 9 14:03:32
29 ---> 10 14:03:32
30 ---> 11 14:03:32
31 ---> 12 14:03:33
32 ---> 13 14:03:33
33 ---> 14 14:03:33
Semaphore
這個例子可以看到信号量為3,每一秒最多執行三個線程
我們知道多線程操作全局變量的時候是不安全的,那麼線程隊列則沒有這個問題。和程序隊列一樣的方法
線上程中隊列有三種方式,① 先進先出 queue.Queue ② 後進先出 queue.LifoQueue (其實這個應該叫"棧") ③ 優先級隊列 queue.PriorityQueue 這個隊列 put一個元組,元組第一個元素為優先級,第二個元素為值,優先級數值越小優先級越高,越先出來

1 >>> import queue
2 >>> q = queue.Queue()
3 >>> q.put(1)
4 >>> q.put("a")
5 >>> q.put("{}")
6 >>> q.get()
7 1
8 >>> q.get()
9 'a'
10 >>> q.get()
11 '{}'
queue.Queue

1 >>> import queue
2 >>> q = queue.LifoQueue()
3 >>> q.put(1)
4 >>> q.put("a")
5 >>> q.put("[]")
6 >>> q.get()
7 '[]'
8 >>> q.get()
9 'a'
10 >>> q.get()
11 1
queue.LifoQueue

1 >>> import queue
2 >>> q = queue.PriorityQueue()
3 >>> q.put((1, "a"))
4 >>> q.put((2, "{}"))
5 >>> q.put((3, "[]"))
6 >>> q.put((3, "[]"))
7 >>> import queue
8 >>> q = queue.PriorityQueue()
9 >>> q.put((1, "a"))
10 >>> q.put((3, "{}"))
11 >>> q.put((2, "[]"))
12 >>> q.get()
13 (1, 'a')
14 >>> q.get()
15 (2, '[]')
16 >>> q.get()
17 (3, '{}')
queue.PriorityQueue

1 from threading import Thread
2 import queue
3 import time
4
5
6 def producer():
7 global q
8 for i in range(100):
9 time.sleep(1)
10 q.put(i)
11 print("+++++ 1個包子")
12
13
14 def consumer(name):
15 global q
16 while True:
17 bz = q.get()
18 print("消費者:%s---得到1個包子%s, 時間:%s" % (name, bz, time.strftime("%X")))
19
20
21 q = queue.Queue(10) # 最多能添加10包子
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(3):
25 csm = Thread(target=consumer, args=(i,))
26 csm.start()
27
28
29 # 輸出結果
30 +++++ 1個包子
31 消費者:0---得到1個包子0, 時間:15:11:30
32 +++++ 1個包子
33 消費者:1---得到1個包子1, 時間:15:11:31
34 +++++ 1個包子
35 消費者:2---得到1個包子2, 時間:15:11:32
36 +++++ 1個包子
37 消費者:0---得到1個包子3, 時間:15:11:33
38 +++++ 1個包子
39 消費者:1---得到1個包子4, 時間:15:11:34
生産者與消費者
7、CPU密集型與IO密集型
因為CPython中GIL(Global Interpreter Lock全局解釋器鎖)的存在,每刻隻能有一個線程在cpu中運作,并不能真正的實作多線程的并行。有人可能會疑惑為什麼上邊的例子(傳送)确實提高了效率呢。
這得從并發的兩種類型說起,① CPU密集型,也稱計算密集型 ② IO密集型。cpu密集型需要大量的計算,占用cpu大量的時間,cpu使用率高。IO密集型更多的是cpu在等待IO的操作,cpu的使用率并不高,當cpu遇到IO操作等待時讓出cpu給其它線程執行,是以在cpython中對于IO密集型還是有用的。
實際上我們遇到的大多的是IO密集型,如:檔案的讀寫,網絡的連結,伺服器等待使用者的通路..等
1)CPU密集型中,多線程,多程序,單線程的差別

1 from multiprocessing import Process
2
3
4 def fn():
5 s = 0
6 for i in range(100000000):
7 s += i
8
9
10 if __name__ == '__main__':
11 start_time = time.time()
12 p_l = []
13 for i in range(4): # 開啟4個程序
14 p = Process(target=fn)
15 p_l.append(p)
16 p.start()
17 for p in p_l:
18 p.join()
19 print("時間:", time.time() - start_time) # 列印消耗時間
20
21
22 # 輸出結果
23 時間: 15.544550895690918
多個程序分别計算一億累加,用時15.54s

1 from threading import Thread
2
3
4 def fn():
5 s = 0
6 for i in range(100000000):
7 s += i
8
9
10 if __name__ == '__main__':
11 start_time = time.time()
12 p_l = []
13 for i in range(4):
14 p = Thread(target=fn) # 開啟4個線程
15 p_l.append(p)
16 p.start()
17 for p in p_l:
18 p.join()
19 print("時間:", time.time() - start_time) # 列印消耗時間
20
21
22 # 輸出結果
23 時間: 26.204195976257324
多個線程分别計算一億累加,用時26.20s

1 def fn():
2 s = 0
3 for i in range(100000000):
4 s += i
5
6
7 if __name__ == '__main__':
8 start_time = time.time()
9 for i in range(4):
10 fn()
11 print("時間:", time.time() - start_time) # 列印消耗時間
12
13
14 # 輸出結果
15 時間: 25.838396787643433
單線程多次計算一億的累加,用時25.84s
通過對比可以發現在cpu密集型中使用多線程與單線程是差不多的,甚至單線程效率還高那麼一點點,那是因為多線程需要頻繁的切換計算是以浪費了一點時間,線程越多越明顯。是以cpu密集型使用多程序可以更好的提高效率
2)IO密集型中,多線程,多程序,單線程的差別,這裡通過time.sleep來模拟IO操作

1 from threading import Thread
2
3
4 def fn():
5 time.sleep(1) # 通過time.sleep來模拟IO操作
6
7
8 if __name__ == '__main__':
9 start_time = time.time()
10 t_l = []
11 for i in range(10):
12 t = Thread(target=fn)
13 t_l.append(t)
14 t.start()
15 for t in t_l:
16 t.join()
17 print("時間:", time.time() - start_time) # 列印消耗時間時間
18
19
20 # 輸出結果
21 時間: 1.0024397373199463
IO操作,多線程用時1.00s

1 from multiprocessing import Process
2
3
4 def fn():
5 time.sleep(1) # 通過time.sleep來模拟IO操作
6
7
8 if __name__ == '__main__':
9 start_time = time.time()
10 t_l = []
11 for i in range(10):
12 t = Process(target=fn)
13 t_l.append(t)
14 t.start()
15 for t in t_l:
16 t.join()
17 print("時間:", time.time() - start_time) # 列印消耗時間時間
18
19
20 # 輸出結果
21 時間: 1.552047872543335
IO操作,多程序用時1.55s

1 def fn():
2 time.sleep(1) # 通過time.sleep來模拟IO操作
3
4
5 if __name__ == '__main__':
6 start_time = time.time()
7 for i in range(10):
8 fn()
9 print("時間:", time.time() - start_time) # 列印消耗時間時間
10
11
12 # 輸出結果
13 時間: 10.00423789024353
IO操作,單線程用時10.00
很明顯的看到,在IO密集型中不管用多線程還是多程序都能提高效率,但多程序卻比多線程要耗時,這是因為多程序的開啟耗費的資源比多線程多得多。
concurrent.futures子產品提供了封裝好的線程池以及程序池,ThreadPoolExecutor線程池,ProcessPoolExecutor程序池,它兩之間有着同樣的接口方法。
ThreadPoolExecutor(max_workers=None) 建立線程池時需要指定線程池中線程的個數。
submit(*args, **kwargs) 不定長參數中的第一個參數為協程執行的方法fn,其餘的依次為 fn 的參數。請看下邊例子。
shutdown(wait=True) 這個相當于線程池中的 close + join ,調用這個方法後将不能再添加任務到線程池中。 wait=False,目前調用的線程不進行等待繼續向下執行。
result() 傳回結果,線程執行完 fn 函數的傳回值。會堵塞目前線程,直到調用result的指定線程傳回結果才往下執行。
add_done_callback(fn) 添加回調函數,當線程執行完後執行回調函數,并将線程傳遞給回調函數。

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9
10
11 t_p = ThreadPoolExecutor(max_workers=5)
12 for i in range(20):
13 t_p.submit(fn, i)
14
15
16 # 輸出結果
17 --> 4 10:01:05
18 --> 3 10:01:05
19 --> 0 10:01:05
20 --> 1 10:01:05
21 --> 2 10:01:05
22 --> 9 10:01:06
23 --> 7 10:01:06
24 --> 8 10:01:06
25 --> 5 10:01:06
26 --> 6 10:01:06
27 --> 10 10:01:07
28 --> 14 10:01:07
29 --> 12 10:01:07
30 --> 13 10:01:07
31 --> 11 10:01:07
32 --> 15 10:01:08
33 --> 16 10:01:08
34 --> 19 10:01:08
35 --> 18 10:01:08
36 --> 17 10:01:08
ThreadPoolExecutor,線程池

1 from concurrent.futures import ProcessPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9
10
11 if __name__ == "__main__":
12 t_p = ProcessPoolExecutor(max_workers=5)
13 for i in range(20):
14 t_p.submit(fn, i)
15
16
17 # 輸出結果
18 --> 4 10:02:05
19 --> 3 10:02:05
20 --> 0 10:02:05
21 --> 1 10:02:05
22 --> 2 10:02:05
23 --> 9 10:02:06
24 --> 7 10:02:06
25 --> 8 10:02:06
26 --> 5 10:02:06
27 --> 6 10:02:06
28 --> 10 10:02:07
29 --> 14 10:02:07
30 --> 12 10:02:07
31 --> 13 10:02:07
32 --> 11 10:02:07
33 --> 15 10:02:08
34 --> 16 10:02:08
35 --> 19 10:02:08
36 --> 18 10:02:08
37 --> 17 10:02:08
ProcessPoolExecutor,程序池

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9 return i * i
10
11
12 if __name__ == "__main__":
13 t_p = ThreadPoolExecutor(max_workers=5)
14 t_l = []
15 for i in range(10):
16 t = t_p.submit(fn, i)
17 t_l.append(t)
18 for t in t_l:
19 print("==> %d" % t.result())
20 print("+++++++++++")
21
22
23 # 輸出結果
24 --> 1 10:18:11
25 --> 2 10:18:11
26 --> 0 10:18:11
27 ==> 0
28 ==> 1
29 ==> 4
30 --> 3 10:18:11
31 ==> 9
32 --> 4 10:18:11
33 ==> 16
34 --> 5 10:18:12
35 --> 6 10:18:12
36 --> 7 10:18:12
37 ==> 25
38 ==> 36
39 ==> 49
40 --> 9 10:18:12
41 --> 8 10:18:12
42 ==> 64
43 ==> 81
44 +++++++++++
result(),擷取傳回結果

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 time.sleep(1)
7 print("--> %s %s" % (i, time.strftime("%X")))
8 return i * i
9
10
11 def call_back(t):
12 print("call_back--> %s" % t.result())
13
14
15 if __name__ == "__main__":
16 t_p = ThreadPoolExecutor(max_workers=2)
17 for i in range(4):
18 t = t_p.submit(fn, i)
19 t.add_done_callback(call_back)
20 print("+++++++++++")
21
22
23 # 輸出結果
24 +++++++++++
25 --> 0 10:31:03
26 call_back--> 0
27 --> 1 10:31:03
28 call_back--> 1
29 --> 2 10:31:04
30 call_back--> 4
31 --> 3 10:31:04
32 call_back--> 9
add_done_callback(),回調函數