RocketMQ中如何实现push consumer顺序消费
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。顺序消费的
千家信息网最后更新 2025年02月03日RocketMQ中如何实现push consumer顺序消费
这篇文章主要介绍了RocketMQ中如何实现push consumer顺序消费,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
顺序消费的逻辑实现在类ConsumeMessageOrderlyService中,为了实现消费的有序性需要对queue进行加锁,包括:
在broker对message queue加锁,保证当前client占有该队列
consumer端对MessageQueue加锁,保证当前线程占有该队列
consumer端对ProcessQueue加锁,保证当前线程占有该队列
对broker上message queue加锁是在ConsumeMessageOrderlyService中周期性调度执行的:
// ConsumeMessageOrderlySerivce public void start() { if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) { this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS); } } public synchronized void lockMQPeriodically() { if (!this.stopped) { // 通过LOCK_BATCH_MQ请求在broker批量锁定mq this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll(); } }
ConsumeMessageOrderlyService中的消费请求提交:
// ConsumeMessageOrderlySerivce public void submitConsumeRequest( final Listmsgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispathToConsume) { if (dispathToConsume) { // 提交ConsumeRequest,丢弃了入参的msgs,每次都从ProcessQueue中顺序获取 ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue); this.consumeExecutor.submit(consumeRequest); } }
顺序处理了逻辑:
// ConsumeMessageOrderlyService.ConsumeRequest public void run() { if (this.processQueue.isDropped()) { log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } // 1.获取MessageQueue上的锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) { if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel()) || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) { final long beginTime = System.currentTimeMillis(); for (boolean continueConsume = true; continueConsume; ) { // 循环处理 // ... // 单个ConsumeRequest最长处理时间默认60s long interval = System.currentTimeMillis() - beginTime; if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) { ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10); break; } final int consumeBatchSize = ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 2. 从ProcessQueue顺序获取batchSize个消息 Listmsgs = this.processQueue.takeMessags(consumeBatchSize); defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); if (!msgs.isEmpty()) { final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue); ConsumeOrderlyStatus status = null; ConsumeMessageContext consumeMessageContext = null; // .... long beginTimestamp = System.currentTimeMillis(); ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; boolean hasException = false; try { // 3. 获取ProcessQueue上的锁 this.processQueue.getLockConsume().lock(); if (this.processQueue.isDropped()) { log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}", this.messageQueue); break; } // 4. 推给业务处理逻辑 status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", RemotingHelper.exceptionSimpleDesc(e), ConsumeMessageOrderlyService.this.consumerGroup, msgs, messageQueue); hasException = true; } finally { this.processQueue.getLockConsume().unlock(); // 解锁 } // ... // 5. 处理消费结果 continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this); } else { continueConsume = false; // ProcessQueue为空,停止本次推送 } } } else { if (this.processQueue.isDropped()) { log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue); return; } ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100); } } }
在processConsumeResult中主要会执行2步操作:
在ProcessQueue上执行commit(),将前一次takeMessages返回的msgs从缓存中删除
更新OffsetStore
感谢你能够认真阅读完这篇文章,希望小编分享的"RocketMQ中如何实现push consumer顺序消费"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
顺序
消费
处理
篇文章
逻辑
保证
线程
最长
有序
业务
价值
兴趣
单个
同时
周期
周期性
时间
是在
更多
有序性
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库知识与技术专业
正规的浪潮存储服务器零售
万方数据库低
数据库技术压题
国产化服务器供应商
华为服务器管理口安装系统
网银转账说数据库表插入错误
软件开发 区块链 移动开发
西安企业云软件开发
网络安全真实事例
数据视图在软件开发的使用
网络安全概论参考文献
18-19年中国服务器市场
软件开发最好学校
上海初级软件开发招聘信息
亿纬锂能软件开发
数据库装服务器还是客户端
android更新数据库
大专毕业网络安全论文开题
广州宝网络技术有限公司
数据库分离怎么附加数据库
学生表的数据库怎么写
伪造时间戳服务器
服务器怎么启动服务
民生山西app软件开发公司
列举出三种当今主流的数据库
2019湖南网络安全
广厦网络技术股份有限公司
王牌战争怎么知道服务器
手机软件开发与自学