但是坑不在这 , 而是前面提到的消费时传入的整个集合中的消息都需要被重新消费 。
具体的原因我们接着往下看
当消息处理之后 , 不论是成功还是异常 , 都需要对结果进行处理 , 代码如下
文章插图
当处理结果为RECONSUME_LATER的时候(异常会设置为RECONSUME_LATER) , 此时ackIndex会设置成-1 , 后面循环遍历的时候就会遍历到所有这次消费的消息 , 然后调用sendMessageBack方法 , sendMessageBack方式是用来实现消息重新消费的逻辑 , 这里就不展开说了 。
所以 , 一旦被消费的一批消息中出现一个消费异常的情况 , 那么就会导致整批消息被重新消费 , 从而会导致在出现异常之前的成功处理的消息都会被重复消费 , 非常坑 。
不过好在消费时传入的消息集合中的消息数量是可以设置的 , 并且默认就是1
文章插图
也就说默认情况下那个集合中就一条消息 , 所以默认情况下不会出现消费成功的消息被重复消费的情况 。
所以这个参数不要轻易设置 , 一旦设置大了 , 就可能导致消息被重新消费 。
除了并发消费消息的模式以外 , RocketMQ还支持顺序消费消息的模式 , 也会造成重复消费 , 逻辑其实差不多 , 但是在实现消息重新消费的逻辑不一样 。
消费者提交offset失败
首先来讲一讲什么是offset 。
前面说过 , 消息在发送的时候需要指定发送到 , 消息最后会被放到Queue中 , 其实真正的消息不是在Queue中 , Queue存的是每个消息的位置 , 但是你可以理解为Queue存的是消息 。
而消息在Queue中是有序号的 , 这个序号就被称为offset , 从0开始 , 单调递增1 。
文章插图
比如说 , 如上图 , 消息1的offset就是0 , 消息2的offset就是1 , 依次类推 。
这个offset的一个作用就是用来管理消费者的消费进度 。
当消费者在成功消费消息之后 , 需要将所消费的消息的offset提交给RocketMQ服务端 , 告诉RocketMQ , 这个Queue的消息我已经消费到了这个位置了 。
提交offset的代码就在上述第二节提到的处理结果的后面
文章插图
这样有一个好处 , 那么一旦消费者重启了或者其它啥的要从这个Queue拉取消息的时候 , 此时他只需要问问RocketMQ服务端上次这个Queue消息消费到哪个位置了 , 之后消费者只需要从这个位置开始消费消息就行了 , 这样就解决了接着消费的问题 。
文章插图
但是RocketMQ在设计的时候 , 当消费完消息的时候并不是同步告诉RocketMQ服务端offset , 而是定时发送 。
文章插图
如图 , 当消费者消费完消息的时候 , 会将offset保存到内存中的一个Map数据结构中 , 所以上面截图的那段代码其实是更新内存中的offset
文章插图
推荐阅读
- 一文看懂Java中的ThreadLocal源码和注意事项
- 一文看懂Redisson分布式锁的Watchdog机制源码实现
- SpringBoot整合RocketMQ,老鸟们都是这么玩的!
- 张钰|?深扒2003年“张钰事件”:20盘录像带,30个导演,1场桃色罗生门!
- ChatGPT 开源了第一款插件,都来学习一下源码吧!
- 周杰伦为什么娶了昆凌(天涯深扒昆凌)
- 学会这20个库,让你快速看懂 vue3 和 vite3 源码
- 迪丽热巴|孙红雷也移民了?深扒他的资产,为钱掉了名誉何必呢
- SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
- 头狼涨停战法通达信选股公式副图源码?有谁知道追涨停技术?