导航
1、Thread类
2、线程同步
3、threading.Condition
4、threading.Event
5、threading.Semaphore 信号量
6、queue模块,线程队列
7、cpu密集型与IO密集型
8、线程池
线程是cpu运行的最小单位,没有自己的内存空间,同一线程的多线程共享一个内存空间,同一线程下的多线程都可以访问全局变量,对全局变量进行操作时要注意同步问题。
threading模块是建立在 _thread 模块上的,对其进行封装,包含较多的功能。
threading.Thread(group=None, target=None, name=None, args=(), kwargs=None, daemon=None)
- group 线程组,预留的还没有什么用,应设置为None
- target 目的方法,线程要执行的函数入口,这是直接通过Thread创建多线程需要指定的函数入口;通过继承Thread类创建的多线程,target应设置为None
- name 线程名
- args/kwargs target函数的参数
- daemon 是否设置为守护线程
1)创建多线程的方式
创建多线程主要有两种方式,①通过Thread类直接创建子线程,并指定子线程需要执行的函数逻辑。②通过继承Thread类创建实例对象,重写run方法,开启子线程

1 from threading import Thread
2
3
4 def fn(i):
5 # 子线程逻辑
6 print("--子线程--", i)
7
8
9 if __name__ == "__main__":
10 t = Thread(target=fn, args=(100,))
11 t.start()
12 print("--主线程--")
13
14
15 # 输出结果
16 --子线程-- 100
17 --主线程--
Thread创建多线程

1 from threading import Thread
2
3
4 class MyThread(Thread):
5 def __init__(self, name):
6 super().__init__()
7 self.name = name
8
9 def run(self):
10 # 子线程逻辑
11 print("--子线程--", self.name)
12
13
14 if __name__ == "__main__":
15 t = MyThread("my_thread_01")
16 t.start()
17 print("--主线程--")
18
19
20 # 输出结果
21 --子线程-- my_thread_01
22 --主线程--
继承Thread创建多线程
2)下面来看一下使用多线程与不使用多线程的区别

1 from threading import Thread
2 import time
3
4
5 def fn(i):
6 # 子线程逻辑
7 print("+++", i)
8 time.sleep(1)
9
10
11 if __name__ == "__main__":
12 start_time = time.time()
13 t_l = []
14 for i in range(5):
15 t = Thread(target=fn, args=(i,))
16 t.start()
17 t_l.append(t)
18 for i in range(5):
19 t.join()
20 print("运行所需时间:", time.time() - start_time)
21
22
23 # 输出结果
24 +++ 0
25 +++ 1
26 +++ 2
27 +++ 3
28 +++ 4
29 运行所需时间: 1.0024588108062744
多线程简单例子

1 import time
2
3
4 def fn(i):
5 # 子线程逻辑
6 print("+++", i)
7 time.sleep(1)
8
9
10 if __name__ == "__main__":
11 start_time = time.time()
12 for i in range(5):
13 fn(i)
14 print("运行所需时间:", time.time() - start_time)
15
16
17 # 输出结果
18 +++ 0
19 +++ 1
20 +++ 2
21 +++ 3
22 +++ 4
23 运行所需时间: 5.0021538734436035
未使用多线程
可以看到同样的需要执行5次fn函数的逻辑,使用多线程的效率却比单线程的要高
3)daemon守护线程的设置
设置守护线程有有两种方法 ① t.daemon = True 直接设置属性 ② t.setDaemon(True) 通过方法设置。
当子线程设置为守护线程是,主线程结束时程序将不再等待子线程是否执行完毕直接退出程序。如下边子线程设置为守护线程后,主线程结束了,子线程也将跟着结束,并未输出子线程的语句

1 from threading import Thread
2 import time
3
4
5 def fn(i):
6 # 子线程逻辑
7 time.sleep(1)
8 print("+++", i)
9
10
11 if __name__ == "__main__":
12 for i in range(5):
13 t = Thread(target=fn, args=(i,))
14 t.setDaemon(True)
15 t.start()
16 print("---end---")
17
18
19 # 输出结果
20 ---end---
daemon设置守护线程
4)join(timeout=None)
join方法堵塞当前上下文环境的线程,直到调用join方法的子线程执行结束后当前线程才继续执行。在哪里调用就在哪堵塞,最多堵塞timeout秒

