深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因( 二 )


但是坑不在这 , 而是前面提到的消费时传入的整个集合中的消息都需要被重新消费 。
具体的原因我们接着往下看
当消息处理之后 , 不论是成功还是异常 , 都需要对结果进行处理 , 代码如下

深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
当处理结果为RECONSUME_LATER的时候(异常会设置为RECONSUME_LATER) , 此时ackIndex会设置成-1 , 后面循环遍历的时候就会遍历到所有这次消费的消息 , 然后调用sendMessageBack方法 , sendMessageBack方式是用来实现消息重新消费的逻辑 , 这里就不展开说了 。
所以 , 一旦被消费的一批消息中出现一个消费异常的情况 , 那么就会导致整批消息被重新消费 , 从而会导致在出现异常之前的成功处理的消息都会被重复消费 , 非常坑 。
不过好在消费时传入的消息集合中的消息数量是可以设置的 , 并且默认就是1
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
也就说默认情况下那个集合中就一条消息 , 所以默认情况下不会出现消费成功的消息被重复消费的情况 。
所以这个参数不要轻易设置 , 一旦设置大了 , 就可能导致消息被重新消费 。
除了并发消费消息的模式以外 , RocketMQ还支持顺序消费消息的模式 , 也会造成重复消费 , 逻辑其实差不多 , 但是在实现消息重新消费的逻辑不一样 。
消费者提交offset失败
首先来讲一讲什么是offset 。
前面说过 , 消息在发送的时候需要指定发送到 , 消息最后会被放到Queue中 , 其实真正的消息不是在Queue中 , Queue存的是每个消息的位置 , 但是你可以理解为Queue存的是消息 。
而消息在Queue中是有序号的 , 这个序号就被称为offset , 从0开始 , 单调递增1 。
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
比如说 , 如上图 , 消息1的offset就是0 , 消息2的offset就是1 , 依次类推 。
这个offset的一个作用就是用来管理消费者的消费进度 。
当消费者在成功消费消息之后 , 需要将所消费的消息的offset提交给RocketMQ服务端 , 告诉RocketMQ , 这个Queue的消息我已经消费到了这个位置了 。
提交offset的代码就在上述第二节提到的处理结果的后面
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
这样有一个好处 , 那么一旦消费者重启了或者其它啥的要从这个Queue拉取消息的时候 , 此时他只需要问问RocketMQ服务端上次这个Queue消息消费到哪个位置了 , 之后消费者只需要从这个位置开始消费消息就行了 , 这样就解决了接着消费的问题 。
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
但是RocketMQ在设计的时候 , 当消费完消息的时候并不是同步告诉RocketMQ服务端offset , 而是定时发送 。
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图
 
如图 , 当消费者消费完消息的时候 , 会将offset保存到内存中的一个Map数据结构中 , 所以上面截图的那段代码其实是更新内存中的offset
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

文章插图


推荐阅读