python 中日志异步发送到远程服务器( 三 )

这种方法是可以达到不阻塞主目的,但是每打印一条日志就需要开启一个线程,也是挺浪费资源的 。我们也可以使用线程池来处理
3.2 使用线程池处理python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor类,是线程池和进程池,就是在初始化的时候先定义几个线程,之后让这些线程来处理相应的函数,这样不用每次都需要新创建线程
线程池的基本使用
exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程exector.submit(fn, args, kwargs) # 将函数submit到线程池中如果线程池中有n个线程,当提交的task数量大于n时,则多余的task将放到队列中.
再次修改上面的emit函数
exector = ThreadPoolExecutor(max_workers=1)def emit(self, record):    msg = self.format(record)    timeout = aiohttp.ClientTimeout(total=6)    if self.method == "GET":        if (self.url.find("?") >= 0):            sep = '&'        else:            sep = '?'        url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))        exector.submit(requests.get, url, timeout=6)    else:        headers = {            "Content-type": "application/x-www-form-urlencoded",            "Content-length": str(len(msg))        }        exector.submit(requests.post, self.url, data=https://www.isolves.com/it/cxkf/yy/Python/2020-09-25/{'log': msg}, headers=headers, timeout=6)这里为什么要只初始化一个只有一个线程的线程池? 因为这样的话可以保证先进队列里的日志会先被发送,如果池子中有多个线程,则不一定保证顺序了 。
3.3 使用异步aiohttp库来发送请求上面的CustomHandler类中的emit方法使用的是requests.post来发送日志,这个requests本身是阻塞运行的,也正上由于它的存在,才使得脚本卡了很长时间,所以我们可以将阻塞运行的requests库替换为异步的aiohttp来执行get和post方法, 重写一个CustomHandler中的emit方法
class CustomHandler(logging.Handler):    def __init__(self, host, uri, method="POST"):        logging.Handler.__init__(self)        self.url = "%s/%s" % (host, uri)        method = method.upper()        if method not in ["GET", "POST"]:            raise ValueError("method must be GET or POST")        self.method = method    async def emit(self, record):        msg = self.format(record)        timeout = aiohttp.ClientTimeout(total=6)        if self.method == "GET":            if (self.url.find("?") >= 0):                sep = '&'            else:                sep = '?'            url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))            async with aiohttp.ClientSession(timeout=timeout) as session:                async with session.get(self.url) as resp:                    print(await resp.text())        else:            headers = {                "Content-type": "application/x-www-form-urlencoded",                "Content-length": str(len(msg))            }            async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:                async with session.post(self.url, data=https://www.isolves.com/it/cxkf/yy/Python/2020-09-25/{'log': msg}) as resp: print(await resp.text())


推荐阅读