1 from threading import Thread
2 import time
3
4
5 def fn():
6 # 子线程逻辑
7 time.sleep(5)
8
9
10 if __name__ == "__main__":
11 start_time = time.time()
12 t = Thread(target=fn)
13 t.start()
14 t.join()
15 print("所需时间-->", time.time() - start_time)
16
17
18 # 输出结果
19 所需时间--> 5.001126527786255
join方法
5)is_alive(),返回子线程存活状态
is_alive方法判断指定对象线程的存活状态。线程 start 后一直到该线程结束都返回True。线程 start 前 或线程已经结束返回False
当多个线程的同时对一个变量进行操作时便会存在着安全的隐患问题,
如下:有2个线程同时对全局变量进行 + 1,000,000 为什么结果却不是 2,000,000。这是因为当线程一执行完11行时还未开始执行12行,全局变量a没有进行 +1,就已经切换了线程二执行;线程二从10行开始执行,取到的值依旧是还没有 +1 的值,执行完12行,切换回线程一;线程一继续上次的断点执行12行,这时就相当于两次线程总共才完成了一次 +1 操作。多次循环如此,那么全局变量最后的结果就不可能是2,000,000。这里为了看到明显的效果,我将 a += 1 变成了三条语句来实现

1 from threading import Thread
2
3
4 a = 0
5
6
7 def fn():
8 global a
9 for i in range(1000000):
10 t = a
11 t += 1
12 a = t
13
14
15 if __name__ == "__main__":
16 t_l = []
17 for i in range(2):
18 t = Thread(target=fn)
19 t_l.append(t)
20 t.start()
21 for t in t_l:
22 t.join()
23 print("全局变量 a-->", a)
24
25
26 # 输出结果
27 全局变量 a--> 1332605
多线程对全局变量的修改

1 from threading import Thread
2
3 def fn():
4 for i in range(5):
5 print("1234")
6
7
8 for i in range(2):
9 t = Thread(target=fn)
10 t.start()
11
12
13 # 输出结果1234
14 1234
15 12341234
16
17 1234
18 1234
19 1234
20 12341234
21
22 1234
多线程同时对控制台打印
经过这个例子,就可以看出当多线程对共享的资源进行修改是就涉及到同步问题了,我们希望同一时刻只有一个线程t执行某段逻辑,其它线程必须等待这个线程t执行完才能执行。
1)threading.Lock()
Lock线程锁,在需要进行同步的语句块加上可以确保线程间的同步问题

1 from threading import Thread, Lock
2
3
4 def fn():
5 global a
6 for i in range(1000000):
7 lock.acquire()
8 t = a
9 t += 1
10 a = t
11 lock.release()
12
13
14 a = 0
15 t_l = []
16 lock = Lock()
17 for i in range(2):
18 t = Thread(target=fn)
19 t_l.append(t)
20 t.start()
21 for t in t_l:
22 t.join()
23 print("全局变量 a-->", a)
24
25
26 # 全局变量
27 全局变量 a--> 2000000
Lock后
2)死锁
关于死锁的概念我就不再叙述,就简单的说说下边的代码,同时开启两个线程,线程t1中执行到8行,线程t2执行到20行的时候,t1发现锁lock2没有被释放便会在这一直等待,t2发现锁lock1没有被释放也在这一直等待,这样谁也不会先释放锁,就会造成死锁的现象,程序一直卡着。

