如何精确控制 asyncio 中并发运行的多个任务

之前我们了解了如何创建多个任务来并发运行程序,方式是通过 asyncio.create_task 将协程包装成任务,如下所示:
import asyncio, timeasync def mAIn():task1 = asyncio.create_task(asyncio.sleep(3))task2 = asyncio.create_task(asyncio.sleep(3))task3 = asyncio.create_task(asyncio.sleep(3))await task1await task2await task3start = time.perf_counter()asyncio.run(main())end = time.perf_counter()print("总耗时:", end - start)"""总耗时: 3.003109625"""但这种代码编写方式只适用于简单情况,如果在同时发出数百、数千甚至更多 Web 请求的情况下 , 这种编写方式将变得冗长且混乱 。所以 asyncio 提供了许多便利的函数,支持我们一次性等待多个任务 。
等待一组任务全部完成一个被广泛用于等待一组任务的方式是使用 asyncio.gather,这个函数接收一系列的可等待对象 , 允许我们在一行代码中同时运行它们 。如果传入的 awaitable 对象是协程,gather 函数会自动将其包装成任务,以确保它们可以同时运行 。这意味着不必像之前那样,用 asyncio.create_task 单独包装,但即便如此 , 还是建议手动包装一下 。
asyncio.gather 同样返回一个 awaitable 对象,在 await 表达式中使用它时,它将暂停,直到传递给它的所有 awaitable 对象都完成为止 。一旦所有任务都完成 , asyncio.gather 将返回这些任务的结果所组成的列表 。
import asyncioimport timefrom aiohttp import ClientSessionasync def fetch_status(session: ClientSession, url: str):async with session.get(url) as resp:return resp.statusasync def main():async with ClientSession() as session:# 注意:requests 里面是 100 个协程# 传递给 asyncio.gather 之后会自动被包装成任务requests = [fetch_status(session, "http://www.baidu.com")for _ in range(100)]# 并发运行 100 个任务 , 并等待这些任务全部完成# 相比写 for 循环再单独 await,这种方式就简便多了status_codes = await asyncio.gather(*requests)print(f"{len(status_codes)} 个任务已全部完成")start = time.perf_counter()asyncio.run(main())end = time.perf_counter()print("总耗时:", end - start)"""100 个任务已全部完成总耗时: 0.552532458"""完成 100 个请求只需要 0.55 秒钟,由于网络问题,测试的结果可能不准确,但异步肯定比同步要快 。
另外传给 gather 的每个 awaitable 对象可能不是按照确定性顺序完成的 , 例如将协程 a 和 b 按顺序传递给 gather,但 b 可能会在 a 之前完成 。不过 gather 的一个很好的特性是,不管 awaitable 对象何时完成,都保证结果会按照传递它们的顺序返回 。
import asyncioimport timeasync def main():# asyncio.sleep 还可以接收一个 result 参数,作为 await 表达式的值tasks = [asyncio.sleep(second, result=f"我睡了 {second} 秒")for second in (5, 3, 4)]print(await asyncio.gather(*tasks))start = time.perf_counter()asyncio.run(main())end = time.perf_counter()print("总耗时:", end - start)"""['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒']总耗时: 5.002968417"""然后 gather 还可以实现分组,什么意思呢?
import asyncioimport timeasync def main():gather1 = asyncio.gather(*[asyncio.sleep(second, result=f"我睡了 {second} 秒")for second in (5, 3, 4)])gather2 = asyncio.gather(*[asyncio.sleep(second, result=f"我睡了 {second} 秒")for second in (3, 3, 3)])results = await asyncio.gather(gather1, gather2, asyncio.sleep(6, "我睡了 6 秒"))print(results)start = time.perf_counter()asyncio.run(main())end = time.perf_counter()print("总耗时:", end - start)"""[['我睡了 5 秒', '我睡了 3 秒', '我睡了 4 秒'],['我睡了 3 秒', '我睡了 3 秒', '我睡了 3 秒'],'我睡了 6 秒']总耗时: 6.002826208"""asyncio.gather 里面可以通过继续接收 asyncio.gather 返回的对象,从而实现分组功能,还是比较强大的 。
如果 gather 里面啥都不传的话,那么会返回一个空列表 。
问题来了,在上面的例子中,我们假设所有请求都不会失败或抛出异常 , 这是理想情况 。但如果请求失败了呢?我们来看一下,当 gather 里面的任务出现异常时会发生什么?
import asyncioasync def normal_running():await asyncio.sleep(3)return "正常运行"async def raise_error():raise ValueError("出错啦")async def main():results = await asyncio.gather(normal_running(), raise_error())print(results)loop = asyncio.get_event_loop()loop.run_until_complete(main())"""Traceback (most recent call last):......raise ValueError("出错啦")ValueError: 出错啦"""


推荐阅读