如何应对 RocketMQ 消息堆积( 二 )


1、打印日志
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt messageExt : msgs) {long start = System.currentTimeMillis();// TODO 业务逻辑logger.info("MessageId:" + messageExt.getMsgId() + " costTime:" + (System.currentTimeMillis() - start));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {logger.error("consumeMessage error:", e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}

  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
2、查看消息轨迹
如何应对 RocketMQ 消息堆积

文章插图
图片
当确定好消费耗时后,可以根据耗时大?。?采取不同的措施 。
  • 若查看到消费耗时较长,则需要查看客户端 JVM 堆栈信息排查具体业务逻辑 , 并优化消费逻辑 。
  • 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积 , 需要逐步调大消费线程或扩容节点来解决 。
4.2 查看客户端 JVM 的堆栈假如消费耗时非常高,需要查看 Consumer 实例 JVM 的堆栈。
  1. 通过 jps -m 或者 ps -ef | grep JAVA 命令获取当前正在运行的 Java 程序 , 通过启动主类即可获得应用的进程 pid ;
  2. 通过 jstack pid > stack.log 命令获取线程的堆栈 。
  3. 执行以下命令,查看 ConsumeMessageThread 的信息。
cat stack.log | grep ConsumeMessageThread -A 10 --color
  • 1.
常见的异常堆栈信息如下:
  • 示例1:空闲无堆积的堆栈  。消费空闲情况下消费线程都会处于 WAITING 状态等待从消费任务队里中获取消息 。

如何应对 RocketMQ 消息堆积

文章插图
图片
  • 示例2:消费逻辑有抢锁休眠等待等情况  。消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢 。

如何应对 RocketMQ 消息堆积

文章插图
图片
  • 示例3:消费逻辑操作数据库等外部存储卡住  。消费线程阻塞在外部的 HTTP 调用上 , 导致消费缓慢 。

如何应对 RocketMQ 消息堆积

文章插图
图片
5 总结客户端使用 Push模式 启动后 , 消费消息时,分为以下两个阶段:拉取消息和消费消息 。
客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度 。
首先分析消费耗时,然后根据耗时大小 , 采取不同的措施 。
  • 若查看到消费耗时较长,则查看客户端堆栈信息排查具体业务逻辑,并优化消费逻辑 。
  • 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决 。
参考文档:
阿里云官方文档:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/message-accumulation-and-latency#concept-2004064

【如何应对 RocketMQ 消息堆积】


推荐阅读