1 from threading import Thread, Lock
2 import time
3
4 def fn1():
5 lock1.acquire()
6 print("fn1-->lock1.acquire")
7 time.sleep(1) # 确保另一个线程已经执行fn2函数且lock2已经锁上
8 lock2.acquire()
9 print("fn1-->lock2.acquire")
10 lock2.release()
11 print("fn1-->lock2.release")
12 lock1.release()
13 print("fn1-->lock1.release")
14
15
16 def fn2():
17 lock2.acquire()
18 print("fn1-->lock2.acquire")
19 time.sleep(1) # 确保另一个线程已经执行fn1函数且lock1已经锁上
20 lock1.acquire()
21 print("fn1-->lock1.acquire")
22 lock1.release()
23 print("fn1-->lock1.release")
24 lock2.release()
25 print("fn1-->lock2.release")
26
27
28 lock1 = Lock()
29 lock2 = Lock()
30 t1 = Thread(target=fn1)
31 t2 = Thread(target=fn2)
32 t1.start()
33 t2.start()
34 t1.join()
35 t2.join()
36 print("----end----")
37
38
39 # 输出结果
40 fn1-->lock1.acquire
41 fn1-->lock2.acquire
死锁
- acquire() 获取锁
- release() 释放锁
- wait(timeout=None) 等待notify的通知,没有等到则在此堵塞,并释放锁cdt.release();有notify则继续往下执行,并释放锁。timeout:堵塞最长时间秒,默认一直堵塞。使用这个方法之前要已经acquire(),因为这个方法会释放锁
- notify(n=1) 通知一个正在 wait 堵塞 的线程,n:通知线程数,默认为1
- notify_all() 通知所有正在 wait 堵塞 的线程

1 from threading import Thread, Condition
2 import time
3
4
5 def producer():
6 global cdt
7 while True:
8 time.sleep(1)
9 cdt.acquire()
10 print("+++++ 1个包子")
11 cdt.notify()
12 cdt.release()
13
14
15 def consumer():
16 global cdt
17 while True:
18 cdt.acquire()
19 cdt.wait()
20 print("----- 1个包子, 时间:", time.strftime("%X"))
21 # cdt.release()
22
23
24 cdt = Condition()
25 prd = Thread(target=producer)
26 csm = Thread(target=consumer)
27 prd.start()
28 csm.start()
29 prd.join()
30 csm.join()
31 print("----end----")
32
33
34 # 输出结果
35 +++++ 1个包子
36 ----- 1个包子, 时间: 10:05:30
37 +++++ 1个包子
38 ----- 1个包子, 时间: 10:05:31
39 +++++ 1个包子
40 ----- 1个包子, 时间: 10:05:32
41 +++++ 1个包子
42 ----- 1个包子, 时间: 10:05:33
43 +++++ 1个包子
44 ----- 1个包子, 时间: 10:05:34
Condition,1个生产者与1个消费者

1 from threading import Thread, Condition
2 import time
3
4
5 def producer():
6 global cdt
7 while True:
8 time.sleep(1)
9 cdt.acquire()
10 print("+++++ 1个包子")
11 cdt.notify()
12 cdt.release()
13
14
15 def consumer(name):
16 global cdt
17 while True:
18 cdt.acquire()
19 cdt.wait()
20 print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X")))
21 # cdt.release()
22
23
24 cdt = Condition()
25 prd = Thread(target=producer)
26 prd.start()
27 for i in range(5):
28 csm = Thread(target=consumer, args=(i,))
29 csm.start()
30
31
32 # 输出结果
33 +++++ 1个包子
34 消费者:0----- 1个包子, 时间:10:7:07
35 +++++ 1个包子
36 消费者:1----- 1个包子, 时间:10:7:08
37 +++++ 1个包子
38 消费者:2----- 1个包子, 时间:10:7:09
39 +++++ 1个包子
40 消费者:3----- 1个包子, 时间:10:7:10
41 +++++ 1个包子
42 消费者:4----- 1个包子, 时间:10:7:11
43 +++++ 1个包子
44 消费者:0----- 1个包子, 时间:10:7:12
45 +++++ 1个包子
46 消费者:1----- 1个包子, 时间:10:7:13
47 +++++ 1个包子
48 消费者:2----- 1个包子, 时间:10:7:14
49 +++++ 1个包子
50 消费者:3----- 1个包子, 时间:10:7:15
51 +++++ 1个包子
52 消费者:4----- 1个包子, 时间:10:7:16
Condition,1个生产者与多个消费者
- set() 设置flag=True
- clear() 清除flag,即flag=False
- wait(timeout=None) 堵塞线程等待set()后继续执行,timeout:堵塞最长时间秒,默认一直堵塞。
Event内部定义了一个flag,当flag=Flase时,wait()将会一直堵塞线程;flag=True时,wait()不会堵塞线程,继续向下执行,这时的wait()相当与pass语句

