千家信息网

RocketMQ中broker消息存储之如何实现消息转储

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!broker在接收到producer发送的消息之后,首先会将消
千家信息网最后更新 2025年02月07日RocketMQ中broker消息存储之如何实现消息转储

小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

broker在接收到producer发送的消息之后,首先会将消息存储到CommitLog的末尾,然后通过一个异步的分发线程ReputMessageService将消息转储到ConsumeQueue以及IndexFile中。

转储的核心逻辑在ReputMessageService.doReput中:

    // DefaultMessageStore.ReputMessageService        private void doReput() {            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();            }            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {                    break;                }                // 1. 获取reputFromOffset偏移所指向的数据                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);                if (result != null) {                    try {                        this.reputFromOffset = result.getStartOffset();                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {                            // 2. 解析消息体                            DispatchRequest dispatchRequest =                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();                            if (dispatchRequest.isSuccess()) {                                if (size > 0) {                                    // 3. 执行分发                                    DefaultMessageStore.this.doDispatch(dispatchRequest);                                    // ...                                    this.reputFromOffset += size;                                    readSize += size;                                    // ...                                } else if (size == 0) {                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);                                    readSize = result.getSize();                                }                            } else if (!dispatchRequest.isSuccess()) {                               // ...                            }                        }                    } finally {                        result.release();                    }                } else {                    doNext = false;                }            }        }

ConsumeQueue的插入操作如下:

    // ConsumeQueue.java    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,        final long cqOffset) {        if (offset + size <= this.maxPhysicOffset) {            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);            return true;        }        // 1. 将commitlog offset/msg size/tags code写到内存缓存        this.byteBufferIndex.flip();        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);        this.byteBufferIndex.putLong(offset);        this.byteBufferIndex.putInt(size);        this.byteBufferIndex.putLong(tagsCode);        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // ConsumeQueue中偏移        // 2. 获取最后一个MappedFile        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);        if (mappedFile != null) {            // ...            this.maxPhysicOffset = offset + size;            // 3. 写入索引数据            return mappedFile.appendMessage(this.byteBufferIndex.array());        }        return false;    }

IndexFile的写入逻辑如下:

    // IndexService.java    public void buildIndex(DispatchRequest req) {        IndexFile indexFile = retryGetAndCreateIndexFile();        if (indexFile != null) {            long endPhyOffset = indexFile.getEndPhyOffset();            DispatchRequest msg = req;            String topic = msg.getTopic();            String keys = msg.getKeys();            if (msg.getCommitLogOffset() < endPhyOffset) {                return;            }            // ...                     if (keys != null && keys.length() > 0) {                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);                for (int i = 0; i < keyset.length; i++) { // 为每个key执行写入                    String key = keyset[i];                    if (key.length() > 0) {                        indexFile = putKey(indexFile, msg, buildKey(topic, key));                        if (indexFile == null) {                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());                            return;                        }                    }                }            }        } else {            log.error("build index error, stop building index");        }    }    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");            indexFile = retryGetAndCreateIndexFile(); // 文件已满,重试            if (null == indexFile) {                return null;            }            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());        }        return indexFile;    }

消息转储的整体流程如下图:

看完了这篇文章,相信你对"RocketMQ中broker消息存储之如何实现消息转储"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

0