RocketMQ中broker消息存储之如何实现拉取消息
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在consumer拉取消息时,broker首
千家信息网最后更新 2025年02月03日RocketMQ中broker消息存储之如何实现拉取消息
这篇文章给大家分享的是有关RocketMQ中broker消息存储之如何实现拉取消息的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
在consumer拉取消息时,broker首先会根据待拉取的topic+queueId得到对应的ConsumeQueue,再根据消费offset从ConsumeQueue相应的偏移位置中获取该消息在commitlog里真实的offset/msgsize/tagscode信息,最后再从commitlog查出消息体。
消息拉取在broker存储层的调用入口为DefaultMessageStore.getMessage方法。核心逻辑如下:
// DefaultMessageStore.java public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset, final int maxMsgNums, final MessageFilter messageFilter) { // ... // 1. 定位ConsumeQueue ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); if (consumeQueue != null) { minOffset = consumeQueue.getMinOffsetInQueue(); maxOffset = consumeQueue.getMaxOffsetInQueue(); if (maxOffset == 0) { status = GetMessageStatus.NO_MESSAGE_IN_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } else if (offset < minOffset) { status = GetMessageStatus.OFFSET_TOO_SMALL; nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else if (offset == maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_ONE; nextBeginOffset = nextOffsetCorrection(offset, offset); } else if (offset > maxOffset) { status = GetMessageStatus.OFFSET_OVERFLOW_BADLY; if (0 == minOffset) { nextBeginOffset = nextOffsetCorrection(offset, minOffset); } else { nextBeginOffset = nextOffsetCorrection(offset, maxOffset); } } else { // 2. 从ConsumeQueue中读取消费偏移offset处的内容 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); if (bufferConsumeQueue != null) { try { status = GetMessageStatus.NO_MATCHED_MESSAGE; long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); // 单个请求最大拉取数据量 final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); // commitlog offset 8bytes int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); // msg size 4bytes long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); // tags hashcode 8bytes // ... // 3. 通过tagscode快速过滤 if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } // 4. 从commitlog获取消息体 SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } // 5. 通过消息体过滤 if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); // 6.添加到返回结果 getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } // ... } finally { bufferConsumeQueue.release(); } } else { // ... } } } else { status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE; nextBeginOffset = nextOffsetCorrection(offset, 0); } // ... return getResult; }
ConsumeQueue中存储的是固定长度(每个消息20字节)的内容,因此访问比较简单:
// ConsumeQueue.java public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; long offset = startIndex * CQ_STORE_UNIT_SIZE; // 消费者offset * 固定20字节长度 if (offset >= this.getMinLogicOffset()) { // 定位到所属的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); // 从MappedFile中读取实际的数据 return result; } } return null; }
通过ConsumeQueue获取消息在commitlog中的偏移量以及消息大小之后,获取消息体的方法如下
// CommitLog.java public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); // 定位消息所在的MappedFile MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); // 从MappedFile中获取消息体 } return null; }
消息拉取整体流程如下
感谢各位的阅读!关于"RocketMQ中broker消息存储之如何实现拉取消息"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
消息
存储
内容
偏移
定位
消费
字节
数据
方法
更多
篇文章
长度
不错
实用
最大
位置
信息
入口
单个
大小
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发学院学什么
泰康人寿软件开发岗
山东监控服务器机柜云服务器
只狼npc数据库
科技核心指的是什么数据库
港股网络安全股票有哪些
数据库隔离级别查看
上海兴畅网络技术有限公司
数据库报表软
淄川纺织软件开发公司
mpp数据库优势
河北开源软件开发欢迎来电
软件开发作为无形资产的标志
主机使命召唤17在哪选服务器
数据库安全百度百科
浪潮服务器如何进bios
台服魔兽服务器状态
上位机软件开发主流
数据库dbcc
金蝶erp是什么软件开发的
系统运维和软件开发前景
品牌网络技术咨询简介
用唯物史观分析网络技术发展
手机软件开发的思路
网络安全 自查工作总结
不验证前序数据库
淄川纺织软件开发公司
交通网络安全自检方案
image图片怎么传到服务器
交通企业网络安全