1 from threading import Thread, Event
2 import time
3
4
5 def producer():
6 global eve
7 while True:
8 time.sleep(1)
9 print("+++++ 1个包子")
10 eve.set()
11
12
13 def consumer():
14 global eve
15 while True:
16 eve.wait()
17 print("----- 1个包子, 时间:%s" % (time.strftime("%X")))
18 eve.clear()
19
20
21 eve = Event()
22 prd = Thread(target=producer)
23 csm = Thread(target=consumer)
24 prd.start()
25 csm.start()
26
27
28 # 输出结果
29 +++++ 1个包子
30 ----- 1个包子, 时间:10:12:00
31 +++++ 1个包子
32 ----- 1个包子, 时间:10:12:01
33 +++++ 1个包子
34 ----- 1个包子, 时间:10:12:02
35 +++++ 1个包子
36 ----- 1个包子, 时间:10:12:03
37 +++++ 1个包子
38 ----- 1个包子, 时间:10:12:04
Event,依旧是生产者消费者

1 from threading import Thread, Event
2 import time
3
4
5 def producer():
6 global eve
7 while True:
8 time.sleep(1)
9 print("+++++ 3个包子")
10 eve.set()
11
12
13 def consumer(name):
14 global eve
15 while True:
16 eve.wait() # 在此堵塞,直到eve.set()
17 print("消费者:%s----- 1个包子, 时间:%s" % (name, time.strftime("%X")))
18 eve.clear() # 将标识清除,需重新eve.set()
19
20
21 eve = Event()
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(5):
25 csm = Thread(target=consumer, args=(i,))
26 csm.start()
27
28
29 # 输出结果
30 +++++ 3个包子
31 消费者:2----- 1个包子, 时间:10:10:25
32 消费者:1----- 1个包子, 时间:10:10:25
33 消费者:0----- 1个包子, 时间:10:10:25
34 +++++ 3个包子
35 消费者:0----- 1个包子, 时间:10:10:26
36 消费者:2----- 1个包子, 时间:10:10:26
37 消费者:1----- 1个包子, 时间:10:10:26
38 +++++ 3个包子
39 消费者:0----- 1个包子, 时间:10:10:27
40 消费者:2----- 1个包子, 时间:10:10:27
41 消费者:1----- 1个包子, 时间:10:10:27
Event,1个生产者与多个消费者
- Semaphore(value=1) 默认最大并发数为1
- acquire(blocking=True, timeout=None) 获取一个信号,内部计数器 -= 1,blocking当计数器为零时堵塞线程,timeout线程堵塞时间秒
- release() 释放一个信号,内部计数器 += 1
Semaphore内部管理着一个计数器,acquire时计数器减1,release时计数器加1,当计数器 =0 时,将在acquire语句处堵塞线程。
信号量简单来说,就是控制线程最大并发数的。将信号量比作一定数量的停车位,线程比作车,当有停车位空出来时车才能上去停泊,等停车位满了时,剩下的车只能等停车位上的车走了才能继续上去停泊。

