千家信息网

51. Source Code Walkthrough: RocketMQ Message Reconsumption

Published: 2025-01-25 | Author: 千家信息网 editors | Last updated: 2025-01-25

1. Introduction

RocketMQ supports reconsuming a message after its consumption fails. The code looks like this:

/*
 * Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s%n", Thread.currentThread().getName());
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

In other words, the listener has to return ConsumeConcurrentlyStatus.RECONSUME_LATER. So how is this implemented?
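In practice a listener usually returns RECONSUME_LATER only when processing actually fails. The sketch below illustrates that decision with simplified stand-in types (`Status`, `Msg`, `RetryAwareListener` and the `MAX_LOCAL_RETRIES` cap are all hypothetical names for this example, not RocketMQ classes or defaults), so it runs without the client library.

```java
import java.util.List;

// Simplified stand-ins for illustration only; these are NOT the RocketMQ classes.
enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

final class Msg {
    final String body;
    final int reconsumeTimes; // how many times this message has already been redelivered

    Msg(String body, int reconsumeTimes) {
        this.body = body;
        this.reconsumeTimes = reconsumeTimes;
    }
}

final class RetryAwareListener {
    // Hypothetical cap chosen for this sketch, not a RocketMQ default.
    static final int MAX_LOCAL_RETRIES = 3;

    /** Placeholder business logic: fails on bodies starting with "bad". */
    static void handle(Msg msg) {
        if (msg.body.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + msg.body);
        }
    }

    /** Returns RECONSUME_LATER only while a failing message is still under the retry cap. */
    static Status consumeMessage(List<Msg> msgs) {
        for (Msg msg : msgs) {
            try {
                handle(msg);
            } catch (RuntimeException e) {
                if (msg.reconsumeTimes < MAX_LOCAL_RETRIES) {
                    return Status.RECONSUME_LATER; // ask for redelivery
                }
                // Retry budget exhausted: give up on this message and keep going.
                System.out.println("giving up on " + msg.body);
            }
        }
        return Status.CONSUME_SUCCESS;
    }
}
```

With the real client, the same idea would be expressed inside consumeMessage, using the reconsume count carried by each message.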

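2. Code flow

We take push consumption with asynchronous pulls as the example. For the overall consumption flow, see https://blog.51cto.com/483181/2056301

We start from the point where the client has successfully fetched a message, that is, DefaultMQPushConsumerImpl.pullMessage.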

2.1. The pull callback

public void pullMessage(final PullRequest pullRequest) {
    ...
    PullCallback pullCallback = new PullCallback() {
        @Override
        public void onSuccess(PullResult pullResult) {
            if (pullResult != null) {
                pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                    subscriptionData);
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        ...
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispathToConsume);
                        ...
                        break;
                }
            }
        }

        @Override
        public void onException(Throwable e) {
            ...
        }
    };
    ...
    try {
        this.pullAPIWrapper.pullKernelImpl(
            pullRequest.getMessageQueue(),
            subExpression,
            subscriptionData.getExpressionType(),
            subscriptionData.getSubVersion(),
            pullRequest.getNextOffset(),
            this.defaultMQPushConsumer.getPullBatchSize(),
            sysFlag,
            commitOffsetValue,
            BROKER_SUSPEND_MAX_TIME_MILLIS,
            CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
            CommunicationMode.ASYNC,
            pullCallback
        );
    } catch (Exception e) {
    }
}

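After pullKernelImpl fetches messages, pullCallback is invoked if the request was asynchronous. Assume the pull succeeded and we are in the FOUND branch.
The callback then submits a consume request via consumeMessageService.submitConsumeRequest.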
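The callback pattern can be reduced to a small model: the pull completes asynchronously, and only a FOUND result is forwarded to the consume service. The types below (`PullFlowSketch` and its nested `PullStatus`, `PullResult`, `PullCallback`) are simplified illustrations that reuse the names from the excerpt; they are not the RocketMQ classes, and the `consumeService` parameter is a hypothetical stand-in for submitConsumeRequest.

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified model of the callback flow; illustrative types, not RocketMQ's.
final class PullFlowSketch {
    enum PullStatus { FOUND, NO_NEW_MSG }

    static final class PullResult {
        final PullStatus status;
        final List<String> msgs;

        PullResult(PullStatus status, List<String> msgs) {
            this.status = status;
            this.msgs = msgs;
        }
    }

    interface PullCallback {
        void onSuccess(PullResult result);

        void onException(Throwable e);
    }

    /** Builds a callback that forwards messages to consumeService only on FOUND. */
    static PullCallback newCallback(Consumer<List<String>> consumeService) {
        return new PullCallback() {
            @Override
            public void onSuccess(PullResult result) {
                if (result != null && result.status == PullStatus.FOUND) {
                    consumeService.accept(result.msgs); // stands in for submitConsumeRequest(...)
                }
            }

            @Override
            public void onException(Throwable e) {
                System.err.println("pull failed: " + e);
            }
        };
    }
}
```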

2.2 Submitting the consume request

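submitConsumeRequest is declared on the ConsumeMessageService interface, which in the 4.x client has two implementations: ConsumeMessageConcurrentlyService (concurrent consumption) and ConsumeMessageOrderlyService (ordered consumption).

We take the concurrent implementation as the example, since the listener above is a MessageListenerConcurrently.

2.3 Submitting the consume request (concurrent)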

ConsumeMessageConcurrentlyService.java

public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    ...
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
            }
        }
    }
}

This creates a ConsumeRequest, which is a Runnable, and submits it to the consumeExecutor thread pool. Next, let's look at ConsumeRequest.
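The excerpt elides how msgThis is built. Assuming the elided part splits the pulled messages into batches of at most consumeBatchSize and submits one task per batch, a minimal sketch looks like this. `BatchSplitSketch`, `split` and `submitAll` are hypothetical names for this example, and a plain `Executor` stands in for consumeExecutor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class BatchSplitSketch {
    /** Splits msgs into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < msgs.size(); from += batchSize) {
            int to = Math.min(from + batchSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(from, to)));
        }
        return batches;
    }

    /** Submits one task per batch to the pool and returns the number of tasks submitted. */
    static <T> int submitAll(List<T> msgs, int batchSize, Executor pool, Consumer<List<T>> task) {
        List<List<T>> batches = split(msgs, batchSize);
        for (List<T> batch : batches) {
            pool.execute(() -> task.accept(batch));
        }
        return batches.size();
    }
}
```

Each batch then plays the role of the msgs list that a single ConsumeRequest carries to the listener.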

2.4 ConsumeRequest

class ConsumeRequest implements Runnable {
    private final List<MessageExt> msgs;
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    @Override
    public void run() {
        ...
        try {
            ...
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            ...
        }
        ...
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }
}

Look at the try-catch first: this is where the client's listener is called back to consume the messages.

listener.consumeMessage(Collections.unmodifiableList(msgs), context);

This is the listener we registered at the start, with consumeMessage overridden as follows:

/*
 * Register callback to execute on arrival of messages fetched from brokers.
 */
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s%n", Thread.currentThread().getName());
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

The returned status is then passed to processConsumeResult, which handles the result.
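One detail hidden in the elided lines: as far as I can tell from the 4.x source, a listener that throws or returns null is treated the same as one that returns RECONSUME_LATER. The sketch below models that normalization with stand-in types (`ConsumeTaskSketch`, its `Status` enum and `runListener` are hypothetical names, not RocketMQ classes).

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

final class ConsumeTaskSketch {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    /** Runs the listener and maps "threw" and "returned null" to RECONSUME_LATER. */
    static Status runListener(Function<List<String>, Status> listener, List<String> msgs) {
        Status status = null;
        try {
            // The listener sees a read-only view, as in the excerpt above.
            status = listener.apply(Collections.unmodifiableList(msgs));
        } catch (Throwable e) {
            System.err.println("listener threw: " + e);
        }
        // Assumption for this sketch: a missing status means "retry later".
        return status != null ? status : Status.RECONSUME_LATER;
    }
}
```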

2.5 processConsumeResult

public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    ...
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
            }
            break;
        case CLUSTERING:
            log.info("CLUSTERING...");
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            log.info("msgBackFailed.isEmpty() [{}]", msgBackFailed.isEmpty());
            if (!msgBackFailed.isEmpty()) {
                log.info("msgBackFailed [{}]", msgBackFailed);
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    ...
}
2.5.1

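The method first checks whether the consumer is in clustering or broadcasting mode. In broadcasting mode, every message after ackIndex is only logged as failed and then dropped.
In clustering mode, each of those messages is sent back to the broker through sendMessageBack, and the broker redelivers it later for another consumption attempt.
If sending a message back fails, the client increments its reconsumeTimes locally and hands it to submitConsumeRequestLater, which resubmits it for consumption on the client side without going through the broker. In the 4.x source this resubmission is scheduled after a short delay (5 seconds) rather than run immediately.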
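The clustering branch can be modelled in a few lines. The sketch assumes, based on the elided part of the method, that ackIndex is set to -1 for RECONSUME_LATER (so every message in the batch counts as failed) and to the last index on success (so the loop body never runs). `ProcessResultSketch` and `sendBackFailed` are hypothetical names, and the predicate stands in for sendMessageBack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class ProcessResultSketch {
    /**
     * Tries to send back every message after ackIndex and returns the ones whose
     * send-back failed; the client would retry those locally.
     */
    static List<String> sendBackFailed(List<String> msgs, int ackIndex, Predicate<String> sendMessageBack) {
        List<String> msgBackFailed = new ArrayList<>();
        for (int i = ackIndex + 1; i < msgs.size(); i++) {
            String msg = msgs.get(i);
            if (!sendMessageBack.test(msg)) {
                msgBackFailed.add(msg);
            }
        }
        return msgBackFailed;
    }
}
```

For a batch of three messages with ackIndex = -1 and a broker that rejects only the second one, the returned list holds just that second message; with ackIndex = 2 nothing is sent back at all.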

2.6 Sending the message back to the broker

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    int delayLevel = context.getDelayLevelWhenNextConsume();
    try {
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        ...
    }
    return false;
}

public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        ...
    }
}

public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
    log.info("addr [{}] request [{}] timeoutMillis [{}]", addr, request, timeoutMillis);
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark());
}

This is a plain API call that sends the request to the broker over Netty. The request code is RequestCode.CONSUMER_SEND_MSG_BACK:

public static final int CONSUMER_SEND_MSG_BACK = 36;

The broker stores the message in its files again and also increments the message's reconsume count by 1.
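The article defers the broker side, so the following is only a rough orientation based on my reading of the 4.x SendMessageProcessor, not on anything shown above. It assumes the broker routes a sent-back message to a per-group retry topic ("%RETRY%" prefix) or, once the retry limit is reached or the delay level is negative, to a dead-letter topic ("%DLQ%" prefix), and that a delay level of 0 is replaced by 3 + reconsumeTimes. `SendBackRoutingSketch`, `Decision` and `route` are hypothetical names for this example.

```java
// Rough model of the broker-side decision; an assumption-laden sketch, not broker code.
final class SendBackRoutingSketch {
    static final class Decision {
        final String topic;
        final int delayLevel;
        final int reconsumeTimes;

        Decision(String topic, int delayLevel, int reconsumeTimes) {
            this.topic = topic;
            this.delayLevel = delayLevel;
            this.reconsumeTimes = reconsumeTimes;
        }
    }

    /** Decides where a sent-back message goes and what its new reconsume count is. */
    static Decision route(String group, int reconsumeTimes, int delayLevel, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes || delayLevel < 0) {
            // Retry budget exhausted (or retry explicitly disabled): dead-letter topic.
            return new Decision("%DLQ%" + group, 0, reconsumeTimes + 1);
        }
        // Level 0 means "let the broker choose": later retries wait longer.
        int level = (delayLevel == 0) ? 3 + reconsumeTimes : delayLevel;
        return new Decision("%RETRY%" + group, level, reconsumeTimes + 1);
    }
}
```

Under these assumptions, the default delay level returned by getDelayLevelWhenNextConsume leads to progressively longer waits between retries until the message ends up in the dead-letter topic.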

For details, see the SendMessageProcessor.processRequest method, which will be covered in a later article.
