RocketMQ中broker消息存储之如何实现消息转储
发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!broker在接收到producer发送的消息之后,首先会将消
千家信息网最后更新 2025年02月07日RocketMQ中broker消息存储之如何实现消息转储
小编给大家分享一下RocketMQ中broker消息存储之如何实现消息转储,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
broker在接收到producer发送的消息之后,首先会将消息存储到CommitLog的末尾,然后通过一个异步的分发线程ReputMessageService将消息转储到ConsumeQueue以及IndexFile中。
转储的核心逻辑在ReputMessageService.doReput中:
// DefaultMessageStore.ReputMessageService private void doReput() { if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.", this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset()); this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset(); } for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable() && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) { break; } // 1. 获取reputFromOffset偏移所指向的数据 SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); if (result != null) { try { this.reputFromOffset = result.getStartOffset(); for (int readSize = 0; readSize < result.getSize() && doNext; ) { // 2. 解析消息体 DispatchRequest dispatchRequest = DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize(); if (dispatchRequest.isSuccess()) { if (size > 0) { // 3. 执行分发 DefaultMessageStore.this.doDispatch(dispatchRequest); // ... this.reputFromOffset += size; readSize += size; // ... } else if (size == 0) { this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset); readSize = result.getSize(); } } else if (!dispatchRequest.isSuccess()) { // ... } } } finally { result.release(); } } else { doNext = false; } } }
ConsumeQueue的插入操作如下:
// ConsumeQueue.java private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } // 1. 将commitlog offset/msg size/tags code写到内存缓存 this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // ConsumeQueue中偏移 // 2. 获取最后一个MappedFile MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { // ... this.maxPhysicOffset = offset + size; // 3. 写入索引数据 return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; }
IndexFile的写入逻辑如下:
// IndexService.java public void buildIndex(DispatchRequest req) { IndexFile indexFile = retryGetAndCreateIndexFile(); if (indexFile != null) { long endPhyOffset = indexFile.getEndPhyOffset(); DispatchRequest msg = req; String topic = msg.getTopic(); String keys = msg.getKeys(); if (msg.getCommitLogOffset() < endPhyOffset) { return; } // ... if (keys != null && keys.length() > 0) { String[] keyset = keys.split(MessageConst.KEY_SEPARATOR); for (int i = 0; i < keyset.length; i++) { // 为每个key执行写入 String key = keyset[i]; if (key.length() > 0) { indexFile = putKey(indexFile, msg, buildKey(topic, key)); if (indexFile == null) { log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey()); return; } } } } } else { log.error("build index error, stop building index"); } } private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) { for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) { log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one"); indexFile = retryGetAndCreateIndexFile(); // 文件已满,重试 if (null == indexFile) { return null; } ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); } return indexFile; }
消息转储的整体流程如下图:
看完了这篇文章,相信你对"RocketMQ中broker消息存储之如何实现消息转储"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!
消息
存储
数据
篇文章
逻辑
偏移
内存
完了
指向
整体
文件
更多
末尾
核心
流程
知识
索引
线程
缓存
行业
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
郑州二手服务器回收价格多少
杭州火箭网络技术有限公司
洛阳网络技术公司招聘
大陆汽车软件开发面试题
c 数据库修改某一列的值
网络安全工程师有黑客吗
平谷区网络软件开发价目表
vps创建数据库
郑州企业软件开发价钱
网络安全保护法
全球网络安全战略论文
青海湖住宿软件开发
通讯网络安全防护检查方式
有数据库的动态网站
六安医院软件开发定制公司
本溪榕枫网络技术
永恒之塔目前还有几个服务器
适合学计算机网络技术的电脑
易赛诺网络技术有限公司徽章
求数据库系统工程师资源
Lua安卓软件开发
移动服务器密码
dota 2连接服务器延缓
山西新一代软件开发参考价格
哈利波特忘了之前登陆的服务器
江西昌大高科网络技术有限公司
如何修改管家婆sql服务器名称
ice服务器真实情况
es集群数据库怎么安装
微信小程序由谁提供服务器