1 from threading import Thread, Lock, Semaphore
2 import time
3
4 def fn(num):
5 global sp
6 sp.acquire()
7 time.sleep(1)
8 print("--->", num, time.strftime("%X"))
9 sp.release()
10
11
12 sp = Semaphore(3)
13 for i in range(15):
14 prd = Thread(target=fn, args=(i,))
15 prd.start()
16
17
18 # 输出结果
19 ---> 0 14:03:29
20 ---> 2 14:03:29
21 ---> 1 14:03:29
22 ---> 3 14:03:30
23 ---> 4 14:03:30
24 ---> 5 14:03:30
25 ---> 6 14:03:31
26 ---> 7 14:03:31
27 ---> 8 14:03:31
28 ---> 9 14:03:32
29 ---> 10 14:03:32
30 ---> 11 14:03:32
31 ---> 12 14:03:33
32 ---> 13 14:03:33
33 ---> 14 14:03:33
Semaphore
这个例子可以看到信号量为3,每一秒最多执行三个线程
我们知道多线程操作全局变量的时候是不安全的,那么线程队列则没有这个问题。和进程队列一样的方法
在线程中队列有三种方式,① 先进先出 queue.Queue ② 后进先出 queue.LifoQueue (其实这个应该叫"栈") ③ 优先级队列 queue.PriorityQueue 这个队列 put一个元组,元组第一个元素为优先级,第二个元素为值,优先级数值越小优先级越高,越先出来

1 >>> import queue
2 >>> q = queue.Queue()
3 >>> q.put(1)
4 >>> q.put("a")
5 >>> q.put("{}")
6 >>> q.get()
7 1
8 >>> q.get()
9 'a'
10 >>> q.get()
11 '{}'
queue.Queue

1 >>> import queue
2 >>> q = queue.LifoQueue()
3 >>> q.put(1)
4 >>> q.put("a")
5 >>> q.put("[]")
6 >>> q.get()
7 '[]'
8 >>> q.get()
9 'a'
10 >>> q.get()
11 1
queue.LifoQueue

1 >>> import queue
2 >>> q = queue.PriorityQueue()
3 >>> q.put((1, "a"))
4 >>> q.put((2, "{}"))
5 >>> q.put((3, "[]"))
6 >>> q.put((3, "[]"))
7 >>> import queue
8 >>> q = queue.PriorityQueue()
9 >>> q.put((1, "a"))
10 >>> q.put((3, "{}"))
11 >>> q.put((2, "[]"))
12 >>> q.get()
13 (1, 'a')
14 >>> q.get()
15 (2, '[]')
16 >>> q.get()
17 (3, '{}')
queue.PriorityQueue

1 from threading import Thread
2 import queue
3 import time
4
5
6 def producer():
7 global q
8 for i in range(100):
9 time.sleep(1)
10 q.put(i)
11 print("+++++ 1个包子")
12
13
14 def consumer(name):
15 global q
16 while True:
17 bz = q.get()
18 print("消费者:%s---得到1个包子%s, 时间:%s" % (name, bz, time.strftime("%X")))
19
20
21 q = queue.Queue(10) # 最多能添加10包子
22 prd = Thread(target=producer)
23 prd.start()
24 for i in range(3):
25 csm = Thread(target=consumer, args=(i,))
26 csm.start()
27
28
29 # 输出结果
30 +++++ 1个包子
31 消费者:0---得到1个包子0, 时间:15:11:30
32 +++++ 1个包子
33 消费者:1---得到1个包子1, 时间:15:11:31
34 +++++ 1个包子
35 消费者:2---得到1个包子2, 时间:15:11:32
36 +++++ 1个包子
37 消费者:0---得到1个包子3, 时间:15:11:33
38 +++++ 1个包子
39 消费者:1---得到1个包子4, 时间:15:11:34
生产者与消费者
7、CPU密集型与IO密集型
因为CPython中GIL(Global Interpreter Lock全局解释器锁)的存在,每刻只能有一个线程在cpu中运行,并不能真正的实现多线程的并行。有人可能会疑惑为什么上边的例子(传送)确实提高了效率呢。
这得从并发的两种类型说起,① CPU密集型,也称计算密集型 ② IO密集型。cpu密集型需要大量的计算,占用cpu大量的时间,cpu利用率高。IO密集型更多的是cpu在等待IO的操作,cpu的利用率并不高,当cpu遇到IO操作等待时让出cpu给其它线程执行,所以在cpython中对于IO密集型还是有用的。
实际上我们遇到的大多的是IO密集型,如:文件的读写,网络的链接,服务器等待用户的访问..等
1)CPU密集型中,多线程,多进程,单线程的区别

