如何消息队列的数据积压问题

今天 , 就讲讲解决消息队列的数据积压的三个方案 。
 
1 概述最近生产环境的消息通知队列发生了大量的数据积压问题 , 从而影响到整个平台商户的交易无法正常进行 , 最后只能通过临时关闭交易量较大的商户来缓解消息队列积压的问题 , 经线上数据分析 , 我们的消息队列在面对交易突发洪峰的情况下无法快速的消费并处理队列中的数据 , 考虑到后续还会出现各种交易量突发状况 , 以下为针对消息队列(ActiveMQ)的优化过程 。
 
2 消息队列通信图

如何消息队列的数据积压问题

文章插图
 
3 问题定位与分析3.1 消息通知数据为什么会被积压?分析:平台中每个交易的发生可能会产生一到多条的消息通知数据 , 这些通知数据会通过消息队列(ActiveMQ)来中转消费并处理 , 那么在交易量突发洪峰的情况下会产生大量的消息通知数据 , 如果消息队列(ActiveMQ)的消费能力被阻塞的话会严重影响到数据的吞吐量 , 从而积压大量数据无法被快速处理!
3.2 配置了多个ActiveMQ的消费者为什么数据积压还是无法缓解?分析:经过分析消息队列的数据消费处理模块的代码 , 消息的消费处理是通过监听器SessionAwareMessageListener异步回调onMessage方法而接收消息的 , 但是在回调的方法onMessage上加了synchronized同步锁 , 问题就在这里 , 由于整个onMessage方法被锁 , 导致程序只能通过串行(一次只能消费一条数据)处理数据 , 而无法通过多线程并发处理数据 , 从而影响了整个队列的数据消费能力 。
public synchronized void onMessage(Message message, Session session)3.3 去掉synchronized同步锁会产生多线程并发的安全性问题吗?分析:首先多个消费者并发处理的数据是不同的 , 而且多个消费者线程并发回调onMessage方法的时候并未使用到共享的变量 , 全部在各自线程的方法栈中 , 所以理论上不会出现多线程并发产生的安全性问题 。
3.4 消息会被重复多次消费吗?
分析:
(1)通过分析ActiveMQ的消费者消息接收处理的源代码发现 , 一条消息是否已经消费是通过ack确认机制来保证的 , 如果是通过异步回调的方式接收消息的话 , 在onMessage回调函数返回之后会立即进行ack确认提交 , 那么只要保证onMessage函数内部不抛出异常 , 及需要内部捕获异常 , 那么消息就不会被重复消息 。
(2)因为我们的系统在接收到消息后会首先存入db中进行持久化 , 而且每条消息在存入数据库的时候都做了唯一性约束 , 那么即使有重复的消息也不会被正常处理 。
 
4 阶段一优化方案4.1 准备测试数据启动多个线程分别往MQ消息队列中发送数据 , 共发送15000个消息 , 然后启动消费者模块消费消息 , 设定每个消息处理耗时为10ms , 配置ActiveMQ的消费者数量为concurrency = 5-100
4.2 优化前性能测试
测试次数 是否并发处理 消息数量
queuePrefetch
consumers 耗时
1 否 15000 1000 15 151s2 否 15000 1000 16 151s3 否 15000 1000 15 151s优化前通过测试数据发现 , 虽然配置了concurrency = 5-100 (消费者动态伸缩) , 但是只有15个消费者在忙碌 , 而且消息都是串行化执行的 , 15000条消息共需要151s的时间 , 效率非常差 , ps:哈哈 , 不知道是哪位开发的大神加的同步锁!
注:queuePrefetch 为MQ的消费者一次从Queue中拉取的数量 , 默认为1000 , consumers为处理消息的消费者数量
4.3 优化后性能测试
4.3.1 取消同步锁取消在监听器的回调方法onMessage上的synchronized同步锁
4.3.2 取消同步锁后的性能测试
测试次数 是否并发处理 消息数量
queuePrefetch
consumers 耗时
1 是 15000 1000 14 13s2 是 15000 1000 15 13s3 是 15000 1000 15 13s通过以上数据发现取消同步锁 , 15000条消息只需要13s就可以处理完 , 相比之前快了近12倍 , 虽然速度提升了不少 , 但是发现配置了5-100的消费者 , 确只有15个消费者在忙碌 , 其他消费者都没有消息可处理 , 及造成了数据倾斜 , 那么接下来就要通过优化queuePrefetch 参数了 。


推荐阅读