Python异步编程全攻略( 三 )


条件 Condition上面的事件虽然很棒,能够在不同的协程之间同步状态,并且也能够一次性同步所有的等待协程,但是还不够精细化,比如想通知指定数量的等待协程,这个时候Event就无能为力了,所以同步原语中出现了Condition 。
# test7.pyimport asynciofrom datetime import datetimeimport aiohttpcond = asyncio.Condition()async def waiter(id):async with cond:await cond.wait()print(f"{datetime.now()} waiter[{id}]等待完成")async def run(sess: aiohttp.ClientSession, url: str, id: int):async with sess.get(url) as resp:ret = await resp.json()print("响应内容", ret)data = https://www.isolves.com/it/cxkf/yy/Python/2022-07-18/ret["msg"]async with cond:# cond.notify()# cond.notify_all()cond.notify(2)return dataasync def main(workers: int, url: str):for i in range(workers):asyncio.ensure_future(waiter(i))async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))await asyncio.wait(futures)# 等到其他pending可马上运行完成的任务运行结束await asyncio.sleep(0)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))end = datetime.now()print("耗时:", end - start)输出如下:
$ python test7.py响应内容 {'msg': 0.587516452693613}2022-07-11 10:26:13.482781 waiter[0]等待完成2022-07-11 10:26:13.483778 waiter[1]等待完成响应内容 {'msg': 0.3391774763719556}响应内容 {'msg': 0.2653464378663153}2022-07-11 10:26:13.484771 waiter[2]等待完成耗时: 0:00:01.013655可以看到,前面两个等待的协程是在同一时刻完成,而不是全部等待完成 。
信号量 Semaphore通过创建协程的数量来控制并发并不是非常优雅的方式,所以可以通过信号量的方式来控制并发 。
# test8.pyimport asynciofrom datetime import datetimeimport aiohttpsemp = asyncio.Semaphore(2)async def run(sess: aiohttp.ClientSession, url: str, id: int):async with semp:async with sess.get(url) as resp:ret = await resp.json()print(f"{datetime.now()} worker[{id}] 响应内容", ret)data = https://www.isolves.com/it/cxkf/yy/Python/2022-07-18/ret["msg"]return dataasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for i in range(workers):futures.append(asyncio.ensure_future(run(sess, url, i)))await asyncio.wait(futures)# 等到其他pending可马上运行完成的任务运行结束await asyncio.sleep(0)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/random"))end = datetime.now()print("耗时:", end - start)输出如下:
$ python test8.py2022-07-11 10:30:40.634801 worker[0] 响应内容 {'msg': 0.21337652123021056}2022-07-11 10:30:40.634801 worker[1] 响应内容 {'msg': 0.7591980200967501}2022-07-11 10:30:41.636346 worker[2] 响应内容 {'msg': 0.8282581038608438}耗时: 0:00:02.011661可以发现,虽然同时创建了三个协程,但是同一时刻只有两个协程工作,而另外一个协程需要等待一个协程让出信号量才能运行 。
小结无论是协程还是线程,任务之间的状态同步还是很重要的,所以有了应对各种同步机制的同步原语,因为要保证一个资源同一个时刻只能一个任务访问,所以引入了锁,又因为需要一个任务等待另一个任务,或者多个任务等待某个任务,因此引入了事件(Event),但是为了更精细的控制通知的程度,所以又引入了条件(Condition), 通过条件可以控制一次通知多少的任务 。
有时候的并发需求是通过一个变量控制并发任务的并发数而不是通过创建协程的数量来控制并发,所以引入了信号量(Semaphore),这样就可以在创建的协程数远远大于并发数的情况下让协程在指定的并发量情况下并发 。
兼容多线程,多进程不得不承认异步编程相比起同步编程的生态要小的很多,所以不可能完全异步编程,因此需要一种方式兼容 。
多线程是为了兼容同步得代码 。
多进程是为了利用CPU多核的能力 。
# test9.pyimport timefrom datetime import datetimeimport asynciofrom concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutorsemp = asyncio.Semaphore(2)def wait_io(id: int):# 为了简单起见,直接使用sleep模拟iotime.sleep(1)return f"threading({id}): done at {datetime.now()}"def more_cpu(id: int):sum(i * i for i in range(10 ** 7))return f"process({id}): done at {datetime.now()}"async def main(workers: int):loop = asyncio.get_event_loop()futures = []thread_pool = ThreadPoolExecutor(workers+1)process_pool = ProcessPoolExecutor(workers)ret = loop.run_in_executor(thread_pool, wait_io, 0, )for i in range(workers):futures.append(loop.run_in_executor(thread_pool, wait_io, i))for i in range(workers):futures.append(loop.run_in_executor(process_pool, more_cpu, i))print("n".join(await asyncio.gather(*futures)))if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3))end = datetime.now()print("耗时:", end - start)


推荐阅读