1 from multiprocessing import Process
2
3
4 def fn():
5 s = 0
6 for i in range(100000000):
7 s += i
8
9
10 if __name__ == '__main__':
11 start_time = time.time()
12 p_l = []
13 for i in range(4): # 开启4个进程
14 p = Process(target=fn)
15 p_l.append(p)
16 p.start()
17 for p in p_l:
18 p.join()
19 print("时间:", time.time() - start_time) # 打印消耗时间
20
21
22 # 输出结果
23 时间: 15.544550895690918
多个进程分别计算一亿累加,用时15.54s

1 from threading import Thread
2
3
4 def fn():
5 s = 0
6 for i in range(100000000):
7 s += i
8
9
10 if __name__ == '__main__':
11 start_time = time.time()
12 p_l = []
13 for i in range(4):
14 p = Thread(target=fn) # 开启4个线程
15 p_l.append(p)
16 p.start()
17 for p in p_l:
18 p.join()
19 print("时间:", time.time() - start_time) # 打印消耗时间
20
21
22 # 输出结果
23 时间: 26.204195976257324
多个线程分别计算一亿累加,用时26.20s

1 def fn():
2 s = 0
3 for i in range(100000000):
4 s += i
5
6
7 if __name__ == '__main__':
8 start_time = time.time()
9 for i in range(4):
10 fn()
11 print("时间:", time.time() - start_time) # 打印消耗时间
12
13
14 # 输出结果
15 时间: 25.838396787643433
单线程多次计算一亿的累加,用时25.84s
通过对比可以发现在cpu密集型中使用多线程与单线程是差不多的,甚至单线程效率还高那么一点点,那是因为多线程需要频繁的切换计算所以浪费了一点时间,线程越多越明显。所以cpu密集型使用多进程可以更好的提高效率
2)IO密集型中,多线程,多进程,单线程的区别,这里通过time.sleep来模拟IO操作

1 from threading import Thread
2
3
4 def fn():
5 time.sleep(1) # 通过time.sleep来模拟IO操作
6
7
8 if __name__ == '__main__':
9 start_time = time.time()
10 t_l = []
11 for i in range(10):
12 t = Thread(target=fn)
13 t_l.append(t)
14 t.start()
15 for t in t_l:
16 t.join()
17 print("时间:", time.time() - start_time) # 打印消耗时间时间
18
19
20 # 输出结果
21 时间: 1.0024397373199463
IO操作,多线程用时1.00s

1 from multiprocessing import Process
2
3
4 def fn():
5 time.sleep(1) # 通过time.sleep来模拟IO操作
6
7
8 if __name__ == '__main__':
9 start_time = time.time()
10 t_l = []
11 for i in range(10):
12 t = Process(target=fn)
13 t_l.append(t)
14 t.start()
15 for t in t_l:
16 t.join()
17 print("时间:", time.time() - start_time) # 打印消耗时间时间
18
19
20 # 输出结果
21 时间: 1.552047872543335
IO操作,多进程用时1.55s

