Python异步编程全攻略

Python/ target=_blank class=infotextkey>Python异步编程全攻略如果你厌倦了多线程,不妨试试python的异步编程,再引入async, await关键字之后语法变得更加简洁和直观,又经过几年的生态发展,现在是一个很不错的并发模型 。
下面介绍一下python异步编程的方方面面 。

在python异步编程中,可能出现很多其他的对象,比如Future, Task, 后者继承自前者,但是为了统一,无论是Future还是Task,本文中统一称呼为协程 。
与多线程的比较因为GIL的存在,所以Python的多线程在CPU密集的任务下显得无力,但是对于IO密集的任务,多线程还是足以发挥多线程的优势的,而异步也是为了应对IO密集的任务,所以两者是一个可以相互替代的方案,因为设计的不同,理论上异步要比多线程快,因为异步的花销更少, 因为不需要额外系统申请额外的内存,而线程的创建跟系统有关,需要分配一定量的内存,一般是几兆,比如linux默认是8MB 。
虽然异步很好,比如可以使用更少的内存,比如更好地控制并发(也许你并不这么认为:)) 。但是由于async/await 语法的存在导致与之前的语法有些割裂,所以需要适配,需要付出额外的努力,再者就是生态远远没有同步编程强大,比如很多库还不支持异步,所以你需要一些额外的适配 。
用于测试的web服务为了不给其他网站带来困扰,这里首先在自己电脑启动web服务用于测试,代码很简单 。
# web.pyimport asynciofrom random import randomimport uvicornfrom fastapi import FastAPIApp = FastAPI()@app.get("/")async def index():await asyncio.sleep(1)return {"msg": "ok"}@app.get("/random")async def index():await asyncio.sleep(1)return {"msg": random()}if __name__ == "__main__":# uvicorn.run(app)# 如果需要热加载(reload), 需要传入一个字符串而不是application对象uvicorn.run("web:app", reload=True)import asyncioimport uvicornfrom fastapi import FastAPIapp = FastAPI()@app.get("/")async def index():await asyncio.sleep(1)return {"msg": "ok"}if __name__ == "__main__":uvicorn.run(app)本文所有依赖如下:
  • Python > 3.7+
  • fastapi
  • aiohttp
  • uvicorn
所有依赖可通过代码仓库的requirements.txt一次性安装 。
pip install requirements.txt并发,并发,并发首先看一个错误的例子
# test1.pyimport asynciofrom datetime import datetimeimport aiohttpasync def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:for _ in range(workers):async with sess.get(url) as resp:print("响应内容", await resp.json())if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)输出如下:
$ python test1.py响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}响应内容 {'msg': 'ok'}耗时: 0:00:03.011565发现花费了3秒,不符合预期呀 。。。。这是因为虽然用了协程,但是每个协程是串行的运行,也就是说后一个等前一个完成之后才开始,那么这样的异步代码并没有并发,所以我们需要让这些协程并行起来
# test2.pyimport asynciofrom datetime import datetimeimport aiohttpasync def run(sess: aiohttp.ClientSession, url: str):async with sess.get(url) as resp:print("响应内容", await resp.json())async def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:for _ in range(workers):asyncio.ensure_future(run(sess, url))await asyncio.sleep(1.1)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)为了让代码变动的不是太多,所以这里用了一个笨办法来等待所有任务完成, 之所以在main函数中等待是为了不让ClientSession关闭, 如果你移除了main函数中的等待代码会发现报告异常RuntimeError: Session is closed,而代码里的解决方案非常的不优雅,需要手动的等待,为了解决这个问题,我们再次改进代码 。
# test3.pyimport asynciofrom datetime import datetimeimport aiohttpasync def run(sess: aiohttp.ClientSession, url: str):async with sess.get(url) as resp:print("响应内容", await resp.json())async def main(workers: int, url: str):async with aiohttp.ClientSession() as sess:futures = []for _ in range(workers):futures.append(asyncio.ensure_future(run(sess, url)))done, pending = await asyncio.wait(futures)print(done, pending)if __name__ == "__main__":loop = asyncio.get_event_loop()start = datetime.now()loop.run_until_complete(main(3, "http://127.0.0.1:8000/"))end = datetime.now()print("耗时:", end - start)


推荐阅读