Python异步编程全攻略( 二 )

这里解决的方式是通过asyncio.wait方法等待一个协程列表,默认是等待所有协程结束后返回,会返回一个完成(done)列表,以及一个待办(pending)列表 。
如果我们不想要协程对象而是结果,那么我们可以使用asyncio.gather
# test4.pyimport asynciofrom datetime import datetimeimport aiohttpasync def run(sess: aiohttp.ClientSession, url: str, id: int):async with sess.get(url) as resp:print("响应内容", await resp.json())return idasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))# 注意: 这里要讲列表解开rets = await asyncio.gather(*futures)print(rets)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)结果输出如下:
$ python test4.py响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}[0, 1, 2]耗时: 0:00:01.011840小结通过asyncio.ensure_future我们就能创建一个协程,跟调用一个函数差别不大,为了等待所有任务完成之后退出,我们需要使用asyncio.wait等方法来等待,如果只想要协程输出的结果,我们可以使用asyncio.gather来获取结果 。
同步虽然前面能够随心所欲的创建协程,但是就像多线程一样,我们也需要处理协程之间的同步问题,为了保持语法及使用情况的一致,多线程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用异步的关键字,比如async with/ await等
锁 lock通过锁让并发慢下来,让协程一个一个的运行 。
# test5.pyimport asynciofrom datetime import datetimeimport aiohttplock = asyncio.Lock()async def run(sess: aiohttp.ClientSession, url: str, id: int):async with lock:async with sess.get(url) as resp:print("响应内容", await resp.json())return idasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))# 注意: 这里要讲列表解开rets = await asyncio.gather(*futures)print(rets)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)输出如下:
$ python test5.py响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}[0, 1, 2]耗时: 0:00:03.007251通过观察很容易发现,并发的速度因为锁而慢下来了,因为每次只有一个协程能获得锁,所以并发变成了串行 。
事件 event通过事件来通知特定的协程开始工作,假设有一个任务是根据http响应结果选择是否激活 。
# test6.pyimport asynciofrom datetime import datetimeimport aiohttpbig_event = asyncio.Event()small_event = asyncio.Event()async def big_waiter():await small_event.wait()print(f"{datetime.now()} big waiter 收到任务事件")async def small_waiter():await big_event.wait()print(f"{datetime.now()} small waiter 收到任务事件")async def run(sess: aiohttp.ClientSession, url: str, id: int):async with sess.get(url) as resp:ret = await resp.json()print("响应内容", ret)data = https://www.isolves.com/it/cxkf/yy/Python/2022-07-18/ret["msg"]if data > 0.5:big_event.set()else:small_event.set()return dataasync def main(workers: int, url: str):asyncio.ensure_future(big_waiter())asyncio.ensure_future(big_waiter())asyncio.ensure_future(small_waiter())asyncio.ensure_future(small_waiter())async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))await asyncio.wait(futures)if not big_event.is_set():big_event.set()if not small_event.is_set():small_event.set()# 等到其他pending可马上运行完成的任务运行结束await asyncio.sleep(0)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))end = datetime.now()print("耗时:", end - start)输出如下:
响应内容 {'msg': 0.9879470259657458}2022-07-11 10:16:51.577579 small waiter 收到任务事件2022-07-11 10:16:51.577579 small waiter 收到任务事件响应内容 {'msg': 0.33312954919903903}2022-07-11 10:16:51.578574 big waiter 收到任务事件2022-07-11 10:16:51.578574 big waiter 收到任务事件响应内容 {'msg': 0.41934453838367824}耗时: 0:00:00.996697可以看到事件(Event)等待者都是在得到响应内容之后输出,并且事件(Event)可以是多个协程同时等待 。


推荐阅读