RocketMq 消息重试机制、死信队列( 二 )

2.创建消费者RetryConsumerDemo
@Componentpublic class RetryConsumerDemo {@Value("${rocketmq.name-server}")private String namesrvAddr;@Value("${rocketmq.consumer.topic}")private String topic;@Value("${rocketmq.consumer.group}")private String consumerGroup;private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");@PostConstructpublic void start() {try {consumer.setNamesrvAddr(namesrvAddr);//设置集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);//设置消费超时时间(分钟)consumer.setConsumeTimeout(1);//订阅主题consumer.subscribe(topic , "*");//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());//最大重试次数consumer.setMaxReconsumeTimes(2);//启动消费端consumer.start();System.out.println("Retry Consumer Start...");} catch (MQClientException e) {e.printStackTrace();}}}测试并发消费1.创建并发消费监听类 并发消费监听类要实现
MessageListenerConcurrently类
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt message = msgs.get(0);try {final LocalDateTime now = LocalDateTime.now();//逐条消费String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("当前时间:"+now+",messageId: " + message.getMsgId() + ",topic: " +message.getTopic()+ ",messageBody: " + messageBody);//模拟消费失败if ("Concurrently_test".equals(messageBody)) {int a = 1 / 0;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}2.注册监听类 在消费者类RetryConsumerDemo中注册监听类
//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());3.测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = RocketmqApplication.class)class RocketmqApplicationTests {@Value("${rocketmq.consumer.topic}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testProducer(){String msg = "Concurrently_test";rocketMQTemplate.convertAndSend(topic , msg);}}测试结果:

RocketMq 消息重试机制、死信队列

文章插图
 
后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来 。
然后通过RocketMq Dashboard Topic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息 。
RocketMq 消息重试机制、死信队列

文章插图
 
并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;0代表不重试,只要失败就会放入死信队列;1-16重试次数对应着上面时间间隔表中对应次数 。配置的最大重试次数超过16就按16处理 。
并发消费状态并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER 。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试 。
public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;}
MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试 。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER 。
测试顺序消费顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly 接口
1.创建顺序消费监听类
public class MessageListenerOrderlyImpl implements MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeOrderlyStatus.SUCCESS;}MessageExt message = msgs.get(0);try {final LocalDateTime now = LocalDateTime.now();//逐条消费String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("当前时间:"+now+",messageId: " + message.getMsgId() + ",topic: " +message.getTopic()+ ",messageBody: " + messageBody);//模拟消费失败if ("Orderly_test".equals(messageBody)) {int a = 1 / 0;}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}}


推荐阅读