1、打印日志
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt messageExt : msgs) {long start = System.currentTimeMillis();// TODO 业务逻辑logger.info("MessageId:" + messageExt.getMsgId() + " costTime:" + (System.currentTimeMillis() - start));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {logger.error("consumeMessage error:", e);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
文章插图
图片
当确定好消费耗时后,可以根据耗时大?。?采取不同的措施 。
- 若查看到消费耗时较长,则需要查看客户端 JVM 堆栈信息排查具体业务逻辑 , 并优化消费逻辑 。
- 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积 , 需要逐步调大消费线程或扩容节点来解决 。
- 通过 jps -m 或者 ps -ef | grep JAVA 命令获取当前正在运行的 Java 程序 , 通过启动主类即可获得应用的进程 pid ;
- 通过 jstack pid > stack.log 命令获取线程的堆栈 。
- 执行以下命令,查看 ConsumeMessageThread 的信息。
cat stack.log | grep ConsumeMessageThread -A 10 --color
- 1.
- 示例1:空闲无堆积的堆栈 。消费空闲情况下消费线程都会处于 WAITING 状态等待从消费任务队里中获取消息 。
文章插图
图片
- 示例2:消费逻辑有抢锁休眠等待等情况 。消费线程阻塞在内部的一个睡眠等待上,导致消费缓慢 。
文章插图
图片
- 示例3:消费逻辑操作数据库等外部存储卡住 。消费线程阻塞在外部的 HTTP 调用上 , 导致消费缓慢 。
文章插图
图片
5 总结客户端使用 Push模式 启动后 , 消费消息时,分为以下两个阶段:拉取消息和消费消息 。
客户端消费原理可以看出,消息堆积的主要瓶颈在于本地客户端的消费能力,即消费耗时和消费并发度 。
首先分析消费耗时,然后根据耗时大小 , 采取不同的措施 。
- 若查看到消费耗时较长,则查看客户端堆栈信息排查具体业务逻辑,并优化消费逻辑 。
- 若查看到消费耗时正常,则有可能是因为消费并发度不够导致消息堆积,需要逐步调大消费线程或扩容节点来解决 。
阿里云官方文档:
https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/use-cases/message-accumulation-and-latency#concept-2004064
【如何应对 RocketMQ 消息堆积】
推荐阅读
- JS问题:项目中如何区分使用防抖或节流?
- 如何设计更优雅的 React 组件?
- python是如何进行内存管理的
- 新手冬日护肤攻略,让你轻松应对寒风冰雪
- 如何提升家庭财运
- 如何看懂珠宝玉石的鉴定证书
- 熬夜后如何逆袭,肌肤焕发光彩?
- 积碳会给汽车造成什么影响?如何清除它?
- 迷你世界如何制作岩石砖,迷你世界刷岩石塔怎么做
- 如何开通抖音团购功能?开通后对销售有何影响?