RocketMQ消费失败重试机制的示例分析
发表于:2024-10-23 作者:千家信息网编辑
千家信息网最后更新 2024年10月23日,RocketMQ消费失败重试机制的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。现象:mq消费1次,重试3次,然
千家信息网最后更新 2024年10月23日RocketMQ消费失败重试机制的示例分析
RocketMQ消费失败重试机制的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
现象:mq消费1次,重试3次,然后停止,如下实例展示
首次(reconsumeTimes=0)
MQ_CON_MSG gmcf-lsc-zhongbang-repu-calc-from-topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null'}]
第一次retry(reconsumeTimes=1,DELAY=3)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
第二次retry(reconsumeTimes=2, DELAY=4)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
第三次retry(reconsumeTimes=3, DELAY=5)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785316978, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B79F598, commitLogOffset=192542104, bodyCRC=893293938, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785316980, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1231, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=5, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
根据现象我们提出2个疑问?
1.为什么只会重试4次?而不是一直重试?
try { try { if (messageExtWrappers.size() > 0) { try { var22 = messageExtWrappers.iterator(); while(var22.hasNext()) { messageExt = (MessageExt)var22.next(); span.addEvent("MQConsumer.from", messageExt.getBornHostString()); } } catch (Throwable var16) { ; } this.consume(messageExtWrappers, context); } LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId}); span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); span.success(); ConsumeConcurrentlyStatus var23 = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; return var23; } catch (MessageListenerConcurrentlyException var17) { LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var17}); throw var17; } catch (Throwable var18) { LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var18}); LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId}); span.failed(var18); span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) { context.setDelayLevelWhenNextConsume(-1); } }
从代码可以看出,如果消费失败了,我们自己的控制了重发次数,代码如下:
if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) { context.setDelayLevelWhenNextConsume(-1); }
当重试达到满足条件的时候,不再重试,直接放到dlq队列里面。如果不控制的,会一直重试到最高DelayLevel 18
2.DelayTimeLeve默认的值为什么不是从0开始,而是从3开始?
我们知道RocketMQ的默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别代表延迟level1-level18,为什么不是从1开始呢?
带着疑问我们继续深挖源码,我们从DefaultMQPullConsumerImpl类里面找到一段代码
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); if (UtilAll.isBlank(consumerGroup)) { consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception var8) { this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), var8); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } }
从代码中看到DelayTimeLevel =3+reconsumeTime
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
所以默认重试时,实际是从3开始的,从时间的角度,也验证为什么会重试4次,而且每次间隔的时间是10s/30s/1m .
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。
代码
消费
时间
现象
疑问
帮助
控制
机制
示例
分析
最高
清楚
代表
内容
实例
实际
对此
文章
新手
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
手游服务器代理权多少
推动创建人员数据库
网络技术员简称
如何提升教师网络安全
linux 跳转服务器
打开服务器的任务管理器
服务器密码过期修改快捷键
计算机网络安全第六版
国际服务器如何下载
终端和服务器通讯用什么
淮安安卓软件开发
北京启名星辰网络安全
软件开发协作规范
身份鉴别属于网络安全技术吗
mud游戏西游记服务器下载
太原市服务器包装有哪些
数据库怎么设置身份证格式
ftp服务器配置过程
数据库app组成
JAVA的web数据库配置
信息网络技术的快速演进和发展
太子庙服务器有充电桩吗
矿井网络安全测评二级
嘉定区正规网络技术服务售后服务
软件开发去天津市哪个区
云服务器需要什么系统
区块链数据库交互线程
什么专业学网络技术
多功能软件开发优势
天刀大区服务器名字