ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的
DeliverDelayedMessageTimerTask,然后周期执行 。该类继承自TimerTask,是JDK的工具类,用于执行定时任务,原理可以参考这篇文章 如何实现定时任务- JAVA Timer/TimerTask 源码原理解析
消息转投【好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读】可以看到
DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中 。只要队列没有发生消费积压,message 就会马上被消费了 。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)
class DeliverDelayedMessageTimerTask extends TimerTask {private final int delayLevel;private final long offset;public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {this.delayLevel = delayLevel;this.offset = offset;}@Overridepublic void run() {try {if (isStarted()) {this.executeOnTimeup();}} catch (Exception e) {// XXX: warn and notify melog.error("ScheduleMessageService, executeOnTimeup exception", e);ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);}}public void executeOnTimeup() {ConsumeQueue cq =ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,delayLevel2QueueId(delayLevel));long failScheduleOffset = offset;if (cq != null) {// 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,// 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费 。}ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,failScheduleOffset), DELAY_FOR_A_WHILE);}}
总体的原理示意图,如下所示:
文章插图
broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了 。
三、拓展-消费重试平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能 。而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的 。
public void consume() {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 这里如果返回RECONSUME_LATER,就会重试消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}
RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试 。- 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
- 业务消费方返回null
- 业务消费方主动/被动抛出异常
Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息 。
broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:
public class SendMessageProcessorextends AbstractSendMessageProcessorimplements.NETtyRequestProcessor {private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 给重试消息设置新的topicString newTopic = MixAll.getRetryTopic(requestHeader.getGroup());int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();// 根据已经发生重试的次数确定delayLevelif (0 == delayLevel) {delayLevel = 3 + msgExt.getReconsumeTimes();}msgExt.setDelayTimeLevel(delayLevel);// 重试次数+1msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 存储消息PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);// ...}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 如何搜索 WhatsApp 聊天消息
- 保时捷|坏消息:16-24岁年轻人,近两成找不到工作!该为自己找条出路了
- ?延时喷剂喷多了会怎样
- 高通|全新自研CPU架构 消息称高通将杀回服务器芯片市场
- 腾讯|领先微信!手机QQ iOS版8.9.5更新发布:视频消息能编辑了
- 考试|研究生迎来好消息,下半年这些编制岗位开始招人,有免笔试岗位
- 安卓|没悬念!iPhone 14真要涨价了:消息人士称安卓性价比竟输苹果
- 智能手环|华为手环7推送固件更新:iOS手机消息通知获优化
- 机器人300024最新消息 机器人板块股票
- 特斯拉|不老男神迎好消息!林志颖顺利完成颜面手术 有消息称特斯拉曾私下询问病情