
How ExtProducerResetConfiguration Works in rocketmq and How to Use It

Published: 2025-01-24 | Author: 千家信息网 editor | Last updated: 2025-01-24


This article takes a look at ExtProducerResetConfiguration in rocketmq (the rocketmq-spring-boot module).

ExtProducerResetConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java

    @Configuration
    public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
        private final static Logger log = LoggerFactory.getLogger(ExtProducerResetConfiguration.class);

        private ConfigurableApplicationContext applicationContext;

        private StandardEnvironment environment;

        private RocketMQProperties rocketMQProperties;

        private ObjectMapper objectMapper;

        public ExtProducerResetConfiguration(ObjectMapper rocketMQMessageObjectMapper,
                                             StandardEnvironment environment,
                                             RocketMQProperties rocketMQProperties) {
            this.objectMapper = rocketMQMessageObjectMapper;
            this.environment = environment;
            this.rocketMQProperties = rocketMQProperties;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = (ConfigurableApplicationContext) applicationContext;
        }

        @Override
        public void afterSingletonsInstantiated() {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);

            if (Objects.nonNull(beans)) {
                beans.forEach(this::registerTemplate);
            }
        }

        private void registerTemplate(String beanName, Object bean) {
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

            if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
            }

            ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
            validate(annotation, genericApplicationContext);

            DefaultMQProducer mqProducer = createProducer(annotation);
            // Set instanceName same as the beanName
            mqProducer.setInstanceName(beanName);
            try {
                mqProducer.start();
            } catch (MQClientException e) {
                throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                        beanName), e);
            }
            RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
            rocketMQTemplate.setProducer(mqProducer);
            rocketMQTemplate.setObjectMapper(objectMapper);

            log.info("Set real producer to :{} {}", beanName, annotation.value());
        }

        private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
            DefaultMQProducer producer = null;

            RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
            if (producerConfig == null) {
                producerConfig = new RocketMQProperties.Producer();
            }
            String nameServer = environment.resolvePlaceholders(annotation.nameServer());
            String groupName = environment.resolvePlaceholders(annotation.group());
            groupName = StringUtils.isEmpty(groupName) ? producerConfig.getGroup() : groupName;

            String ak = environment.resolvePlaceholders(annotation.accessKey());
            ak = StringUtils.isEmpty(ak) ? producerConfig.getAccessKey() : annotation.accessKey();
            String sk = environment.resolvePlaceholders(annotation.secretKey());
            sk = StringUtils.isEmpty(sk) ? producerConfig.getSecretKey() : annotation.secretKey();
            String customizedTraceTopic = environment.resolvePlaceholders(annotation.customizedTraceTopic());
            customizedTraceTopic = StringUtils.isEmpty(customizedTraceTopic) ? producerConfig.getCustomizedTraceTopic() : customizedTraceTopic;

            if (!StringUtils.isEmpty(ak) && !StringUtils.isEmpty(sk)) {
                producer = new DefaultMQProducer(groupName, new AclClientRPCHook(new SessionCredentials(ak, sk)),
                        annotation.enableMsgTrace(), customizedTraceTopic);
                producer.setVipChannelEnabled(false);
            } else {
                producer = new DefaultMQProducer(groupName, annotation.enableMsgTrace(), customizedTraceTopic);
            }

            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(annotation.sendMessageTimeout() == -1 ? producerConfig.getSendMessageTimeout() : annotation.sendMessageTimeout());
            producer.setRetryTimesWhenSendFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendFailed() : annotation.retryTimesWhenSendAsyncFailed());
            producer.setRetryTimesWhenSendAsyncFailed(annotation.retryTimesWhenSendAsyncFailed() == -1 ? producerConfig.getRetryTimesWhenSendAsyncFailed() : annotation.retryTimesWhenSendAsyncFailed());
            producer.setMaxMessageSize(annotation.maxMessageSize() == -1 ? producerConfig.getMaxMessageSize() : annotation.maxMessageSize());
            producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold());
            producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer());

            return producer;
        }

        private void validate(ExtRocketMQTemplateConfiguration annotation, GenericApplicationContext genericApplicationContext) {
            if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
                throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
                                "please check the @ExtRocketMQTemplateConfiguration",
                        annotation.value()));
            }

            if (rocketMQProperties.getNameServer() == null ||
                    rocketMQProperties.getNameServer().equals(environment.resolvePlaceholders(annotation.nameServer()))) {
                throw new BeanDefinitionValidationException(
                        "Bad annotation definition in @ExtRocketMQTemplateConfiguration, nameServer property is same with " +
                                "global property, please use the default RocketMQTemplate!");
            }
        }
    }

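  • ExtProducerResetConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces. Its afterSingletonsInstantiated method looks up all beans annotated with ExtRocketMQTemplateConfiguration and calls registerTemplate on each of them.

  • registerTemplate first checks that the bean is a RocketMQTemplate (or a subclass of it) and throws IllegalStateException if it is not. It then reads the ExtRocketMQTemplateConfiguration annotation and calls validate, which throws BeanDefinitionValidationException when the bean name given by the annotation's value is already in use, or when the global name server is not configured or is identical to the nameServer specified in the annotation.

  • Finally, createProducer builds a DefaultMQProducer. registerTemplate sets the producer's instance name to the bean name, calls its start method, assigns it to the bean through setProducer, and sets the objectMapper.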
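
The fallback and validation rules described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class ProducerFallbackSketch and its methods pick and checkNameServer are hypothetical names that do not exist in rocketmq-spring, and no Spring or RocketMQ types are involved. It mirrors the "-1 means not set" and "empty string means not set" conventions seen in createProducer, plus the name server condition from validate.

```java
/** Plain-Java sketch (hypothetical names); not part of rocketmq-spring. */
final class ProducerFallbackSketch {

    /** Numeric attributes: -1 on the annotation means "use the global value". */
    static int pick(int annotationValue, int globalValue) {
        return annotationValue == -1 ? globalValue : annotationValue;
    }

    /** String attributes: null or empty on the annotation means "use the global value". */
    static String pick(String annotationValue, String globalValue) {
        return (annotationValue == null || annotationValue.isEmpty()) ? globalValue : annotationValue;
    }

    /** Same condition as validate(): the global name server must exist and must differ. */
    static void checkNameServer(String globalNameServer, String extNameServer) {
        if (globalNameServer == null || globalNameServer.equals(extNameServer)) {
            throw new IllegalStateException(
                    "global nameServer is missing or identical to the one on the annotation");
        }
    }

    public static void main(String[] args) {
        System.out.println(pick(-1, 3000));            // falls back to the global value
        System.out.println(pick(5000, 3000));          // the annotation value wins
        System.out.println(pick("", "global-group"));  // empty string falls back
        checkNameServer("127.0.0.1:9876", "127.0.0.2:9876"); // passes: addresses differ
    }
}
```

The addresses and numbers used in main are made-up sample values, chosen only to exercise each branch.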

ExtRocketMQTemplateConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Component
    public @interface ExtRocketMQTemplateConfiguration {
        /**
         * The component name of the Producer configuration.
         */
        String value() default "";

        /**
         * The property of "name-server".
         */
        String nameServer();

        /**
         * Name of producer.
         */
        String group() default "${rocketmq.producer.group:}";

        /**
         * Millis of send message timeout.
         */
        int sendMessageTimeout() default -1;

        /**
         * Compress message body threshold, namely, message body larger than 4k will be compressed on default.
         */
        int compressMessageBodyThreshold() default -1;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        int retryTimesWhenSendFailed() default -1;

        /**
         * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
         * This may potentially cause message duplication which is up to application developers to resolve.
         */
        int retryTimesWhenSendAsyncFailed() default -1;

        /**
         * Indicate whether to retry another broker on sending failure internally.
         */
        boolean retryNextServer() default false;

        /**
         * Maximum allowed message size in bytes.
         */
        int maxMessageSize() default -1;

        /**
         * The property of "access-key".
         */
        String accessKey() default "${rocketmq.producer.accessKey:}";

        /**
         * The property of "secret-key".
         */
        String secretKey() default "${rocketmq.producer.secretKey:}";

        /**
         * Switch flag instance for message trace.
         */
        boolean enableMsgTrace() default true;

        /**
         * The name value of message trace topic.If you don't config,you can use the default trace topic name.
         */
        String customizedTraceTopic() default "${rocketmq.producer.customized-trace-topic:}";
    }

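  • The ExtRocketMQTemplateConfiguration annotation defines the attributes value, nameServer, group, sendMessageTimeout, compressMessageBodyThreshold, retryTimesWhenSendFailed, retryTimesWhenSendAsyncFailed, retryNextServer, maxMessageSize, accessKey, secretKey, enableMsgTrace and customizedTraceTopic. Only nameServer is required; the numeric attributes default to -1 and the String attributes default to an empty value or a placeholder, so that the global producer configuration can be used as a fallback.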
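
Because the annotation has RUNTIME retention, registerTemplate can read it from the bean class with getAnnotation. The following minimal sketch shows that mechanism using only the JDK. ExtConfig is a hypothetical stand-in with just three attributes, not the real ExtRocketMQTemplateConfiguration; AnnotationReadSketch, ExtTemplate and describe are likewise made-up names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** JDK-only sketch of reading a class-level annotation at runtime (hypothetical names). */
final class AnnotationReadSketch {

    /** Hypothetical stand-in for ExtRocketMQTemplateConfiguration. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface ExtConfig {
        String nameServer();                     // required, no default
        int sendMessageTimeout() default -1;     // -1 marks "not set"
        boolean enableMsgTrace() default true;
    }

    /** Hypothetical annotated class; only the required attribute is given. */
    @ExtConfig(nameServer = "127.0.0.1:9876")
    static class ExtTemplate { }

    /** Reads the annotation from the class, or reports that it is absent. */
    static String describe(Class<?> clazz) {
        ExtConfig cfg = clazz.getAnnotation(ExtConfig.class);
        if (cfg == null) {
            return clazz.getSimpleName() + " is not annotated";
        }
        return cfg.nameServer() + " timeout=" + cfg.sendMessageTimeout()
                + " trace=" + cfg.enableMsgTrace();
    }

    public static void main(String[] args) {
        System.out.println(describe(ExtTemplate.class)); // attributes left out keep their defaults
        System.out.println(describe(String.class));      // no annotation present
    }
}
```

Attributes that are not specified on the class come back with their declared defaults, which is what lets createProducer detect "not set" by comparing against -1.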

ExtRocketMQTemplate

    import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;

    @ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer}")
    public class ExtRocketMQTemplate extends RocketMQTemplate {
    }

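  • This defines the ExtRocketMQTemplate class, which extends RocketMQTemplate and is annotated with ExtRocketMQTemplateConfiguration. The nameServer attribute is a placeholder that is resolved against the environment at startup.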
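
To make the placeholder behaviour concrete, here is a deliberately simplified sketch of "${key:default}" resolution. It is not Spring's implementation: PlaceholderSketch and resolve are hypothetical names, nested placeholders are not handled, and the map stands in for the environment. It only illustrates why a default such as "${rocketmq.producer.group:}" resolves to an empty string when the property is missing, which in turn triggers the fallback to the global producer configuration. Like resolvePlaceholders, the sketch leaves a placeholder untouched when it has neither a value nor a default.

```java
import java.util.Collections;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified "${key:default}" resolver (hypothetical; not Spring's implementation). */
final class PlaceholderSketch {

    // group(1) = property key, group(2) = default after ':' (null when there is no ':')
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String text, Map<String, String> props) {
        Matcher m = PLACEHOLDER.matcher(text);
        StringBuilder out = new StringBuilder();
        int last = 0;
        while (m.find()) {
            out.append(text, last, m.start());
            String value = props.get(m.group(1));
            if (value == null) {
                value = m.group(2);              // use the default, which may be empty
            }
            // No value and no default: keep the placeholder text unchanged.
            out.append(value != null ? value : m.group());
            last = m.end();
        }
        return out.append(text.substring(last)).toString();
    }

    public static void main(String[] args) {
        Map<String, String> props =
                Collections.singletonMap("demo.rocketmq.extNameServer", "127.0.0.1:9876");
        System.out.println(resolve("${demo.rocketmq.extNameServer}", props));
        System.out.println("[" + resolve("${rocketmq.producer.group:}", props) + "]");
        System.out.println(resolve("${missing.key}", props));
    }
}
```

The address in main is a sample value. In a real application the property demo.rocketmq.extNameServer would come from the application's configuration.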

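Summary

  • ExtProducerResetConfiguration implements ApplicationContextAware and SmartInitializingSingleton; once all singletons are instantiated, it collects the beans annotated with ExtRocketMQTemplateConfiguration and runs registerTemplate on each.

  • registerTemplate requires the bean to be a RocketMQTemplate, reads the annotation, and validates it: the bean name from value must not already be in use, and the annotation's nameServer must differ from a configured global name server, otherwise BeanDefinitionValidationException is thrown.

  • createProducer then builds a DefaultMQProducer, falling back to the global producer configuration for attributes that are not set on the annotation; the producer is started, assigned to the bean, and the objectMapper is set.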
