如何精确控制 asyncio 中并发运行的多个任务( 六 )

在 await asyncio.sleep(3) 的时候 , 剩余两个任务并没有输出,所以任务确实被取消了 。注:出现异常的任务会被挂在已完成集合里面,如果没有任务在执行时出现异常,那么效果等价于 ALL_COMPLETED 。
当任务完成时处理结果ALL_COMPLETED 和 FIRST_EXCEPTION 都有一个缺点,在任务成功且不抛出异常的情况下,必须等待所有任务完成 。对于之前的用例,这可能是可以接受的 , 但如果想要在某个协程成功完成后立即处理结果,那么现在的情况将不能满足我们的需求 。
虽然这个场景可使用 as_completed 实现,但 as_completed 的问题是没有简单的方法可以查看哪些任务还在运行 , 哪些任务已经完成 。因为遍历的时候,我们无法得知哪个任务先完成,所以 as_completed 无法完成我们的需求 。
好在 wait 函数的 return_when 参数可以接收 FIRST_COMPLETED 选项 , 表示只要有一个任务完成就立即返回,而返回的可以是执行出错的任务,也可以是成功运行的任务(任务失败也表示已完成) 。然后 , 我们可以取消其他正在运行的任务,或者让某些任务继续运行,具体取决于用例 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)if seconds == 3:raise ValueError("我出错了(second is 3)")print(f"我睡了 {seconds} 秒")async def main():tasks = [asyncio.create_task(delay(seconds))for seconds in range(1, 6)]done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)print(f"已完成的任务数: {len(done)}")print(f"未完成的任务数: {len(pending)}")loop = asyncio.get_event_loop()loop.run_until_complete(main())"""我睡了 1 秒已完成的任务数: 1未完成的任务数: 4"""当 return_when 参数为 FIRST_COMPLETED 时,那么只要有一个任务完成就会立即返回,然后我们处理完成的任务即可 。至于剩余的任务,它们仍在后台运行 , 我们可以继续对其使用 wait 函数 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)if seconds == 3:raise ValueError("我出错了(second is 3)")return f"我睡了 {seconds} 秒"async def main():tasks = [asyncio.create_task(delay(seconds))for seconds in range(1, 6)]while True:done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)for t in done:exc = t.exception()print(exc) if exc else print(t.result())if pending:# 还有未完成的任务,那么继续使用 waittasks = pendingelse:breakloop = asyncio.get_event_loop()loop.run_until_complete(main())"""我睡了 1 秒我睡了 2 秒我出错了(second is 3)我睡了 4 秒我睡了 5 秒"""整个行为和 as_completed 是一致的,但这种做法有一个好处,就是我们每一步都可以准确地知晓哪些任务已经完成,哪些任务仍然运行,并且也可以做到精确取消指定任务 。
处理超时除了允许对如何等待协程完成进行更细粒度的控制外,wait 还允许设置超时,以指定我们希望等待完成的时间 。要启用此功能,可将 timeout 参数设置为所需的最大秒数,如果超过了这个超时时间 , wait 将立即返回 done 和 pending 任务集 。
不过与目前所看到的 wait_for 和 as_completed 相比,超时在 wait 中的行为方式存在一些差异 。
1)协程不会被取消 。
当使用 wait_for 时,如果任务超时,则引发 TimeouError,并且任务也会自动取消 。但使用 wait 的情况并非如此,它的行为更接近我们在 as_completed 中看到的情况 。如果想因为超时而取消协程,必须显式地遍历任务并取消,否则它们仍在后台运行 。
2)不会引发超时错误 。
如果发生超时,则 wait 返回所有已完成的任务,以及在发生超时的时候仍处于运行状态的所有任务 。
import asyncioasync def delay(seconds):await asyncio.sleep(seconds)return f"我睡了 {seconds} 秒"async def main():tasks = [asyncio.create_task(delay(seconds))for seconds in range(1, 6)]done, pending = await asyncio.wait(tasks, timeout=3.1)print(f"已完成的任务数: {len(done)}")print(f"未完成的任务数: {len(pending)}")loop = asyncio.get_event_loop()loop.run_until_complete(main())"""已完成的任务数: 3未完成的任务数: 2"""wait 调用将在 3 秒后返回 done 和 pending 集合 , 在 done 集合中 , 会有三个已完成的任务 。而耗时 4 秒和 5 秒的任务,由于仍在运行,因此它们将出现在 pending 集合中 。我们可以继续等待它们完成并提取返回值,也可以将它们取消掉 。
需要注意:和之前一样 , pending 集合中的任务不会被取消,并且继续运行,尽管会超时 。对于要终止待处理任务的情况,我们需要显式地遍历 pending 集合并在每个任务上调用 cancel 。


推荐阅读