1 def fn():
2 time.sleep(1) # 通过time.sleep来模拟IO操作
3
4
5 if __name__ == '__main__':
6 start_time = time.time()
7 for i in range(10):
8 fn()
9 print("时间:", time.time() - start_time) # 打印消耗时间时间
10
11
12 # 输出结果
13 时间: 10.00423789024353
IO操作,单线程用时10.00
很明显的看到,在IO密集型中不管用多线程还是多进程都能提高效率,但多进程却比多线程要耗时,这是因为多进程的开启耗费的资源比多线程多得多。
concurrent.futures模块提供了封装好的线程池以及进程池,ThreadPoolExecutor线程池,ProcessPoolExecutor进程池,它两之间有着同样的接口方法。
ThreadPoolExecutor(max_workers=None) 创建线程池时需要指定线程池中线程的个数。
submit(*args, **kwargs) 不定长参数中的第一个参数为协程执行的方法fn,其余的依次为 fn 的参数。请看下边例子。
shutdown(wait=True) 这个相当于线程池中的 close + join ,调用这个方法后将不能再添加任务到线程池中。 wait=False,当前调用的线程不进行等待继续向下执行。
result() 返回结果,线程执行完 fn 函数的返回值。会堵塞当前线程,直到调用result的指定线程返回结果才往下执行。
add_done_callback(fn) 添加回调函数,当线程执行完后执行回调函数,并将线程传递给回调函数。

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9
10
11 t_p = ThreadPoolExecutor(max_workers=5)
12 for i in range(20):
13 t_p.submit(fn, i)
14
15
16 # 输出结果
17 --> 4 10:01:05
18 --> 3 10:01:05
19 --> 0 10:01:05
20 --> 1 10:01:05
21 --> 2 10:01:05
22 --> 9 10:01:06
23 --> 7 10:01:06
24 --> 8 10:01:06
25 --> 5 10:01:06
26 --> 6 10:01:06
27 --> 10 10:01:07
28 --> 14 10:01:07
29 --> 12 10:01:07
30 --> 13 10:01:07
31 --> 11 10:01:07
32 --> 15 10:01:08
33 --> 16 10:01:08
34 --> 19 10:01:08
35 --> 18 10:01:08
36 --> 17 10:01:08
ThreadPoolExecutor,线程池

1 from concurrent.futures import ProcessPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9
10
11 if __name__ == "__main__":
12 t_p = ProcessPoolExecutor(max_workers=5)
13 for i in range(20):
14 t_p.submit(fn, i)
15
16
17 # 输出结果
18 --> 4 10:02:05
19 --> 3 10:02:05
20 --> 0 10:02:05
21 --> 1 10:02:05
22 --> 2 10:02:05
23 --> 9 10:02:06
24 --> 7 10:02:06
25 --> 8 10:02:06
26 --> 5 10:02:06
27 --> 6 10:02:06
28 --> 10 10:02:07
29 --> 14 10:02:07
30 --> 12 10:02:07
31 --> 13 10:02:07
32 --> 11 10:02:07
33 --> 15 10:02:08
34 --> 16 10:02:08
35 --> 19 10:02:08
36 --> 18 10:02:08
37 --> 17 10:02:08
ProcessPoolExecutor,进程池

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 # print("-->", i, time.strftime("%X"))
7 time.sleep(1)
8 print("--> %s %s" % (i, time.strftime("%X")))
9 return i * i
10
11
12 if __name__ == "__main__":
13 t_p = ThreadPoolExecutor(max_workers=5)
14 t_l = []
15 for i in range(10):
16 t = t_p.submit(fn, i)
17 t_l.append(t)
18 for t in t_l:
19 print("==> %d" % t.result())
20 print("+++++++++++")
21
22
23 # 输出结果
24 --> 1 10:18:11
25 --> 2 10:18:11
26 --> 0 10:18:11
27 ==> 0
28 ==> 1
29 ==> 4
30 --> 3 10:18:11
31 ==> 9
32 --> 4 10:18:11
33 ==> 16
34 --> 5 10:18:12
35 --> 6 10:18:12
36 --> 7 10:18:12
37 ==> 25
38 ==> 36
39 ==> 49
40 --> 9 10:18:12
41 --> 8 10:18:12
42 ==> 64
43 ==> 81
44 +++++++++++
result(),获取返回结果

1 from concurrent.futures import ThreadPoolExecutor
2 import time
3
4
5 def fn(i):
6 time.sleep(1)
7 print("--> %s %s" % (i, time.strftime("%X")))
8 return i * i
9
10
11 def call_back(t):
12 print("call_back--> %s" % t.result())
13
14
15 if __name__ == "__main__":
16 t_p = ThreadPoolExecutor(max_workers=2)
17 for i in range(4):
18 t = t_p.submit(fn, i)
19 t.add_done_callback(call_back)
20 print("+++++++++++")
21
22
23 # 输出结果
24 +++++++++++
25 --> 0 10:31:03
26 call_back--> 0
27 --> 1 10:31:03
28 call_back--> 1
29 --> 2 10:31:04
30 call_back--> 4
31 --> 3 10:31:04
32 call_back--> 9
add_done_callback(),回调函数