. 本文目录#
协程中的并发
协程中的嵌套
协程中的状态
gather与wait
. 协程中的并发#
协程的并发,和线程一样。举个例子来说,就好像 一个人同时吃三个馒头,咬了第一个馒头一口,就得等这口咽下去,才能去啃第其他两个馒头。就这样交替换着吃。
asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。
第一步,当然是创建多个协程的列表。
#协程函数
async defdo_some_work(x):print('Waiting:', x)
await asyncio.sleep(x)return 'Done after {}s'.format(x)#协程对象
coroutine1 = do_some_work(1)
coroutine2= do_some_work(2)
coroutine3= do_some_work(4)#将协程转成task,并组成list
tasks =[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
第二步,如何将这些协程注册到事件循环中呢。
有两种方法,至于这两种方法什么区别,稍后会介绍。
使用asyncio.wait()
loop =asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
使用asyncio.gather()
#千万注意,这里的 「*」 不能省略
loop =asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
最后,return的结果,可以用task.result()查看。
for task intasks:print('Task ret:', task.result())
完整代码如下
importasyncio#协程函数
async defdo_some_work(x):print('Waiting:', x)
await asyncio.sleep(x)return 'Done after {}s'.format(x)#协程对象
coroutine1 = do_some_work(1)
coroutine2= do_some_work(2)
coroutine3= do_some_work(4)#将协程转成task,并组成list
tasks =[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))for task intasks:print('Task ret:', task.result())
输出结果
Waiting: 1Waiting:2Waiting:4Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
协程中的嵌套#
使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
来看个例子。
importasyncio#用于内部的协程函数
async defdo_some_work(x):print('Waiting:', x)
await asyncio.sleep(x)return 'Done after {}s'.format(x)#外部的协程函数
async defmain():#创建三个协程对象
coroutine1 = do_some_work(1)
coroutine2= do_some_work(2)
coroutine3= do_some_work(4)#将协程转为task,并组成list
tasks =[
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]#【重点】:await 一个task列表(协程)
#dones:表示已经完成的任务
#pendings:表示未完成的任务
dones, pendings =await asyncio.wait(tasks)for task indones:print('Task ret:', task.result())
loop=asyncio.get_event_loop()
loop.run_until_complete(main())
如果这边,使用的是asyncio.gather(),是这么用的
#注意这边返回结果,与await不一样
results= await asyncio.gather(*tasks)for result inresults:print('Task ret:', result)
输出还是一样的。
Waiting: 1Waiting:2Waiting:4Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
仔细查看,可以发现这个例子完全是由 上面「协程中的并发」例子改编而来。结果完全一样。只是把创建协程对象,转换task任务,封装成在一个协程函数里而已。外部的协程,嵌套了一个内部的协程。
其实你如果去看下asyncio.await()的源码的话,你会发现下面这种写法
loop.run_until_complete(asyncio.wait(tasks))
看似没有嵌套,实际上内部也是嵌套的。
这里也把源码,贴出来,有兴趣可以看下,没兴趣,可以直接跳过。
#内部协程函数
async def_wait(fs, timeout, return_when, loop):assert fs, 'Set of Futures is empty.'waiter=loop.create_future()
timeout_handle=Noneif timeout is notNone:
timeout_handle=loop.call_later(timeout, _release_waiter, waiter)
counter=len(fs)def_on_completion(f):
nonlocal counter
counter-= 1
if (counter <= 0 orreturn_when== FIRST_COMPLETED orreturn_when== FIRST_EXCEPTION and (not f.cancelled() andf.exception()is notNone)):if timeout_handle is notNone:
timeout_handle.cancel()if notwaiter.done():
waiter.set_result(None)for f infs:
f.add_done_callback(_on_completion)try:
await waiterfinally:if timeout_handle is notNone:
timeout_handle.cancel()
done, pending=set(), set()for f infs:
f.remove_done_callback(_on_completion)iff.done():
done.add(f)else:
pending.add(f)returndone, pending#外部协程函数
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):if futures.isfuture(fs) orcoroutines.iscoroutine(fs):raise TypeError(f"expect a list of futures, not {type(fs).__name__}")if notfs:raise ValueError('Set of coroutines/Futures is empty.')if return_when not in(FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):raise ValueError(f'Invalid return_when value: {return_when}')if loop isNone:
loop=events.get_event_loop()
fs= {ensure_future(f, loop=loop) for f inset(fs)}#【重点】:await一个内部协程
return await _wait(fs, timeout, return_when, loop)
. 协程中的状态#
还记得我们在讲生成器的时候,有提及过生成器的状态。同样,在协程这里,我们也了解一下协程(准确的说,应该是Future对象,或者Task任务)有哪些状态。
Pending:创建future,还未执行
Running:事件循环正在调用执行任务
Done:任务执行完毕
Cancelled:Task被取消后的状态
可手工 python3 xx.py 执行这段代码,
importasyncioimportthreadingimporttime
asyncdefhello():print("Running in the loop...")
flag=0while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag+= 1
print("Stop the loop")if __name__ == '__main__':
coroutine=hello()
loop=asyncio.get_event_loop()
task=loop.create_task(coroutine)#Pending:未执行状态
print(task)try:
t1= threading.Thread(target=loop.run_until_complete, args=(task,))#t1.daemon = True
t1.start()#Running:运行中状态
time.sleep(1)print(task)
t1.join()exceptKeyboardInterrupt as e:#取消任务
task.cancel()#Cacelled:取消任务
print(task)finally:print(task)
顺利执行的话,将会打印 Pending -> Pending:Runing -> Finished 的状态变化
假如,执行后 立马按下 Ctrl+C,则会触发task取消,就会打印 Pending -> Cancelling -> Cancelling 的状态变化。
. gather与wait#
还记得上面我说,把多个协程注册进一个事件循环中有两种方法吗?
使用asyncio.wait()
loop =asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
使用asyncio.gather()
#千万注意,这里的 「*」 不能省略
loop =asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
asyncio.gather 和 asyncio.wait 在asyncio中用得的比较广泛,这里有必要好好研究下这两货。
还是照例用例子来说明,先定义一个协程函数
importasyncio
asyncdeffactorial(name, number):
f= 1
for i in range(2, number+1):print("Task %s: Compute factorial(%s)..." %(name, i))
await asyncio.sleep(1)
f*=iprint("Task %s: factorial(%s) = %s" % (name, number, f))
接收参数方式
asyncio.wait
接收的tasks,必须是一个list对象,这个list对象里,存放多个的task。
它可以这样,用asyncio.ensure_future转为task对象
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
也可以这样,不转为task对象。
loop =asyncio.get_event_loop()
tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]
loop.run_until_complete(asyncio.wait(tasks))
asyncio.gather
接收的就比较广泛了,他可以接收list对象,但是 * 不能省略
tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
还可以这样,和上面的 * 作用一致,这是因为asyncio.gather()的第一个参数是 *coros_or_futures,它叫 非命名键值可变长参数列表,可以集合所有没有命名的变量。
loop =asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))
甚至还可以这样
loop =asyncio.get_event_loop()
group1= asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2= asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3= asyncio.gather(*[factorial("B", i) for i in range(1, 7)])
loop.run_until_complete(asyncio.gather(group1, group2, group3))
返回结果不同
asyncio.wait
asyncio.wait 返回dones和pendings
dones:表示已经完成的任务
pendings:表示未完成的任务
如果我们需要获取,运行结果,需要手工去收集获取。
dones, pendings =await asyncio.wait(tasks)for task indones:print('Task ret:', task.result())
asyncio.gather
asyncio.gather 它会把值直接返回给我们,不需要手工去收集。
results = await asyncio.gather(*tasks)for result inresults:print('Task ret:', result)
wait有控制功能
importasyncioimportrandom
asyncdefcoro(tag):
await asyncio.sleep(random.uniform(0.5, 5))
loop=asyncio.get_event_loop()
tasks= [coro(i) for i in range(1, 11)]#【控制运行任务数】:运行第一个任务就返回#FIRST_COMPLETED :第一个任务完全返回#FIRST_EXCEPTION:产生第一个异常返回#ALL_COMPLETED:所有任务完成返回 (默认选项)
dones, pendings =loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))print("第一次完成的任务数:", len(dones))#【控制时间】:运行一秒后,就返回
dones2, pendings2 =loop.run_until_complete(
asyncio.wait(pendings, timeout=1))print("第二次完成的任务数:", len(dones2))#【默认】:所有任务完成后返回
dones3, pendings3 =loop.run_until_complete(asyncio.wait(pendings2))print("第三次完成的任务数:", len(dones3))
loop.close()
输出结果
第一次完成的任务数: 1第二次完成的任务数:4第三次完成的任务数:5