好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

一、背景之前在做电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单 。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消 。那么,如何实现这样的超时取消逻辑呢,通过消息队列的延时消息,是一个非常稳定的实现方案 。
RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等 。消息在发送之后,会在消息队列的服务器进行存储 。等过了设定的延迟时间之后,消息才会被consumer端消费到 。

好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

文章插图
 
如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单 。那么这样,通过消息队列就完成了一个延迟取消的逻辑了 。
二、原理设置延时先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别
public void produce() {Message msg = new Message("TopicTest","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));msg.setDelayTimeLevel(1)SendResult sendResult = producer.send(msg);}public void consume() {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap)
public void setDelayTimeLevel(int level) {this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));}void putProperty(final String name, final String value) {if (null == this.properties) {this.properties = new HashMap<String, String>();}this.properties.put(name, value);}之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h
public class MessageStoreConfig {private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";}那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?
不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看 。
消息预存对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的 。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中 。
// 如果是延迟消息if (msg.getDelayTimeLevel() > 0) {if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}// 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPICtopic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());...// 将实际的指定topic和queueId进行存入property,进行备份MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));msg.setTopic(topic);msg.setQueueId(queueId);}之后,会由ScheduleMessageService来进行任务处理 。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示 。
public class ScheduleMessageService extends ConfigManager {// key: delayLevel | value: delayTimeMillisprivate final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);public void start() {// 创建一个Timer,用于执行定时任务this.timer = new Timer("ScheduleMessageTimerThread", true);// 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,// 用来处理对应queue中的消息for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}}}


推荐阅读