2.创建消费者RetryConsumerDemo
@Componentpublic class RetryConsumerDemo {@Value("${rocketmq.name-server}")private String namesrvAddr;@Value("${rocketmq.consumer.topic}")private String topic;@Value("${rocketmq.consumer.group}")private String consumerGroup;private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");@PostConstructpublic void start() {try {consumer.setNamesrvAddr(namesrvAddr);//设置集群消费模式consumer.setMessageModel(MessageModel.CLUSTERING);//设置消费超时时间(分钟)consumer.setConsumeTimeout(1);//订阅主题consumer.subscribe(topic , "*");//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());//最大重试次数consumer.setMaxReconsumeTimes(2);//启动消费端consumer.start();System.out.println("Retry Consumer Start...");} catch (MQClientException e) {e.printStackTrace();}}}
测试并发消费1.创建并发消费监听类 并发消费监听类要实现
MessageListenerConcurrently类
public class MessageListenerConcurrentlyImpl implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt message = msgs.get(0);try {final LocalDateTime now = LocalDateTime.now();//逐条消费String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("当前时间:"+now+",messageId: " + message.getMsgId() + ",topic: " +message.getTopic()+ ",messageBody: " + messageBody);//模拟消费失败if ("Concurrently_test".equals(messageBody)) {int a = 1 / 0;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}}
2.注册监听类 在消费者类RetryConsumerDemo中注册监听类
//注册消息监听器consumer.registerMessageListener(new MessageListenerConcurrentlyImpl());
3.测试
@RunWith(SpringRunner.class)@SpringBootTest(classes = RocketmqApplication.class)class RocketmqApplicationTests {@Value("${rocketmq.consumer.topic}")private String topic;@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void testProducer(){String msg = "Concurrently_test";rocketMQTemplate.convertAndSend(topic , msg);}}
测试结果:
文章插图
后面重试时间太长就不做测试了,可以看到并发消费的消息时间都是按照上面那张时间间隔表来 。
然后通过RocketMq Dashboard Topic一栏可以看到有一个重试消费者组%RETRY%consumer_group,这个消费者组内存放的就是consumer_group消费者组消费失败重试的消息 。
文章插图
并发消费的重试次数是可以修改的,重试次数对应参数DefaultMQPushConsumer类的maxReconsumeTimes属性,maxReconsumeTimes默认是-1,也就是默认会重试16次;0代表不重试,只要失败就会放入死信队列;1-16重试次数对应着上面时间间隔表中对应次数 。配置的最大重试次数超过16就按16处理 。
并发消费状态并发消费有两个状态CONSUME_SUCCESS和RECONSUME_LATER 。返回CONSUME_SUCCESS代表着消费成功,返回RECONSUME_LATER代表进行消息重试 。
public enum ConsumeConcurrentlyStatus {/*** Success consumption*/CONSUME_SUCCESS,/*** Failure consumption,later try to consume*/RECONSUME_LATER;}
当MessageListenerConcurrently接口的consumeMessage方法返回ConsumeConcurrentlyStatus#RECONSUME_LATER、null或者方法抛异常了,都会进行消息重试 。当然还是推荐返回ConsumeConcurrentlyStatus#RECONSUME_LATER 。
测试顺序消费顺序消费和并行消费其实都差不多的,只不过顺序消费实现的是MessageListenerOrderly 接口
1.创建顺序消费监听类
public class MessageListenerOrderlyImpl implements MessageListenerOrderly {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {if (CollectionUtils.isEmpty(msgs)) {return ConsumeOrderlyStatus.SUCCESS;}MessageExt message = msgs.get(0);try {final LocalDateTime now = LocalDateTime.now();//逐条消费String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);System.out.println("当前时间:"+now+",messageId: " + message.getMsgId() + ",topic: " +message.getTopic()+ ",messageBody: " + messageBody);//模拟消费失败if ("Orderly_test".equals(messageBody)) {int a = 1 / 0;}return ConsumeOrderlyStatus.SUCCESS;} catch (Exception e) {return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 消息的倒金字塔结构——金的结构和部首是什么?
- MQ-消息堆积-JDK Bug导致线程阻塞案例分析
- 陈奕迅|陈奕迅发布演唱会消息后,网友们的第一波儿反馈已经开始了
- 5G消息是什么,相比短信有哪些优势?
- 苹果微信消息延迟解决方法
- 蓝甲虫|好消息!《蓝甲虫》获得华纳支持,打造成绿灯侠与钢铁侠融合版
- 肖战|肖战《那片海》有官方消息,剧组为演员高调庆生,考虑抢先定档?
- 最新飞机空难消息?东航紧急迫降事件
- 微信怎么设置消息推送 微信推送怎么制作
- 带货|郭杜接个电话哭一个多小时?堂嫂故作玄虚说有好消息