异步IO Python语法-多进程、多线程、协程( 二 )

 
asyncio.wait和asyncio.gather 1234567891011121314import threadingimport asyncioasync def myfun(index):print(f'[{index}]({threading.currentThread().name})')await asyncio.sleep(1)loop = asyncio.get_event_loop()tasks = [myfun(1), myfun(2)]loop.run_until_complete(asyncio.wait(tasks))#loop.run_until_complete(asyncio.gather(*tasks))loop.close() asyncio.gather 和asyncio.wait区别:
在内部wait()使用一个set保存它创建的Task实例 。因为set是无序的所以这也就是我们的任务不是顺序执行的原因 。wait的返回值是一个元组,包括两个集合,分别表示已完成和未完成的任务 。wait第二个参数为一个超时值
达到这个超时时间后,未完成的任务状态变为pending,当程序退出时还有任务没有完成此时就会看到如下的错误提示 。
gather的使用
gather的作用和wait类似不同的是 。

  1. gather任务无法取消 。
  2. 返回值是一个结果列表
  3. 可以按照传入参数的 顺序,顺序输出 。

异步IO Python语法-多进程、多线程、协程

文章插图
 
协程和多线程结合同时多个请求 123456789101112131415161718192021222324import asyncioimport timefrom concurrent.futures import ThreadPoolExecutorimport requestsdef myquery(url):r = requests.get(url)print(r.text)return r.textif __name__ == "__main__":loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)urls = ["https://www.psvmc.cn/userlist.json", "https://www.psvmc.cn/login.json"]tasks = []start_time = time.time()for url in urls:task = loop.run_in_executor(executor, myquery, url)tasks.append(task)loop.run_until_complete(asyncio.wait(tasks))print(f"用时{time.time() - start_time}") 结果
123{"code":0,"msg":"success","obj":{"name":"小明","sex":"男","token":"psvmc"}}{"code":0,"msg":"success","obj":[{"name":"小明","sex":"男"},{"name":"小红","sex":"女"},{"name":"小刚","sex":"未知"}]}用时0.11207175254821777  
单个请求添加回调 123456789101112131415161718192021222324252627282930【异步IO Python语法-多进程、多线程、协程】import asyncioimport threadingimport timefrom concurrent.futures import ThreadPoolExecutorimport requestsdef myquery(url):print(f"请求所在线程:{threading.current_thread().name}")r = requests.get(url)return r.textdef myfuture(future):print(f"回调所在线程:{threading.current_thread().name}")print(future.result())if __name__ == "__main__":loop = asyncio.get_event_loop()executor = ThreadPoolExecutor(3)url = "https://www.psvmc.cn/userlist.json"tasks = []start_time = time.time()task = loop.run_in_executor(executor, myquery, url)future = asyncio.ensure_future(task)future.add_done_callback(myfuture)loop.run_until_complete(future)print(f"用时{time.time() - start_time}")  
多线程与多进程多线程引用模块 12345678from threading import Threaddef func(num):return numt = Thread(target=func, args=(100,))t.start()t.join() 数据通信 12345import queueq = queue.Queue()q.put(1)item = q.get()12345from threading import Locklock = Lock()with lock:pass  
池化技术 12345678from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor() as executor:# 方法1results = executor.map(func, [1, 2, 3])# 方法2future = executor.submit(func, 1)result = future.result() 示例 12345678910111213141516171819202122232425from concurrent.futures import ThreadPoolExecutorimport threadingimport time# 定义一个准备作为线程任务的函数def action(num):print(threading.current_thread().name)time.sleep(num)return num + 100if __name__ == "__main__":# 创建一个包含3条线程的线程池with ThreadPoolExecutor(max_workers=3) as pool:future1 = pool.submit(action, 3)future1.result()print(f"单个任务返回:{future1.result()}")print('------------------------------')# 使用线程执行map计算results = pool.map(action, (1, 3, 5))for r in results:print(f"多个任务返回:{r}")


推荐阅读