如何精确控制 asyncio 中并发运行的多个任务( 二 )

我们看到抛异常了,其实 gather 函数的原理就是等待一组任务运行完毕,当某个任务完成时,就调用它的 result 方法,拿到返回值 。但我们之前介绍 Future 和 Task 的时候说过,如果出错了 , 调用 result 方法会将异常抛出来 。
import asyncioasync def normal_running():await asyncio.sleep(3)return "正常运行"async def raise_error():raise ValueError("出错啦")async def main():try:await asyncio.gather(normal_running(), raise_error())except Exception:print("执行时出现了异常")# 但是剩余的任务仍在执行,拿到当前的所有正在执行的任务all_tasks = asyncio.all_tasks()# task 相当于对协程做了一个封装,那么通过 get_coro 方法也可以拿到对应的协程print(f"当前剩余的任务:", [task.get_coro().__name__ for task in all_tasks])# 继续等待剩余的任务完成results = await asyncio.gather(*[task for task in all_tasks if task.get_coro().__name__ != "main"])print(results)loop = asyncio.get_event_loop()loop.run_until_complete(main())"""执行时出现了异常当前剩余的任务: ['main', 'normal_running']['正常运行']"""可以看到在 await asyncio.gather() 的时候,raise_error() 协程抛异常了 , 那么异常会向上传播,在 main() 里面 await 处产生 ValueError 。我们捕获之后查看剩余未完成的任务,显然只剩下 normal_running() 和 main() , 因为任务执行出现异常也代表它完成了 。
需要注意的是,一个任务出现了异常,并不影响剩余未完成的任务,它们仍在后台运行 。我们举个例子证明这一点:
import asyncio, timeasync def normal_running():await asyncio.sleep(5)return "正常运行"async def raise_error():await asyncio.sleep(3)raise ValueError("出错啦")async def main():try:await asyncio.gather(normal_running(), raise_error())except Exception:print("执行时出现了异常")# raise_error() 会在 3 秒后抛异常,然后向上抛,被这里捕获# 而 normal_running() 不会受到影响,它仍然在后台运行# 显然接下来它只需要再过 2 秒就能运行完毕time.sleep(2)# 注意:此处会阻塞整个线程# asyncio.sleep 是不耗费 CPU 的,因此即使 time.sleep 将整个线程阻塞了,也不影响# 因为执行 time.sleep 时,normal_running() 里面的 await asyncio.sleep(5) 已经开始执行了results = await asyncio.gather(*[task for task in asyncio.all_tasks()if task.get_coro().__name__ != "main"])print(results)loop = asyncio.get_event_loop()start = time.perf_counter()loop.run_until_complete(main())end = time.perf_counter()print("总耗时:", end - start)"""执行时出现了异常['正常运行']总耗时: 5.004949666"""这里耗时是 5 秒,说明一个任务抛异常不会影响其它任务,因为 time.sleep(2) 执行完毕之后,normal_running() 里面 asyncio.sleep(5) 也已经执行完毕了,说明异常捕获之后 , 剩余的任务没有受到影响 。
并且这里我们使用了 time.sleep , 在工作中千万不要这么做,因为它会阻塞整个线程,导致主线程无法再做其他事情了 。而这里之所以用 time.sleep,主要是想说明一个任务出错,那么将异常捕获之后,其它任务不会受到影响 。
那么问题来了,如果发生异常,我不希望它将异常向上抛该怎么办呢?可能有人觉得这还不简单,直接来一个异常捕获不就行了?这是一个解决办法 , 但 asyncio.gather 提供了一个参数,可以更优雅的实现这一点 。
import asyncioasync def normal_running():await asyncio.sleep(3)return "正常运行"async def raise_error():raise ValueError("出错啦")async def main():results = await asyncio.gather(normal_running(), raise_error(),return_exceptions=True)print(results)loop = asyncio.get_event_loop()loop.run_until_complete(main())"""['正常运行', ValueError('出错啦')]"""之前在介绍任务的时候我们说了,不管正常执行结束还是出错,都代表任务已完成 , 会将结果和异常都收集起来,只不过其中肯定有一个为 None 。然后根据不同的情况,选择是否将异常抛出来 。所以在 asyncio 里面 , 异常只是一个普通的属性,会保存在任务对象里面 。
对于 asyncio.gather 也是同理,它里面有一个 return_exceptions 参数,默认为 False , 当任务出现异常时,会抛给 await 所在的位置 。如果该参数设置为 True,那么出现异常时,会直接把异常本身返回(此时任务也算是结束了) 。
在 asyncio 里面,异常变成了一个可控的属性 。因为执行是以任务为单位的 , 当出现异常时,也会作为任务的一个普通的属性 。我们可以选择将它抛出来,也可以选择隐式处理掉 。


推荐阅读