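1. Implementation approach
RocketMQ supports messages with a fixed set of delay levels. So how does the broker implement delayed messages?
When the broker receives a delayed message, it rewrites the message's topic to SCHEDULE_TOPIC_XXXX. Once the delivery time arrives, it restores the real topic and re-sends the message to a message queue under that real topic.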
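The rewrite-and-restore idea can be sketched in a few lines. This is a simplified illustration, not the broker's code: SketchMessage, DelayTopicSketch, and the two property keys are names assumed for this example, and the delayLevel - 1 queue mapping mirrors the delayLevel - 1 that appears in the statistics calls of the delivery code later in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message type used only for this sketch.
final class SketchMessage {
    String topic;
    int queueId;
    int delayLevel;
    final Map<String, String> properties = new HashMap<>();

    SketchMessage(String topic, int queueId, int delayLevel) {
        this.topic = topic;
        this.queueId = queueId;
        this.delayLevel = delayLevel;
    }
}

final class DelayTopicSketch {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    // Property keys are assumptions chosen for this sketch.
    static final String REAL_TOPIC = "REAL_TOPIC";
    static final String REAL_QUEUE_ID = "REAL_QID";

    // On receive: remember the real destination, then redirect to the schedule topic.
    static void onReceive(SketchMessage msg) {
        if (msg.delayLevel <= 0) {
            return; // not a delayed message, leave it untouched
        }
        msg.properties.put(REAL_TOPIC, msg.topic);
        msg.properties.put(REAL_QUEUE_ID, String.valueOf(msg.queueId));
        msg.topic = SCHEDULE_TOPIC;
        msg.queueId = msg.delayLevel - 1; // one queue per delay level
    }

    // On time-up: restore the real destination and clear the delay level.
    static void onTimeUp(SketchMessage msg) {
        msg.topic = msg.properties.remove(REAL_TOPIC);
        msg.queueId = Integer.parseInt(msg.properties.remove(REAL_QUEUE_ID));
        msg.delayLevel = 0; // otherwise the message would be delayed again
    }
}
```

Clearing the delay level on restore matters: without it the re-sent message would be redirected to the schedule topic a second time.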
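2. The delay service
Let's first look at the fields of the delay message service; they give away most of the implementation.
- offsetTable maps delay level -> offset. From this we can infer that each delay level has its own queue with its own offset
- delayLevelTable maps delay level -> delay time
- Timer is the timer thread
- defaultMessageStore is the message store service
By now you have probably guessed the implementation: take messages from the queue in order, check whether each one has reached its delivery time, and either deliver it or keep waiting.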
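How delayLevelTable gets populated is not shown in this article. As a minimal sketch, assuming the levels are configured as a space-separated string such as "1s 5s 1m", the table could be built as follows. DelayLevelSketch, parse, unitMillis, and queueIdOf are hypothetical names for this example.

```java
import java.util.Map;
import java.util.TreeMap;

final class DelayLevelSketch {
    // Parses e.g. "1s 5s 1m" into {1=1000, 2=5000, 3=60000}. Levels start at 1.
    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new TreeMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            long unit = unitMillis(part.charAt(part.length() - 1));
            long amount = Long.parseLong(part.substring(0, part.length() - 1));
            table.put(i + 1, amount * unit);
        }
        return table;
    }

    // Converts a unit suffix to milliseconds.
    static long unitMillis(char unit) {
        switch (unit) {
            case 's': return 1000L;
            case 'm': return 60_000L;
            case 'h': return 3_600_000L;
            case 'd': return 86_400_000L;
            default: throw new IllegalArgumentException("unknown unit: " + unit);
        }
    }

    // Assumed mapping: each delay level owns exactly one queue.
    static int queueIdOf(int delayLevel) {
        return delayLevel - 1;
    }
}
```

With a table like this, one offset per level is enough to track progress, which is why offsetTable is keyed by level as well.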
```java
// Excerpt: fields only. The class extends ConfigManager, which supplies load() and persist().
public class ScheduleMessageService extends ConfigManager {
    private static final long FIRST_DELAY_TIME = 1000L;
    private static final long DELAY_FOR_A_WHILE = 100L;
    private static final long DELAY_FOR_A_PERIOD = 10000L;

    // Delay level -> delay in milliseconds (e.g. 1s -> 1000)
    private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
        new ConcurrentHashMap<Integer, Long>(32);

    // Delay level -> consume offset of that level's queue
    private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
        new ConcurrentHashMap<Integer, Long>(32);

    private final DefaultMessageStore defaultMessageStore;
    private final AtomicBoolean started = new AtomicBoolean(false);
    private Timer timer;
    private MessageStore writeMessageStore;

    // Highest configured delay level
    private int maxDelayLevel;
}
```
Service startup
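When are the delay levels, offsets, and timer initialized? They are all initialized when the service starts. Startup does the following:
- Creates the timer object. Every operation depends on it, so it is the real workhorse
- Loads offsetTable and submits a persistence task to the timer, executed at a fixed rate (every 10s)
- Iterates over delayLevelTable, creates a message delivery task for each delay level, and submits it to the timer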
```java
public void start() {
    if (started.compareAndSet(false, true)) {
        super.load();
        this.timer = new Timer("ScheduleMessageTimerThread", true);
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            // Look up the consume progress by delay level,
            // which shows that each delay level has its own queue
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            // On startup, schedule one delivery task per level, each after a 1s delay
            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        // Persist the delay queue offsets periodically
        // (first run after 10s, then every flushDelayOffsetInterval)
        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    if (started.get()) {
                        ScheduleMessageService.this.persist();
                    }
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }
}
```
Message forwarding
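The delivery tasks submitted to the timer check, each time they run, whether messages have reached their delivery time. The flow:
- Get the ConsumeQueue for the delay level under the schedule topic
- Use offset to get the index buffer data, SelectMappedBufferResult
With the index buffer data in hand, the task can iterate forward over the index entries. Each entry stores the message's size and its offset in the commitLog; for a delayed message, the tagsCode field holds the time at which the message should be delivered.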
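To make the entry layout concrete, here is a small sketch of walking such a buffer. It assumes each entry is 20 bytes (an 8-byte commitLog offset, a 4-byte size, and an 8-byte tagsCode), which matches the getLong / getInt / getLong reads in executeOnTimeup, and it assumes entries within one delay level are ordered by delivery time. CqEntrySketch and its methods are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class CqEntrySketch {
    // 8-byte commitLog offset + 4-byte message size + 8-byte tagsCode
    static final int UNIT_SIZE = 8 + 4 + 8;

    // Builds a buffer from {commitLogOffset, size, deliverTimestamp} triples.
    static ByteBuffer encode(long[][] entries) {
        ByteBuffer buf = ByteBuffer.allocate(entries.length * UNIT_SIZE);
        for (long[] e : entries) {
            buf.putLong(e[0]);
            buf.putInt((int) e[1]);
            buf.putLong(e[2]);
        }
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Returns the commitLog offsets of all leading entries that are due at `now`.
    static List<Long> dueCommitLogOffsets(ByteBuffer buf, long now) {
        List<Long> due = new ArrayList<>();
        while (buf.remaining() >= UNIT_SIZE) {
            long commitLogOffset = buf.getLong();
            int size = buf.getInt(); // needed to read the message body; unused here
            long deliverTimestamp = buf.getLong();
            if (deliverTimestamp > now) {
                break; // later entries in the same level are due even later
            }
            due.add(commitLogOffset);
        }
        return due;
    }
}
```

Stopping at the first entry that is not yet due is what lets a single offset per delay level describe the whole queue's progress.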
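With this information, RocketMQ compares the current time with the delivery time to decide whether a message can be delivered. There are two cases:
- The message is due: read it from the commitLog, restore its topic, tagsCode, and queueId, then deliver it
- The message is not due: compute the remaining wait (delivery time minus current time), create a new task, and submit it to the timer with that wait as its delay
Because the tasks are not scheduled as periodic tasks at startup, each run submits a new task to the timer when it ends, whether it finished normally or hit an exception.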
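The due / not-due decision comes down to a little arithmetic, sketched below. computeDeliverTimestamp follows the "store time + delay" rule described in the code comments. The body of correctDeliverTimestamp is not shown in this article, so the version here is an assumption: it treats a timestamp further ahead than one full delay interval as corrupt and delivers immediately. DeliverDecisionSketch and delayBeforeNextRun are hypothetical names.

```java
final class DeliverDecisionSketch {
    // Delivery time = time the message was stored + the delay of its level.
    static long computeDeliverTimestamp(long storeTimestamp, long levelDelayMillis) {
        return storeTimestamp + levelDelayMillis;
    }

    // Assumed behaviour: a delivery time more than one delay interval in the
    // future cannot be valid for this level, so fall back to "deliver now".
    static long correctDeliverTimestamp(long now, long deliverTimestamp, long levelDelayMillis) {
        long latestPlausible = now + levelDelayMillis;
        return deliverTimestamp > latestPlausible ? now : deliverTimestamp;
    }

    // Returns 0 when the message is due, otherwise the milliseconds to wait
    // before the next one-shot task should run.
    static long delayBeforeNextRun(long now, long deliverTimestamp) {
        long countdown = deliverTimestamp - now;
        return countdown <= 0 ? 0 : countdown;
    }
}
```

A non-zero result is what the task would pass as the delay of the next timer task, so the timer wakes up roughly when the head message becomes due instead of polling.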
```java
public void executeOnTimeup() {
    // Look up the consume queue by the schedule topic and this delay level's queue id
    ConsumeQueue cq =
        ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
            delayLevel2QueueId(delayLevel));

    long failScheduleOffset = offset;

    if (cq != null) {
        SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
        if (bufferCQ != null) {
            try {
                long nextOffset = offset;
                int i = 0;
                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                    long offsetPy = bufferCQ.getByteBuffer().getLong();
                    int sizePy = bufferCQ.getByteBuffer().getInt();
                    long tagsCode = bufferCQ.getByteBuffer().getLong();

                    if (cq.isExtAddr(tagsCode)) {
                        if (cq.getExt(tagsCode, cqExtUnit)) {
                            tagsCode = cqExtUnit.getTagsCode();
                        } else {
                            //can't find ext content.So re compute tags code.
                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                tagsCode, offsetPy, sizePy);
                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                        }
                    }

                    long now = System.currentTimeMillis();
                    // Delivery time (store time + delay), corrected against the current time
                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);

                    // Time left until delivery: (store time + delay) - now
                    long countdown = deliverTimestamp - now;

                    if (countdown <= 0) {
                        // The delay has elapsed, so the message should be delivered
                        MessageExt msgExt =
                            ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                offsetPy, sizePy);

                        if (msgExt != null) {
                            try {
                                // Restore the real topic and queue id
                                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                    log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                        msgInner.getTopic(), msgInner);
                                    continue;
                                }
                                // Write the message back to the store under its real topic
                                PutMessageResult putMessageResult =
                                    ScheduleMessageService.this.writeMessageStore
                                        .putMessage(msgInner);

                                if (putMessageResult != null
                                    && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                    if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                                            putMessageResult.getAppendMessageResult().getWroteBytes());
                                        ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
                                    }
                                    continue;
                                } else {
                                    // XXX: warn and notify me
                                    log.error(
                                        "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                        msgExt.getTopic(), msgExt.getMsgId());
                                    // Retry from this entry after DELAY_FOR_A_PERIOD (10s)
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel,
                                            nextOffset), DELAY_FOR_A_PERIOD);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel,
                                        nextOffset);
                                    return;
                                }
                            } catch (Exception e) {
                                /*
                                 * XXX: warn and notify me
                                 */
                                log.error(
                                    "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
                            }
                        }
                    } else {
                        // Not due yet: schedule the next run after the remaining countdown
                        ScheduleMessageService.this.timer.schedule(
                            new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                            countdown);
                        // Update the consume progress of this delay level
                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                        return;
                    }
                } // end of for

                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                    this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                // Update the consume progress of this delay level
                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                return;
            } finally {
                bufferCQ.release();
            }
        } // end of if (bufferCQ != null)
        else {
            // No buffer at this offset: clamp the offset into the queue's valid range
            long cqMinOffset = cq.getMinOffsetInQueue();
            long cqMaxOffset = cq.getMaxOffsetInQueue();
            if (offset < cqMinOffset) {
                failScheduleOffset = cqMinOffset;
                log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                    offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
            }

            if (offset > cqMaxOffset) {
                failScheduleOffset = cqMaxOffset;
                log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
                    offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
            }
        }
    } // end of if (cq != null)

    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
        failScheduleOffset), DELAY_FOR_A_WHILE);
}
```
Offset persistence
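The timer persists the offsets once every 10s. The offsets are serialized to a JSON string and then written to disk. The write proceeds as follows:
- Write the current offsets to a temporary file with the .tmp suffix
- Back up the previously persisted offsets to a .bak file
- Delete the previously persisted offset file
- Rename the .tmp file to the real file name, which completes the task
The persistence task writes the .tmp and .bak files first. So even if the offset file on disk has been deleted and renaming the .tmp file then fails, the latest offsets are not lost: they are still in the .tmp file, and the previous version is in the .bak file.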
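```java
// Inherited by ScheduleMessageService: serialize to JSON and write to the config file
public synchronized void persist() {
    String jsonString = this.encode(true);
    if (jsonString != null) {
        String fileName = this.configFilePath();
        try {
            MixAll.string2File(jsonString, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }
}

// MixAll.string2File
public static void string2File(final String str, final String fileName) throws IOException {
    // 1. Write the new content to the .tmp file
    String tmpFile = fileName + ".tmp";
    string2FileNotSafe(str, tmpFile);

    // 2. Copy the previously persisted content to the .bak file
    String bakFile = fileName + ".bak";
    String prevContent = file2String(fileName);
    if (prevContent != null) {
        string2FileNotSafe(prevContent, bakFile);
    }

    // 3. Delete the old file (the return value is not checked)
    File file = new File(fileName);
    file.delete();

    // 4. Rename .tmp to the real file name (the return value is not checked)
    file = new File(tmpFile);
    file.renameTo(new File(fileName));
}
```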
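The same tmp / bak / rename sequence can also be written with java.nio.file. This is a variant sketch and not RocketMQ's code: SafeFileWriteSketch and write are hypothetical names, and the main difference is that Files.move replaces the target in one call and throws an IOException on failure, whereas File.delete and File.renameTo only return a boolean.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class SafeFileWriteSketch {
    static void write(String content, Path target) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Path bak = target.resolveSibling(target.getFileName() + ".bak");

        // 1. Write the new content to the .tmp file first.
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));

        // 2. Keep the previously persisted content as .bak, if there is any.
        if (Files.exists(target)) {
            Files.copy(target, bak, StandardCopyOption.REPLACE_EXISTING);
        }

        // 3. Replace the target with the .tmp file; a failure surfaces as an exception.
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Calling write twice on the same path leaves the second content in the target file and the first content in the .bak file, so a reader that finds the target missing or unreadable still has one older version to fall back on.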