Skip to content

rmq有两种消费模式,分别为并发消费和顺序消费。见名知义,你大概猜到了两种模式的差别,不过我们还是先从消息消费接口来深入吧。

一、消息消费接口ConsumeMessageService

消费接口定义了消费服务的一些抽象功能。过一遍方法定义,就能猜到实现类中肯定有线程池。然后聚焦方法submitConsumeRequest,就能知道这个方法大概率会把消息做成任务,然后提交到线程池去处理。

java
public interface ConsumeMessageService {
    void start();

    void shutdown(long awaitTerminateMillis);

    void updateCorePoolSize(int corePoolSize);

    void incCorePoolSize();

    void decCorePoolSize();

    int getCorePoolSize();

    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

    /**
     *
     *
     * 提交消费请求
     */
    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

二、顺序消费

提交消费请求

前面的消息拉取逻辑被我省略了,我们从消息被拉取到,提交到消费任务ConsumeRequest到线程池开始。接下来只需要搞懂ConsumeRequestrun方法,那么顺序消费逻辑也就清楚了。

java
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
    @Override
    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) {
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
}

消费逻辑

消息在被消费前,做了一系列的校验。这里有几个点需要注意下:

  • 工作队列processQueue是否被丢弃,丢弃的原因大概率是同消费组下有消费节点加入或退出。
  • processQueue必须在broker处于锁定状态,这是为了保证队列消费有序。
  • 业务处理的阈值时间为60s,超过60s任务会退出,并提交任务稍后处理。

接下来是主流程:

  • processQueue取出消息,这里取出的消息是有序的。processQueue使用TreeMap来存储消息,以消息偏移量做key保证有序。
  • 把消息丢给messageListener去处理,这里的messageListener需要业务端来实现。
  • 处理业务端的消费结果。
java
class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            // 如果消息队列被丢弃,停止本次任务消费任务
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        // 并发度为消息队列,同一时刻消息队列只能被线程池中一个线程进行消费
        // 还有一点,这里是消费队列有序,而不是主题有序
        synchronized (objLock) {
            // 消息处理队列处于锁定状态 && 锁未过期
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    long interval = System.currentTimeMillis() - beginTime;
                    // 业务处理时间阈值 60s
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }

                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext
                                .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }

                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            this.processQueue.getConsumeLock().lock();
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            // 业务处理
                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                        } catch (Throwable e) {
                            log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue), e);
                            hasException = true;
                        } finally {
                            this.processQueue.getConsumeLock().unlock();
                        }

                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }

                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext
                                .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }

                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        // 消费结果处理
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }

}

处理消费结果

处理消费结果很简单,正常情况下是更新/提交消费偏移量就好了。我们主要分析异常情况,当statusSUSPEND_CURRENT_QUEUE_A_MOMENT的时候,会检查当前消息的重试次数,允许重试则将当前消息重新加入到processQueue后重新消费。

为了保证消息消费的顺序性,这里会允许消息无限重试(其实是重试Integer.Max_Value次)。如果使用不当会造成消息堆积的。推荐业务方监控重试次数,并在多次消费失败后放行此消息。

java
public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) {
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
            case SUCCESS:
                // 获取提交偏移量
                commitOffset = consumeRequest.getProcessQueue().commit();
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    // 未超过最大重试次数, 本次消费停止, 重新提交消息到线程池
                    consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    // 超过最大重试次数, 提交消息到broker死信队列, 这里认为msg消费成功, 提交偏移量到本地
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {
        switch (status) {
            case SUCCESS:
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case COMMIT:
                commitOffset = consumeRequest.getProcessQueue().commit();
                break;
            case ROLLBACK:
                consumeRequest.getProcessQueue().rollback();
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis());
                continueConsume = false;
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToConsumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                }
                break;
            default:
                break;
        }
    }

    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // 更新消费偏移量
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
    }

    return continueConsume;
}

