RocketMQ消费失败重试机制的示例分析
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,RocketMQ消费失败重试机制的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。现象:mq消费1次,重试3次,然
千家信息网最后更新 2025年01月31日RocketMQ消费失败重试机制的示例分析
RocketMQ消费失败重试机制的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
现象:mq消费1次,重试3次,然后停止,如下实例展示
首次(reconsumeTimes=0)
MQ_CON_MSG gmcf-lsc-zhongbang-repu-calc-from-topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null'}]
第一次retry(reconsumeTimes=1,DELAY=3)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
第二次retry(reconsumeTimes=2, DELAY=4)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
第三次retry(reconsumeTimes=3, DELAY=5)
MQ_CON_MSG %RETRY%gmcf-lsc-consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785316978, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B79F598, commitLogOffset=192542104, bodyCRC=893293938, reconsumeTimes=3, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785316980, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1231, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=5, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]
根据现象我们提出2个疑问?
1.为什么只会重试4次?而不是一直重试?
try { try { if (messageExtWrappers.size() > 0) { try { var22 = messageExtWrappers.iterator(); while(var22.hasNext()) { messageExt = (MessageExt)var22.next(); span.addEvent("MQConsumer.from", messageExt.getBornHostString()); } } catch (Throwable var16) { ; } this.consume(messageExtWrappers, context); } LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId}); span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); span.success(); ConsumeConcurrentlyStatus var23 = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; return var23; } catch (MessageListenerConcurrentlyException var17) { LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var17}); throw var17; } catch (Throwable var18) { LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId, var18}); LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", new Object[]{topic, broker, queueId}); span.failed(var18); span.addEvent("MQConsumer", ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) { context.setDelayLevelWhenNextConsume(-1); } }
从代码可以看出,如果消费失败了,我们自己的控制了重发次数,代码如下:
if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt)msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) { context.setDelayLevelWhenNextConsume(-1); }
当重试达到满足条件的时候,不再重试,直接放到dlq队列里面。如果不控制的,会一直重试到最高DelayLevel 18
2.DelayTimeLeve默认的值为什么不是从0开始,而是从3开始?
我们知道RocketMQ的默认的配置是messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,分别代表延迟level1-level18,为什么不是从1开始呢?
带着疑问我们继续深挖源码,我们从DefaultMQPullConsumerImpl类里面找到一段代码
public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { String brokerAddr = null != brokerName ? this.mQClientFactory.findBrokerAddressInPublish(brokerName) : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost()); if (UtilAll.isBlank(consumerGroup)) { consumerGroup = this.defaultMQPullConsumer.getConsumerGroup(); } this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup, delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes()); } catch (Exception var8) { this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), var8); Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody()); String originMsgId = MessageAccessor.getOriginMessageId(msg); MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId); newMsg.setFlag(msg.getFlag()); MessageAccessor.setProperties(newMsg, msg.getProperties()); MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic()); MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1)); MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes())); newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes()); this.mQClientFactory.getDefaultMQProducer().send(newMsg); } }
从代码中看到DelayTimeLevel =3+reconsumeTime
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
所以默认重试时,实际是从3开始的,从时间的角度,也验证为什么会重试4次,而且每次间隔的时间是10s/30s/1m .
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。
代码
消费
时间
现象
疑问
帮助
控制
机制
示例
分析
最高
清楚
代表
内容
实例
实际
对此
文章
新手
时候
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全eal3认证百科
锐之旗网络技术有限公司
数据库 int 11
网络安全防护技术不够
jk根服务器
用数据库查找一个方面的资料
python 化学数据库
合肥万户网络安全插画
软件开发者怎样提高编程能力
服务器伪静态配置
软件开发费忘了计入无形资产
潼南区工商软件开发服务公司
向日葵公司服务器
黑盾系列网络安全产品
软件开发具体开发啥
服务器群安全加固
剑灵vm角色数据库
苏州戴尔服务器配件
安徽计算机网络技术分数线
app实现离线读取数据库
数据库管理做什么
TBC选错服务器了怎么办
数据库 储存性别
李海峰财务软件开发
服务器解封
苏州戴尔网络安全服务器
逻辑管理网络安全
魔兽世界电信服务器
江岸哪里有软件开发
app实现离线读取数据库