
1. Approach

RocketMQ supports delayed messages with a fixed set of delay levels. So how does the broker implement them?

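When the broker receives a delayed message, it rewrites the message's topic to SCHEDULE_TOPIC_XXXX and its queue ID to the delay level minus one, keeping the original topic and queue ID as message properties. Once the delivery time arrives, it restores the real topic and queue ID and re-sends the message to the matching queue of the real topic.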
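A minimal sketch of that rewrite and restore. `SimpleMsg` and `DelayRewrite` are hypothetical classes standing in for the broker's real message type; the property keys `REAL_TOPIC` / `REAL_QID` are assumed to match RocketMQ's `MessageConst` constants, and `queueId = level - 1` mirrors `delayLevel2QueueId`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message: only the fields the rewrite touches.
class SimpleMsg {
    String topic;
    int queueId;
    final Map<String, String> properties = new HashMap<>();

    SimpleMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

class DelayRewrite {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Assumed to match MessageConst.PROPERTY_REAL_TOPIC / PROPERTY_REAL_QUEUE_ID
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QID = "REAL_QID";

    // On receipt: remember the real destination, then park the message in the
    // schedule topic, one queue per delay level (queueId = level - 1).
    static void toSchedule(SimpleMsg msg, int delayLevel) {
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = delayLevel - 1;
    }

    // When due: read the real destination back from the properties.
    static void restore(SimpleMsg msg) {
        msg.topic = msg.properties.get(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.get(REAL_QID));
    }
}
```

Because the real destination travels inside the message itself, the schedule service needs no separate lookup table to restore it.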

2. The delay service

The fields of the delay-message service (ScheduleMessageService) already give away most of the implementation:

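  • offsetTable maps delay level -> offset, from which we can infer that each delay level has its own queue with its own offset
  • delayLevelTable maps delay level -> delay time (ms)
  • timer is the Timer thread that runs the scheduled tasks
  • defaultMessageStore is the message store service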
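delayLevelTable is built from a level string. The sketch below assumes the default `messageDelayLevel` value of RocketMQ 4.x and the units `s`/`m`/`h`/`d`; `DelayLevels` is a hypothetical class, not the broker's own `parseDelayLevel()`:

```java
import java.util.HashMap;
import java.util.Map;

class DelayLevels {
    // Assumed default of MessageStoreConfig.messageDelayLevel in RocketMQ 4.x
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    // Builds level -> delay in ms; levels are 1-based, like delayLevelTable.
    static Map<Integer, Long> parse(String levelString) {
        Map<String, Long> unitMillis = new HashMap<>();
        unitMillis.put("s", 1000L);
        unitMillis.put("m", 60_000L);
        unitMillis.put("h", 3_600_000L);
        unitMillis.put("d", 86_400_000L);

        Map<Integer, Long> table = new HashMap<>();
        String[] parts = levelString.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            Long millis = unitMillis.get(p.substring(p.length() - 1));
            if (millis == null) {
                throw new IllegalArgumentException("unknown time unit in " + p);
            }
            long amount = Long.parseLong(p.substring(0, p.length() - 1));
            table.put(i + 1, amount * millis);
        }
        return table;
    }

    // One queue of SCHEDULE_TOPIC_XXXX per level
    static int delayLevel2QueueId(int level) {
        return level - 1;
    }

    static int queueId2DelayLevel(int queueId) {
        return queueId + 1;
    }
}
```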

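By now you have probably guessed the approach: read messages from each queue in order, check whether each one has reached its delivery time, and either deliver it or keep waiting.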

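```java
// Fields of ScheduleMessageService (excerpt). start() calls super.load() because the class extends ConfigManager.
public class ScheduleMessageService extends ConfigManager {
    // Delay before each level's first delivery check
    private static final long FIRST_DELAY_TIME = 1000L;
    // Retry delay after a scan finishes or no data is found
    private static final long DELAY_FOR_A_WHILE = 100L;
    // Retry delay after a failed re-put
    private static final long DELAY_FOR_A_PERIOD = 10000L;

    // Delay level -> delay in ms (default config: 1 -> 1000 for 1s, 2 -> 5000 for 5s, ...)
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    // Delay level -> consume offset in that level's queue
    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);
    private final DefaultMessageStore defaultMessageStore;
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Timer thread that runs the delivery and persistence tasks
    private Timer timer;
    // Store that due messages are written back to
    private MessageStore writeMessageStore;
    // Highest configured delay level
    private int maxDelayLevel;
}
```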

Service startup

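When are the delay levels, offsets, and timer initialized? delayLevelTable is filled earlier, when load() parses the configured delay levels; the offsets and the timer are set up when the service starts. start() does the following:

  • reload offsetTable from its persisted file (super.load())
  • create the Timer; every delivery and persistence step runs on it, so it is the real workhorse
  • iterate over delayLevelTable and submit one delivery task per delay level to the timer
  • submit a persistence task to the timer that runs at a fixed rate (every 10s by default)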

```java
public void start() {
    if (started.compareAndSet(false, true)) {
        // ConfigManager.load(): read the persisted offsetTable
        super.load();
        // Daemon timer thread that runs every delivery and persistence task
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // Consume progress of this level's queue: each delay level has its own queue
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            // First delivery check of this level after FIRST_DELAY_TIME (1s); the task is one-shot
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // Persist delay-queue progress: first after 10s, then every flushDelayOffsetInterval ms (10s by default)
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
```
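The same startup shape can be reproduced with plain `java.util.Timer`. This is a minimal sketch with a hypothetical `MiniScheduleService`: one first-run task per level, a fixed-rate stand-in for `persist()`, and latches only so the behavior can be observed:

```java
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical mini-service mirroring the shape of start():
// one daemon Timer, one first-run task per delay level, one fixed-rate persist task.
class MiniScheduleService {
    final ConcurrentMap<Integer, Long> offsetTable = new ConcurrentHashMap<>();
    final Set<Integer> startedLevels = ConcurrentHashMap.newKeySet();
    final Timer timer = new Timer("MiniScheduleTimer", true); // daemon thread
    final CountDownLatch firstRuns;
    final CountDownLatch persists = new CountDownLatch(2); // observe two persist ticks

    MiniScheduleService(Map<Integer, Long> delayLevelTable) {
        this.firstRuns = new CountDownLatch(delayLevelTable.size());
        for (Integer level : delayLevelTable.keySet()) {
            offsetTable.putIfAbsent(level, 0L); // no saved progress: start at offset 0
        }
    }

    void start(long firstDelayMs, long persistPeriodMs) {
        for (Integer level : offsetTable.keySet()) {
            // One-shot task per level (the real task reschedules itself afterwards)
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    startedLevels.add(level);
                    firstRuns.countDown();
                }
            }, firstDelayMs);
        }
        // Fixed-rate stand-in for persist()
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                persists.countDown();
            }
        }, persistPeriodMs, persistPeriodMs);
    }

    // Waits for every level's first run and two persist ticks, then stops the timer.
    boolean awaitAndStop(long timeoutMs) {
        try {
            return firstRuns.await(timeoutMs, TimeUnit.MILLISECONDS)
                && persists.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }
}
```

A `java.util.Timer` runs all its tasks on a single background thread, so in this design a slow delivery task for one level delays the tasks of every other level and the persistence task.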

Message delivery

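Each delivery task submitted to the timer checks whether messages have reached their delivery time. It proceeds as follows:

  • get the ConsumeQueue of the schedule topic that corresponds to this delay level
  • read the index buffer (SelectMappedBufferResult) starting at offset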

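With the index buffer, the task walks forward through the entries. Each 20-byte entry stores the message's commitLog offset (8 bytes), its size (4 bytes), and a tagsCode (8 bytes); for delayed messages the tagsCode holds the delivery timestamp instead of a tag hash.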
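A sketch of that entry layout, assuming a hypothetical `CqEntry` class; the 20-byte unit and the field order (offset, size, tagsCode) follow `ConsumeQueue.CQ_STORE_UNIT_SIZE` and the `getLong()`/`getInt()`/`getLong()` calls in the delivery loop:

```java
import java.nio.ByteBuffer;

// One ConsumeQueue entry: commitLog offset (8 bytes) + size (4) + tagsCode (8) = 20 bytes.
// In SCHEDULE_TOPIC_XXXX the tagsCode slot carries the delivery timestamp.
class CqEntry {
    static final int CQ_STORE_UNIT_SIZE = 20; // same value as ConsumeQueue.CQ_STORE_UNIT_SIZE

    final long commitLogOffset;
    final int size;
    final long tagsCode;

    CqEntry(long commitLogOffset, int size, long tagsCode) {
        this.commitLogOffset = commitLogOffset;
        this.size = size;
        this.tagsCode = tagsCode;
    }

    void writeTo(ByteBuffer buf) {
        buf.putLong(commitLogOffset);
        buf.putInt(size);
        buf.putLong(tagsCode);
    }

    // Reads in the same order as executeOnTimeup(): offsetPy, sizePy, tagsCode
    static CqEntry readFrom(ByteBuffer buf) {
        long offsetPy = buf.getLong();
        int sizePy = buf.getInt();
        long tags = buf.getLong();
        return new CqEntry(offsetPy, sizePy, tags);
    }
}
```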

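With this information, RocketMQ compares the current time with the delivery timestamp to decide whether the message can be delivered. There are two cases:

  • Due: read the message from the commitLog, restore its topic, tagsCode, and queueId, then write it back to the store
  • Not due: compute the remaining wait (delivery timestamp minus current time), create a new task for the same offset, and schedule it on the timer with that wait as its delay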
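The decision is simple arithmetic. A hypothetical `DueCheck` helper, assuming `tagsCode = storeTimestamp + delay` as in `computeDeliverTimestamp`: a message stored at t = 10 000 ms on a 5s level is due at 15 000 ms, so at 12 000 ms the task waits another 3 000 ms:

```java
// Hypothetical helper mirroring the due check in executeOnTimeup().
class DueCheck {
    // Delivery timestamp stored in tagsCode: store time + delay of the level
    static long computeDeliverTimestamp(long storeTimestamp, long levelDelayMs) {
        return storeTimestamp + levelDelayMs;
    }

    // 0 means "deliver now"; otherwise the delay (ms) for the next task
    static long waitMillis(long deliverTimestamp, long now) {
        long countdown = deliverTimestamp - now;
        return countdown <= 0 ? 0 : countdown;
    }
}
```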

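Because the tasks are scheduled as one-shot tasks rather than periodic ones, every run schedules a fresh task on the timer when it ends, whether it finishes normally or fails.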
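A `TimerTask` instance can be scheduled only once (scheduling it again throws `IllegalStateException`), which is why the source always creates a new `DeliverDelayedMessageTimerTask`. The sketch below shows the self-rescheduling chain with a hypothetical `SelfReschedulingTask` that fails on purpose on its second run and still keeps the chain going:

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-shot task that schedules a new successor after every run,
// even when the run throws.
class SelfReschedulingTask extends TimerTask {
    private final Timer timer;
    private final AtomicInteger runs;
    private final CountDownLatch remaining;
    private final long delayMs;

    SelfReschedulingTask(Timer timer, AtomicInteger runs, CountDownLatch remaining, long delayMs) {
        this.timer = timer;
        this.runs = runs;
        this.remaining = remaining;
        this.delayMs = delayMs;
    }

    @Override
    public void run() {
        try {
            if (runs.incrementAndGet() == 2) {
                throw new IllegalStateException("simulated failure on run 2");
            }
        } catch (RuntimeException e) {
            // Swallowed here (the real task logs it) so the chain is not broken
        } finally {
            remaining.countDown();
            if (remaining.getCount() > 0) {
                // A new object each time: a TimerTask cannot be scheduled twice
                timer.schedule(new SelfReschedulingTask(timer, runs, remaining, delayMs), delayMs);
            }
        }
    }

    // Runs the chain `times` times; returns the number of runs, or -1 on timeout.
    static int runChain(int times, long delayMs) {
        Timer timer = new Timer("SelfReschedulingTimer", true);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(times);
        timer.schedule(new SelfReschedulingTask(timer, runs, remaining, delayMs), delayMs);
        try {
            return remaining.await(5, TimeUnit.SECONDS) ? runs.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            timer.cancel();
        }
    }
}
```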

```java
public void executeOnTimeup() {
    // Look up this level's ConsumeQueue in the schedule topic (queueId = delayLevel - 1)
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            // Can't find ext content, so recompute the tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // Delivery timestamp (store time + delay), sanity-checked against now
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // Remaining wait; <= 0 means the message is due
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        // The delay has elapsed: deliver the message
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // Restore the real topic and queue ID
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                        msgInner.getTopic(), msgInner);
                                    continue;
                                }

                                // Write the message back to the store under its real topic
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                            putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
                                    }
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
                            }
                        }
                    } else {
                        // Not due yet: check this same offset again after countdown ms
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        // Update this delay queue's consume progress
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                // Reached the end of the buffer: poll again after DELAY_FOR_A_WHILE
                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                // Update this delay queue's consume progress
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {

                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {

            long cqMinOffset = cq.getMinOffsetInQueue();
            long cqMaxOffset = cq.getMaxOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                    offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
            }

            if (offset > cqMaxOffset) {
                failScheduleOffset = cqMaxOffset;
                log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                    offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    // No queue or no data at this offset: retry later, from the corrected offset if it was out of range
    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
```
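Stripped of commitLog reads and statistics, the loop reduces to a scan over one level's queue. The model below is a hypothetical simplification: `LevelScan` uses an in-memory array of delivery timestamps in place of the ConsumeQueue, and it assumes those timestamps are non-decreasing, which roughly holds because every message in a level has the same delay and entries are appended in store order. That ordering is what lets the scan stop at the first entry that is not yet due:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one delay level's queue scan.
class LevelScan {
    static final class Result {
        final List<Integer> delivered = new ArrayList<>(); // offsets "delivered" in this pass
        int nextOffset;                                    // where the next task resumes
        long waitMs;                                       // delay for the next task
    }

    static Result scan(long[] deliverTimestamps, int offset, long now, long idleDelayMs) {
        Result r = new Result();
        int i = offset;
        for (; i < deliverTimestamps.length; i++) {
            long countdown = deliverTimestamps[i] - now;
            if (countdown > 0) {
                // Not due: resume at this same entry after countdown ms
                r.nextOffset = i;
                r.waitMs = countdown;
                return r;
            }
            r.delivered.add(i); // stand-in for restoring the topic and putMessage()
        }
        // Reached the end: poll again shortly (DELAY_FOR_A_WHILE in the source)
        r.nextOffset = i;
        r.waitMs = idleDelayMs;
        return r;
    }
}
```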

Offset persistence

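Every 10s (by default) the timer persists the offsets: offsetTable is serialized to a JSON string and written to a file. The write goes as follows:

  • write the current offsets to a temporary file with a .tmp suffix
  • copy the last persisted offsets to a .bak file
  • delete the last persisted offset file
  • rename the .tmp file to the real file name, which completes the write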

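The task writes the .tmp and .bak files before touching the original. So even if the original offset file has been deleted and renaming .tmp then fails, the newest offsets are not lost: they remain in the .tmp file, and the previous snapshot remains in the .bak file, which ConfigManager.load() falls back to when the main file is missing or empty.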
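As a variant (not what RocketMQ 4.9.3 does), `java.nio.file.Files.move` with `ATOMIC_MOVE` can replace the file in a single rename, so there is no window in which the main file is missing. `SafeFileWriter` is a hypothetical class; its read side falls back to `.bak` when the main file is missing or empty:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class SafeFileWriter {
    static void write(Path target, String content) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Path bak = target.resolveSibling(target.getFileName() + ".bak");
        try {
            // 1. New content goes to .tmp first
            Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
            // 2. Keep the previous snapshot in .bak
            if (Files.exists(target)) {
                Files.copy(target, bak, StandardCopyOption.REPLACE_EXISTING);
            }
            // 3. Swap .tmp into place with one rename. On Linux/macOS this maps to rename(2),
            //    which replaces an existing target atomically; other platforms may differ.
            try {
                Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING); // non-atomic fallback
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the main file, falling back to .bak if it is missing or empty.
    static String read(Path target) {
        Path bak = target.resolveSibling(target.getFileName() + ".bak");
        try {
            if (Files.exists(target)) {
                String s = new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
                if (!s.isEmpty()) {
                    return s;
                }
            }
            return Files.exists(bak) ? new String(Files.readAllBytes(bak), StandardCharsets.UTF_8) : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```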

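```java
// Persist offsetTable as JSON (run by the fixed-rate timer task)
public synchronized void persist() {
    String jsonString = this.encode(true); // pretty-printed JSON of offsetTable
    if (jsonString != null) {
        String fileName = this.configFilePath();
        try {
            MixAll.string2File(jsonString, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }
}

// MixAll.string2File: write through .tmp and keep the previous version in .bak
public static void string2File(final String str, final String fileName) throws IOException {
    // 1. Write the new content to fileName.tmp
    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);

    // 2. Copy the current content, if any, to fileName.bak
    String bakFile = fileName + ".bak";
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }

    // 3. Delete the old file (return value not checked)
    File file = new File(fileName);
    file.delete();

    // 4. Rename .tmp to the real file name (return value not checked)
    file = new File(tmpFile);
    file.renameTo(new File(fileName));
}
```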

3. Version: 4.9.3-SNAPSHOT