RocketMq 消息重试机制、死信队列( 三 )

2.注册监听类
//最大重试次数consumer.setMaxReconsumeTimes(2);//顺序消费 重试时间间隔consumer.setSuspendCurrentQueueTimeMillis(2000);
SuspendCurrentQueueTimeMillis表示重试的时间间隔,默认是1s,这里修改成2s
3.测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = RocketmqApplication.class)class RocketmqApplicationTests {@Value("${rocketmq.consumer.topic}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testProducer(){String msg = "Orderly_test";rocketMQTemplate.convertAndSend(topic , msg);}}测试结果:

RocketMq 消息重试机制、死信队列

文章插图
 
可以看到三条结果,第一条是第一次消费的,其余两条是隔了2s重试的 。重试2次之后这条数据就进入了死信队列 。
顺序消费状态顺序消费目前也是两个状态:SUCCESS和
SUSPEND_CURRENT_QUEUE_A_MOMENT 。SUSPEND_CURRENT_QUEUE_A_MOMENT意思是先暂停消费一下,过SuspendCurrentQueueTimeMillis时间间隔后再重试一下,而不是放到重试队列里 。
public enum ConsumeOrderlyStatus {/*** Success consumption*/SUCCESS,/*** Rollback consumption(only for binlog consumption)*/@DeprecatedROLLBACK,/*** Commit offset(only for binlog consumption)*/@DeprecatedCOMMIT,/*** Suspend current queue a moment*/SUSPEND_CURRENT_QUEUE_A_MOMENT;}测试死信队列并发消费和顺序消费达到了最大重试次数之后就会放到死信队列 。死信队列在一开始是不会被创建的,只有需要的时候才会被创建 。就拿上面测试结果来看,进入到的死信队列就是%DLQ%consumer_group,进入死信队列的消息要收到处理 。
RocketMq 消息重试机制、死信队列

文章插图
 
死信队列特性
  • 不会再被消费者正常消费 。
  • 一个死信队列对应一个分组,而不是对应单个消费者实例 。
  • 如果一个消费者组未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列 。
  • 一个死信队列包含了对应 分组产生的所有死信消息,不论该消息属于哪个 Topic 。
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除 。因此,请在死信消息产生后的 3 天内及时处理
作者:索码理
链接:
https://juejin.cn/post/7151571345199874084
来源:稀土掘金




推荐阅读