这篇文章,我们聊聊如何应对 RocketMQ 消息堆积 。
文章插图
图片
1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消费延迟 。
虽然笔者经常讲:RocketMQ 、Kafka 具备堆积的能力,但是以下场景需要重点关注消息堆积和延迟的问题:
- 业务系统上下游能力不匹配造成的持续堆积,且无法自行恢复 。
- 业务系统对消息的消费实时性要求较高,即使是短暂的堆积造成的消息延迟也无法接受 。
文章插图
图片
客户端使用 Push 模式 启动后,消费消息时,分为以下两个阶段:
- 阶段一:拉取消息客户端通过长轮询批量拉取的方式从 Broker 服务端获取消息 , 将拉取到的消息缓存到本地缓冲队列中 。客户端批量拉取消息,常见内网环境下都会有很高的吞吐量,例如:1个单线程单分区的低规格机器(4C8GB)可以达到几万 TPS , 如果是多个分区可以达到几十万 TPS。所以这一阶段一般不会成为消息堆积的瓶颈 。
- 阶段二:消费消息提交消费线程,客户端将本地缓存的消息提交到消费线程中,使用业务消费逻辑进行处理 。此时客户端的消费能力就完全依赖于业务逻辑的复杂度(消费耗时)和消费逻辑并发度了 。如果业务处理逻辑复杂,处理单条消息耗时都较长 , 则整体的消息吞吐量肯定不会高,此时就会导致客户端本地缓冲队列达到上限,停止从服务端拉取消息 。
想要避免和解决消息堆积问题,必须合理的控制消费耗时和消息并发度,其中消费耗时的优先级高于消费并发度,必须先保证消费耗时的合理性,再考虑消费并发度问题 。
3 消费瓶颈3.1 消费耗时影响消费耗时的消费逻辑主要分为 CPU 内存计算和外部 I/O 操作 , 通常情况下代码中如果没有复杂的递归和循环的话,内部计算耗时相对外部 I/O 操作来说几乎可以忽略 。
外部 I/O 操作通常包括如下业务逻辑:
- 读写外部数据库,例如 MySQL 数据库读写 。
- 读写外部缓存等系统,例如 redis 读写 。
- 下游系统调用,例如 Dubbo 调用或者下游 HTTP 接口调用 。
通常消费堆积都是由于这些下游系统出现了服务异常、容量限制导致的消费耗时增加 。
例如:某业务消费逻辑中需要调用下游 Dubbo 接口,单次消费耗时为 20 ms,平时消息量小未出现异常 。业务侧进行大促活动时,下游 Dubbo 服务未进行优化,消费单条消息的耗时增加到 200 ms,业务侧可以明显感受到消费速度大幅下跌 。此时,通过提升消费并行度并不能解决问题,需要大幅提高下游 Dubbo 服务性能才行 。
3.2 消费并发度绝大部分消息消费行为都属于 IO 密集型 , 即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量 , 通过增加消费并行度,可以提高总的消费吞吐量 , 但是并行度增加到一定程度,反而会下降 。
所以,应用必须要设置合理的并行度 。如下有几种修改消费并行度的方法:
- 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效) 。可以通过加机器,或者在已有机器启动多个进程的方式 。
- 提高单个 Consumer 实例的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax 实现 。
4.1 确认消息的消费耗时是否合理首先 , 我们需要查看消费耗时,确认消息的消费耗时是否合理 。查看消费耗时一般来讲有两种方式:
推荐阅读
- JS问题:项目中如何区分使用防抖或节流?
- 如何设计更优雅的 React 组件?
- python是如何进行内存管理的
- 新手冬日护肤攻略,让你轻松应对寒风冰雪
- 如何提升家庭财运
- 如何看懂珠宝玉石的鉴定证书
- 熬夜后如何逆袭,肌肤焕发光彩?
- 积碳会给汽车造成什么影响?如何清除它?
- 迷你世界如何制作岩石砖,迷你世界刷岩石塔怎么做
- 如何开通抖音团购功能?开通后对销售有何影响?