consumer数量变化会怎样
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。ConsumerManagerpu
千家信息网最后更新 2025年01月31日consumer数量变化会怎样
本篇文章给大家分享的是有关consumer数量变化会怎样,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
ConsumerManagerpublic boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final SetsubList, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { //通知同组内的其他consumer this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2;}public void unregisterConsumer(final String group, final ClientChannelInfo clientChannelInfo, boolean isNotifyConsumerIdsChangedEnable) { ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null != consumerGroupInfo) { consumerGroupInfo.unregisterChannel(clientChannelInfo); if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { log.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); this.consumerIdsChangeListener.handle(ConsumerGroupEvent.UNREGISTER, group); } } if (isNotifyConsumerIdsChangedEnable) { //单向通知channel this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } }}
DefaultConsumerIdsChangeListener@Overridepublic void handle(ConsumerGroupEvent event, String group, Object... args) { case CHANGE: if (args == null || args.length < 1) { return; } Listchannels = (List ) args[0]; if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) { //对组内的其他consumer的channel连接发送单向通知(不管对方有木有收到) for (Channel chl : channels) { this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } } break;}
Broker2Clientpublic void notifyConsumerIdsChanged( final Channel channel, final String consumerGroup) { if (null == consumerGroup) { log.error("notifyConsumerIdsChanged consumerGroup is null"); return; } NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try { this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) { //发送异常,只是打印log log.error("notifyConsumerIdsChanged exception, " + consumerGroup, e.getMessage()); }}
通知channel是单向的,也就是不管对方有没有答复,都认为发送成功了,这样会有两种情况发生:
channel收到消息:收到消息后,channel会触发rebalance,正常逻辑
channel没收到消息:该consumer不会触发rebalance,存在问题!
register:该consumer不知道已经有新的consumer加入,造成同一个mq会有多个consumer进行消费
unregister:该consumer不知道有consumer下线,造成部分mq没有consumer负责消费
我们先看unregister这种情况
在consumer启动时,会同时启动一个RebalanceService线程,这个线程做的事就是每隔20秒主动进行一次rebalance,这样就能把unregister这种影响降低,最多导致该mq的消息会延迟20秒之后才有consumer负责消费
RebalanceServiceprivate static long waitInterval = Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));@Overridepublic void run() { log.info(this.getServiceName() + " service started"); while (!this.isStopped()) { this.waitForRunning(waitInterval); this.mqClientFactory.doRebalance(); } log.info(this.getServiceName() + " service end");}
接下来分析比较大条的Register
同一个mq在同一组内有不同的consumer消费,这种情况在clustering模式下是有大问题的,会造成重复消费,消费进度错误等问题,带着rocketmq应该不至于犯如此低级错误的想法再继续看代码,果然别有洞天
RebalanceImplprivate void rebalanceByTopic(final String topic, final boolean isOrder) { //rebalance过程 //关键点在这,在上面rebalance完之后, 就能知道自己该负责哪些mq的消费 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);}private boolean updateProcessQueueTableInRebalance(final String topic, final SetmqSet, final boolean isOrder) { for (MessageQueue mq : mqSet) { //如果是新增的mq,会尝试调用远程broker lock mq,获取锁失败,则说明有其他consumer获取了锁,自己应该放弃消费该mq if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } } }}
以上就是consumer数量变化会怎样,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
消费
消息
单向
情况
问题
数量
变化
对方
就是
更多
知识
篇文章
线程
部分
错误
不同
低级
实用
主动
成功
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络技术专业课程智能
畜牧网络安全自查报告
分布式数据库怎么解决实际问题
手机网络安全有哪些方面
bbm软件开发公司
公共图书馆数据与网络安全
网络安全科技馆什么时候开放
浙江商业软件开发零售价
网络安靖和网络安全
cad建立数据库
软件开发包名怎么填
查看linux服务器机械码
云融科技是互联网
苹果系统服务器验证失败
sftp代理服务器
harmonyos分布式数据库
安卓个性化软件开发
阿里云服务器显卡
什么是水伺服务器
先锋云网盘中的服务器是什么
思科网络网络技术学院教程
服务器linux系统网络配置
国家网络安全定在哪里了
伦教网络安全哪家强
网络安全新闻2022
vpn服务器账号密码
苏州网络安全协同创新
我与网络安全画画
将服务器添加到eclipse
先锋云网盘中的服务器是什么