如何精确控制 asyncio 中并发运行的多个任务( 五 )

对于 asyncio.gather 而言,如果某个任务出现异常,那么异常会向上抛给 await 所在的位置 。如果不希望它抛,那么可以将 gather 里面的 return_exceptions 参数指定为 True,这样当出现异常时,会将异常返回 。
而 asyncio.wait 也是如此,如果任务出现异常了,那么会直接视为已完成,异常同样不会向上抛 。但是从程序开发的角度来讲,返回值可以不要,但异常不能不处理 。
所以当任务执行出错时,虽然异常不会向上抛,但 asyncio 会将它打印出来,于是就有了:Task exception was never retrieved 。意思就是该任务出现异常了 , 但你没有处理它 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)if seconds == 3:raise ValueError("我出错了(second is 3)")return f"我睡了 {seconds} 秒"async def main():tasks = [asyncio.create_task(delay(seconds)) for seconds in range(1, 6)]# done 里面保存的都是已完成的任务done, pending = await asyncio.wait(tasks)print(f"已完成的任务数: {len(done)}")print(f"未完成的任务数: {len(pending)}")# 所以我们直接遍历 done 即可for done_task in done:# 这里不能使用 await done_task,因为当任务完成时 , 它就等价于 done_task.result()# 而任务出现异常时 , 调用 result() 是会将异常抛出来的,所以我们需要先检测异常是否为空exc = done_task.exception()if exc:print(exc)else:print(done_task.result())loop = asyncio.get_event_loop()loop.run_until_complete(main())"""已完成的任务数: 5未完成的任务数: 0我睡了 5 秒我睡了 2 秒我出错了(second is 3)我睡了 4 秒我睡了 1 秒"""这里调用 result 和 exception 有一个前提,就是任务必须处于已完成状态,否则会抛异常:InvalidStateError: Result is not ready. 。但对于我们当前是没有问题的,因为 done 里面的都是已完成的任务 。
这里能再次看到和 gather 的区别,gather 会帮你把返回值都取出来,放在一个列表中,并且顺序就是任务添加的顺序 。而 wait 返回的是集合 , 集合里面是任务 , 我们需要手动拿到返回值 。
某个完成出现异常时取消其它任务从目前来讲 , wait 的作用和 gather 没有太大的区别,都是等到任务全部结束再解除等待(出现异常也视作任务完成,并且其它任务不受影响) 。那如果我希望当有任务出现异常时,立即取消其它任务该怎么做呢?显然这就依赖 wait 函数里面的 return_when,它有三个可选值:

  • asyncio.ALL_COMPLETED:等待所有任务完成后返回;
  • asyncio.FIRST_COMPLETED:有一个任务完成就返回;
  • asyncio.FIRST_EXCEPTION:当有任务出现异常时返回;
显然为完成这个需求,我们应该将 return_when 指定为 FIRST_EXCEPTION 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)if seconds == 3:raise ValueError("我出错了(second is 3)")return f"我睡了 {seconds} 秒"async def main():tasks = [asyncio.create_task(delay(seconds), name=f"睡了 {seconds} 秒的任务")for seconds in range(1, 6)]done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)print(f"已完成的任务数: {len(done)}")print(f"未完成的任务数: {len(pending)}")print("都有哪些任务完成了?")for t in done:print("" + t.get_name())print("还有哪些任务没完成?")for t in pending:print("" + t.get_name())loop = asyncio.get_event_loop()loop.run_until_complete(main())"""已完成的任务数: 3未完成的任务数: 2都有哪些任务完成了?睡了 2 秒的任务睡了 3 秒的任务睡了 1 秒的任务还有哪些任务没完成?睡了 4 秒的任务睡了 5 秒的任务"""当 delay(3) 失败时,显然 delay(1)、delay(2) 已完成 , 而 delay(4) 和 delay(5) 未完成 。此时集合 done 里面的就是已完成的任务,pending 里面则是未完成的任务 。
当 wait 返回时,未完成的任务仍在后台继续运行,如果我们希望将剩余未完成的任务取消掉,那么直接遍历 pending 集合即可 。
【如何精确控制 asyncio 中并发运行的多个任务】import asyncioasync def delay(seconds):await asyncio.sleep(seconds)if seconds == 3:raise ValueError("我出错了(second is 3)")print(f"我睡了 {seconds} 秒")async def main():tasks = [asyncio.create_task(delay(seconds))for seconds in range(1, 6)]done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)print(f"已完成的任务数: {len(done)}")print(f"未完成的任务数: {len(pending)}")# 此时未完成的任务仍然在后台运行,这时候我们可以将它们取消掉for t in pending:t.cancel()# 阻塞 3 秒await asyncio.sleep(3)loop = asyncio.get_event_loop()loop.run_until_complete(main())"""我睡了 1 秒我睡了 2 秒已完成的任务数: 3未完成的任务数: 2"""


推荐阅读