private int getMaxReconsumeTimes() {
    // default reconsume times: Integer.MAX_VALUE
    if (this.defaultMQPushConsumer.getMaxReconsumeTimes() == -1) {
        return Integer.MAX_VALUE;
    } else {
        return this.defaultMQPushConsumer.getMaxReconsumeTimes();
    }
}

private boolean checkReconsumeTimes(List<MessageExt> msgs) {
    boolean suspend = false;
    if (msgs != null && !msgs.isEmpty()) {
        for (MessageExt msg : msgs) {
            if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
                MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
                if (!sendMessageBack(msg)) {
                    suspend = true;
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                }
            } else {
                suspend = true;
                msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
            }
        }
    }
    return suspend;
}

三、并发消费

提交消费请求

并发消费的提交请求我们可以来讲讲。偏个题,因为用到了线程池,我们也可以先看一下它的线程池是怎么设计的。

线程池是在构造方法中创建的,核心线程和最大线程默认都是20,定长线程池可以避免非核心线程频繁创建带来的开销,但这里其实也是有弊端的,如果业务消费方处理比较慢的话,会造成任务堆积在阻塞队列中的。阻塞队列使用的是无界队列,默认可以在阻塞队列堆积任务,这里除了OOM情况,基本不会丢失任务。拒绝策略这里选择的是默认,以RMQ的操作来说,应该在提交的时候做了失败兜底。

接下是提交消费请求,将消息根据最大批处理数量进行分批,然后依次提交到线程池,这没啥好说的。哦,这里有个优化点,如果想加快消费速度的话,这里可以把最大批处理数量consumeMessageBatchMaxSize放的大一些,不过记得要处理好,批量消息中部分消息消费失败的问题。

java
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    this.consumeExecutor = new ThreadPoolExecutor(
        // 默认20
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        // 默认20
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_"));
}

@Override
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // 每次可处理的消息条数,当前是每次可处理1条
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

    // 单次处理和分批处理
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        // 分批处理
        for (int total = 0; total < msgs.size(); ) {
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }

            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }

                // 放入失败,延迟5秒后继续放入
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}

消费逻辑

重头戏依旧在消费上,不过并发消费的处理逻辑比较简单,我省略了校验、失败处理逻辑,只留下了主流程。

  1. 业务方的messageListener处理消息。
  2. 处理消费结果,提交偏移量。

消费结果处理就不能省略了,分两种情况。消费成功的时候,ackIndex=Integer.MAX_VALUE。会从工作队列ProcessQueue中移除掉当前消息并返回偏移量。这里引申出一个问题,假如我处理的消息偏移量为5,此时工作队列中存储的消息偏移量为[1,2,3,4,5],这里返回的偏移量是多少?答案是1,在并发消费模式下,消费成功时,只会返回最小的未消费的消息偏移量。这是为了防止消息丢失。

消费失败时,ackIndex=-1,会把当前的消息重新发送到broker的延时队列,等待重试的。当然重试是有次数限制的,最多16次。然后工作队列移除掉当前消息,提交偏移量。

java
class ConsumeRequest implements Runnable {
    public void run() {
        // 执行具体的消费业务逻辑
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    }
}

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
) {
    int ackIndex = context.getAckIndex();

    if (consumeRequest.getMsgs().isEmpty())
        return;

    // 根据消息消费结果计算ackIndex
    switch (status) {
        case CONSUME_SUCCESS:
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            int ok = ackIndex + 1;
            int failed = consumeRequest.getMsgs().size() - ok;
            this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
            break;
        case RECONSUME_LATER:
            ackIndex = -1;
            this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                consumeRequest.getMsgs().size());
            break;
        default:
            break;
    }

    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                // ackIndex = -1,消息消费处理失败时,才会走到这里
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // 处理失败的消息重新发回Broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }

            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);

                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }

    // 从处理队列中移除消费请求中的消息,返回偏移量
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // 更新消息偏移量
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}

四、依赖版本->4.9.3-SNAPSHOT