千家信息网

RocketMQ中broker消息存储之如何实现拉取消息

发表于:2024-11-23 作者:千家信息网编辑
千家信息网最后更新 2024年11月23日,这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在consumer拉取消息时,broker首
千家信息网最后更新 2024年11月23日RocketMQ中broker消息存储之如何实现拉取消息

这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

在consumer拉取消息时,broker首先会根据待拉取的topic+queueId得到对应的ConsumeQueue,再根据消费offset从ConsumeQueue相应的偏移位置中获取该消息在commitlog里真实的offset/msgsize/tagscode信息,最后再从commitlog查出消息体。

消息拉取在broker存储层的调用入口为DefaultMessageStore.getMessage方法。核心逻辑如下:

    // DefaultMessageStore.java    public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,        final int maxMsgNums,        final MessageFilter messageFilter) {        // ...        // 1. 定位ConsumeQueue        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);        if (consumeQueue != null) {            minOffset = consumeQueue.getMinOffsetInQueue();            maxOffset = consumeQueue.getMaxOffsetInQueue();            if (maxOffset == 0) {                status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;                nextBeginOffset = nextOffsetCorrection(offset, 0);            } else if (offset < minOffset) {                status = GetMessageStatus.OFFSET_TOO_SMALL;                nextBeginOffset = nextOffsetCorrection(offset, minOffset);            } else if (offset == maxOffset) {                status = GetMessageStatus.OFFSET_OVERFLOW_ONE;                nextBeginOffset = nextOffsetCorrection(offset, offset);            } else if (offset > maxOffset) {                status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;                if (0 == minOffset) {                    nextBeginOffset = nextOffsetCorrection(offset, minOffset);                } else {                    nextBeginOffset = nextOffsetCorrection(offset, maxOffset);                }            } else {                // 2. 从ConsumeQueue中读取消费偏移offset处的内容                SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);                if (bufferConsumeQueue != null) {                    try {                        status = GetMessageStatus.NO_MATCHED_MESSAGE;                        long nextPhyFileStartOffset = Long.MIN_VALUE;                        long maxPhyOffsetPulling = 0;                        int i = 0;                        final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 单个请求最大拉取数据量                        final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();  // commitlog offset 8bytes                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes                            // ...                            // 3. 通过tagscode快速过滤                            if (messageFilter != null                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {                                if (getResult.getBufferTotalSize() == 0) {                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;                                }                                continue;                            }                            // 4. 从commitlog获取消息体                            SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);                            if (null == selectResult) {                                if (getResult.getBufferTotalSize() == 0) {                                    status = GetMessageStatus.MESSAGE_WAS_REMOVING;                                }                                nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);                                continue;                            }                            // 5. 通过消息体过滤                            if (messageFilter != null                                && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {                                if (getResult.getBufferTotalSize() == 0) {                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;                                }                                // release...                                selectResult.release();                                continue;                            }                            this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();                            // 6.添加到返回结果                            getResult.addMessage(selectResult);                            status = GetMessageStatus.FOUND;                            nextPhyFileStartOffset = Long.MIN_VALUE;                        }                        // ...                    } finally {                        bufferConsumeQueue.release();                    }                } else {                    // ...                }            }        } else {            status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;            nextBeginOffset = nextOffsetCorrection(offset, 0);        }        // ...        return getResult;    }

ConsumeQueue中存储的是固定长度(每个消息20字节)的内容,因此访问比较简单:

    // ConsumeQueue.java        public SelectMappedBufferResult getIndexBuffer(final long startIndex) {        int mappedFileSize = this.mappedFileSize;        long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消费者offset * 固定20字节长度        if (offset >= this.getMinLogicOffset()) {            // 定位到所属的MappedFile            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);            if (mappedFile != null) {                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 从MappedFile中读取实际的数据                return result;            }        }        return null;    }

通过ConsumeQueue获取消息在commitlog中的偏移量以及消息大小之后,获取消息体的方法如下

    // CommitLog.java    public SelectMappedBufferResult getMessage(final long offset, final int size) {        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();        // 定位消息所在的MappedFile        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);        if (mappedFile != null) {            int pos = (int) (offset % mappedFileSize);            return mappedFile.selectMappedBuffer(pos, size); // 从MappedFile中获取消息体        }        return null;    }

消息拉取整体流程如下

感谢各位的阅读!关于"RocketMQ中broker消息存储之如何实现拉取消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

0