这里解决的方式是通过asyncio.wait方法等待一个协程列表,默认是等待所有协程结束后返回,会返回一个完成(done)列表,以及一个待办(pending)列表 。
如果我们不想要协程对象而是结果,那么我们可以使用asyncio.gather
# test4.pyimport asynciofrom datetime import datetimeimport aiohttpasync def run(sess: aiohttp.ClientSession, url: str, id: int):async with sess.get(url) as resp:print("响应内容", await resp.json())return idasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))# 注意: 这里要讲列表解开rets = await asyncio.gather(*futures)print(rets)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)
结果输出如下:
$ python test4.py响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}[0, 1, 2]耗时: 0:00:01.011840
小结通过asyncio.ensure_future我们就能创建一个协程,跟调用一个函数差别不大,为了等待所有任务完成之后退出,我们需要使用asyncio.wait等方法来等待,如果只想要协程输出的结果,我们可以使用asyncio.gather来获取结果 。
同步虽然前面能够随心所欲的创建协程,但是就像多线程一样,我们也需要处理协程之间的同步问题,为了保持语法及使用情况的一致,多线程中用到的同步功能,asyncio中基本也能找到, 并且用法基本一致,不一致的地方主要是需要用异步的关键字,比如async with/ await等
锁 lock通过锁让并发慢下来,让协程一个一个的运行 。
# test5.pyimport asynciofrom datetime import datetimeimport aiohttplock = asyncio.Lock()async def run(sess: aiohttp.ClientSession, url: str, id: int):async with lock:async with sess.get(url) as resp:print("响应内容", await resp.json())return idasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))# 注意: 这里要讲列表解开rets = await asyncio.gather(*futures)print(rets)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)
输出如下:
$ python test5.py响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}[0, 1, 2]耗时: 0:00:03.007251
通过观察很容易发现,并发的速度因为锁而慢下来了,因为每次只有一个协程能获得锁,所以并发变成了串行 。
事件 event通过事件来通知特定的协程开始工作,假设有一个任务是根据http响应结果选择是否激活 。
# test6.pyimport asynciofrom datetime import datetimeimport aiohttpbig_event = asyncio.Event()small_event = asyncio.Event()async def big_waiter():await small_event.wait()print(f"{datetime.now()} big waiter 收到任务事件")async def small_waiter():await big_event.wait()print(f"{datetime.now()} small waiter 收到任务事件")async def run(sess: aiohttp.ClientSession, url: str, id: int):async with sess.get(url) as resp:ret = await resp.json()print("响应内容", ret)data = https://www.isolves.com/it/cxkf/yy/Python/2022-07-18/ret["msg"]if data > 0.5:big_event.set()else:small_event.set()return dataasync def main(workers: int, url: str):asyncio.ensure_future(big_waiter())asyncio.ensure_future(big_waiter())asyncio.ensure_future(small_waiter())asyncio.ensure_future(small_waiter())async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))await asyncio.wait(futures)if not big_event.is_set():big_event.set()if not small_event.is_set():small_event.set()# 等到其他pending可马上运行完成的任务运行结束await asyncio.sleep(0)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))end = datetime.now()print("耗时:", end - start)
输出如下:
响应内容 {'msg': 0.9879470259657458}2022-07-11 10:16:51.577579 small waiter 收到任务事件2022-07-11 10:16:51.577579 small waiter 收到任务事件响应内容 {'msg': 0.33312954919903903}2022-07-11 10:16:51.578574 big waiter 收到任务事件2022-07-11 10:16:51.578574 big waiter 收到任务事件响应内容 {'msg': 0.41934453838367824}耗时: 0:00:00.996697
可以看到事件(Event)等待者都是在得到响应内容之后输出,并且事件(Event)可以是多个协程同时等待 。
推荐阅读
- 一文搞懂响应式编程
- 手机Python编程神器——AidLearning
- 基于Python+vue的自动化运维、完全开源的云管理平台
- plc是什么?
- python接口自动化之MySQL数据连接
- 在线编程 IDE = 远程网络攻击?
- Python异步之aiohttp
- 在线编程 IDE 居然可被黑客用于发起远程网络攻击
- 这才是像样的C语言编程规范
- 用通俗易懂的多态世界观实例:理解python类的多态