千家信息网

RocketMQ中如何实现push consumer顺序消费

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。顺序消费的
千家信息网最后更新 2025年02月03日RocketMQ中如何实现push consumer顺序消费

这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:

  • 在broker对message queue加锁,保证当前client占有该队列

  • consumer端对MessageQueue加锁,保证当前线程占有该队列

  • consumer端对ProcessQueue加锁,保证当前线程占有该队列

对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:

    // ConsumeMessageOrderlySerivce        public void start() {        if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {                @Override                public void run() {                    ConsumeMessageOrderlyService.this.lockMQPeriodically();                }            }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);        }    }    public synchronized void lockMQPeriodically() {        if (!this.stopped) {            // 通过LOCK_BATCH_MQ请求在broker批量锁定mq            this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();        }    }

ConsumeMessageOrderlyService中的消费请求提交:

    // ConsumeMessageOrderlySerivce      public void submitConsumeRequest(        final List msgs,        final ProcessQueue processQueue,        final MessageQueue messageQueue,        final boolean dispathToConsume) {        if (dispathToConsume) {             // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);            this.consumeExecutor.submit(consumeRequest);        }    }

顺序处理了逻辑:

    // ConsumeMessageOrderlyService.ConsumeRequest    public void run() {            if (this.processQueue.isDropped()) {                log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                return;            }            // 1.获取MessageQueue上的锁            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);            synchronized (objLock) {                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {                    final long beginTime = System.currentTimeMillis();                    for (boolean continueConsume = true; continueConsume; ) { // 循环处理                        // ...                        // 单个ConsumeRequest最长处理时间默认60s                        long interval = System.currentTimeMillis() - beginTime;                        if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {                            ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);                            break;                        }                        final int consumeBatchSize =                            ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();                        // 2. 从ProcessQueue顺序获取batchSize个消息                        List msgs = this.processQueue.takeMessags(consumeBatchSize);                        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());                        if (!msgs.isEmpty()) {                            final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);                            ConsumeOrderlyStatus status = null;                            ConsumeMessageContext consumeMessageContext = null;                            // ....                            long beginTimestamp = System.currentTimeMillis();                            ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;                            boolean hasException = false;                            try {                                // 3. 获取ProcessQueue上的锁                                this.processQueue.getLockConsume().lock();                                if (this.processQueue.isDropped()) {                                    log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",                                        this.messageQueue);                                    break;                                }                                // 4. 推给业务处理逻辑                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);                            } catch (Throwable e) {                                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",                                    RemotingHelper.exceptionSimpleDesc(e),                                    ConsumeMessageOrderlyService.this.consumerGroup,                                    msgs,                                    messageQueue);                                hasException = true;                            } finally {                                this.processQueue.getLockConsume().unlock(); // 解锁                            }                            // ...                            // 5. 处理消费结果                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);                        } else {                            continueConsume = false; // ProcessQueue为空,停止本次推送                        }                    }                } else {                    if (this.processQueue.isDropped()) {                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);                        return;                    }                    ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);                }            }        }

在processConsumeResult中主要会执行2步操作:

  • 在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除

  • 更新OffsetStore

感谢你能够认真阅读完这篇文章,希望小编分享的"RocketMQ中如何实现push consumer顺序消费"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0