
What ListenerContainerConfiguration does in rocketmq

Published: 2025-02-22 Author: 千家信息网 editor

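This article looks at what ListenerContainerConfiguration does in rocketmq by reading through its source code in rocketmq-spring-boot 2.0.3, together with the RocketMQMessageListener annotation it processes.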

ListenerContainerConfiguration

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
    private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    private ConfigurableApplicationContext applicationContext;

    private AtomicLong counter = new AtomicLong(0);

    private StandardEnvironment environment;

    private RocketMQProperties rocketMQProperties;

    private ObjectMapper objectMapper;

    public ListenerContainerConfiguration(ObjectMapper rocketMQMessageObjectMapper,
        StandardEnvironment environment,
        RocketMQProperties rocketMQProperties) {
        this.objectMapper = rocketMQMessageObjectMapper;
        this.environment = environment;
        this.rocketMQProperties = rocketMQProperties;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override
    public void afterSingletonsInstantiated() {
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);

        if (Objects.nonNull(beans)) {
            beans.forEach(this::registerContainer);
        }
    }

    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);

        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }

        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }

        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();

        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener) bean);
        container.setObjectMapper(objectMapper);
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?

        return container;
    }

    private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }
}

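  • ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces.

  • Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method looks up every bean annotated with RocketMQMessageListener and calls registerContainer on each one.

  • registerContainer first checks that the bean implements RocketMQListener and throws IllegalStateException if it does not. It then reads the RocketMQMessageListener annotation and validates it, rejecting the unsupported combination of ConsumeMode.ORDERLY with MessageModel.BROADCASTING. Next it registers a DefaultRocketMQListenerContainer, built by createRocketMQListenerContainer, in the applicationContext, and finally calls start on the container if it is not already running.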
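
The flow described in the bullets above can be sketched with the JDK alone. This is a simplified illustration, not the library's code: DemoMessageListener, DemoListener, DemoContainer, OrderListener and NotAListener are hypothetical stand-ins for RocketMQMessageListener, RocketMQListener, DefaultRocketMQListenerContainer and user beans, and a plain map stands in for the Spring application context.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

class RegistrationSketch {
    private final AtomicLong counter = new AtomicLong(0);
    // Stands in for the bean registry of the application context (assumption).
    final Map<String, DemoContainer> containers = new LinkedHashMap<>();

    // Like afterSingletonsInstantiated: find annotated beans, register each one.
    void registerAll(Map<String, Object> beans) {
        beans.forEach((beanName, bean) -> {
            if (bean.getClass().isAnnotationPresent(DemoMessageListener.class)) {
                registerContainer(beanName, bean);
            }
        });
    }

    // Like registerContainer: type check, unique name, create, then start.
    void registerContainer(String beanName, Object bean) {
        // The real code unwraps AOP proxies first; this sketch has no proxies.
        Class<?> clazz = bean.getClass();
        if (!(bean instanceof DemoListener)) {
            throw new IllegalStateException(clazz + " is not instance of " + DemoListener.class.getName());
        }
        DemoMessageListener annotation = clazz.getAnnotation(DemoMessageListener.class);
        // Same "%s_%s" naming scheme: container class name plus a running counter.
        String containerName = String.format("%s_%s", DemoContainer.class.getName(), counter.incrementAndGet());
        DemoContainer container = new DemoContainer(containerName, annotation.topic(), (DemoListener) bean);
        containers.put(containerName, container);
        if (!container.isRunning()) {
            container.start();
        }
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("orderListener", new OrderListener());
        beans.put("plainBean", "no annotation, so it is skipped");
        RegistrationSketch sketch = new RegistrationSketch();
        sketch.registerAll(beans);
        sketch.containers.forEach((name, c) ->
            System.out.println(name + " topic=" + c.topic + " running=" + c.isRunning()));
    }
}

// Hypothetical stand-in for @RocketMQMessageListener.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoMessageListener {
    String topic();
    String consumerGroup();
}

// Hypothetical stand-in for RocketMQListener.
interface DemoListener {
    void onMessage(String message);
}

// Hypothetical stand-in for DefaultRocketMQListenerContainer.
class DemoContainer {
    final String name;
    final String topic;
    final DemoListener listener;
    private boolean running;

    DemoContainer(String name, String topic, DemoListener listener) {
        this.name = name;
        this.topic = topic;
        this.listener = listener;
    }

    boolean isRunning() { return running; }

    void start() { running = true; }
}

@DemoMessageListener(topic = "demo-topic", consumerGroup = "demo-group")
class OrderListener implements DemoListener {
    @Override
    public void onMessage(String message) { }
}

// Annotated but does not implement DemoListener, so registration rejects it.
@DemoMessageListener(topic = "demo-topic", consumerGroup = "demo-group")
class NotAListener { }
```

Passing a NotAListener instance to registerContainer throws IllegalStateException, which corresponds to the type check at the top of the real method.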

RocketMQMessageListener

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {

    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";

    /**
     * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
     * load balance. It's required and needs to be globally unique.
     *
     *
     * See here for further discussion.
     */
    String consumerGroup();

    /**
     * Topic name.
     */
    String topic();

    /**
     * Control how to selector message.
     *
     * @see SelectorType
     */
    SelectorType selectorType() default SelectorType.TAG;

    /**
     * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
     */
    String selectorExpression() default "*";

    /**
     * Control consume mode, you can choice receive message concurrently or orderly.
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;

    /**
     * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;

    /**
     * Max consumer thread number.
     */
    int consumeThreadMax() default 64;

    /**
     * Max consumer timeout, default 30s.
     */
    long consumeTimeout() default 30000L;

    /**
     * The property of "access-key".
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;

    /**
     * The property of "secret-key".
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;

    /**
     * Switch flag instance for message trace.
     */
    boolean enableMsgTrace() default true;

    /**
     * The name value of message trace topic.If you don't config,you can use the default trace topic name.
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;

    /**
     * The property of "name-server".
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;

    /**
     * The property of "access-channel".
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

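  • The RocketMQMessageListener annotation defines the attributes consumerGroup, topic, selectorType, selectorExpression, consumeMode, messageModel, consumeThreadMax, consumeTimeout, accessKey, secretKey, enableMsgTrace, customizedTraceTopic, nameServer and accessChannel. Only consumerGroup and topic have no default value, so they must always be set.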
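
To illustrate how the required attributes and the defaults interact, here is a minimal JDK-only sketch. DemoListenerConfig is a hypothetical, trimmed-down copy of a few attributes from the annotation above (same names and default values), and MinimalListener and TunedListener are made-up classes. It only shows how attribute values are read back through reflection, which is also how registerContainer obtains them via clazz.getAnnotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationDefaultsSketch {
    // Reads the annotation from the class and formats the effective settings.
    static String describe(Class<?> listenerClass) {
        DemoListenerConfig cfg = listenerClass.getAnnotation(DemoListenerConfig.class);
        return cfg.topic() + "/" + cfg.consumerGroup()
            + " selector=" + cfg.selectorExpression()
            + " threads=" + cfg.consumeThreadMax()
            + " timeoutMs=" + cfg.consumeTimeout();
    }

    public static void main(String[] args) {
        System.out.println(describe(MinimalListener.class));
        System.out.println(describe(TunedListener.class));
    }
}

// Hypothetical subset of @RocketMQMessageListener with the same defaults.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoListenerConfig {
    String consumerGroup();                    // required: no default
    String topic();                            // required: no default
    String selectorExpression() default "*";
    int consumeThreadMax() default 64;
    long consumeTimeout() default 30000L;
}

// Sets only the two required attributes; everything else uses the defaults.
@DemoListenerConfig(consumerGroup = "g1", topic = "t1")
class MinimalListener { }

// Overrides two of the optional attributes.
@DemoListenerConfig(consumerGroup = "g2", topic = "t2",
    selectorExpression = "tagA || tagB", consumeThreadMax = 8)
class TunedListener { }
```

Omitting consumerGroup or topic on a class annotated this way is a compile-time error, which is the practical meaning of "required" for annotation attributes without a default.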

Summary

  • ListenerContainerConfiguration implements the ApplicationContextAware and SmartInitializingSingleton interfaces.

  • Its setApplicationContext method stores the applicationContext; its afterSingletonsInstantiated method looks up every bean annotated with RocketMQMessageListener and calls registerContainer on each one.

  • registerContainer checks that the bean implements RocketMQListener (throwing IllegalStateException otherwise), validates the RocketMQMessageListener annotation for unsupported attribute combinations, registers the DefaultRocketMQListenerContainer built by createRocketMQListenerContainer in the applicationContext, and starts the container if it is not already running.
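
Two details of the listing are easy to miss in the summary: the one combination that validate rejects, and the way createRocketMQListenerContainer picks the name server. The sketch below restates both with the JDK only. It is an approximation under stated assumptions: the enums are local copies, IllegalArgumentException replaces Spring's BeanDefinitionValidationException, and resolveNameServer takes the already-resolved annotation value instead of calling environment.resolvePlaceholders.

```java
class ContainerSettingsSketch {
    // Local copies of the two enums used by the annotation (assumption).
    enum ConsumeMode { CONCURRENTLY, ORDERLY }
    enum MessageModel { BROADCASTING, CLUSTERING }

    // Like validate(): ORDERLY consumption cannot be combined with BROADCASTING.
    static void validate(ConsumeMode consumeMode, MessageModel messageModel) {
        if (consumeMode == ConsumeMode.ORDERLY && messageModel == MessageModel.BROADCASTING) {
            throw new IllegalArgumentException(
                "messageModel BROADCASTING does not support ORDERLY message!");
        }
    }

    // Like the nameServer lookup: a non-empty annotation value wins,
    // otherwise the globally configured value is used.
    static String resolveNameServer(String fromAnnotation, String fromProperties) {
        return (fromAnnotation == null || fromAnnotation.isEmpty()) ? fromProperties : fromAnnotation;
    }

    public static void main(String[] args) {
        validate(ConsumeMode.ORDERLY, MessageModel.CLUSTERING);          // accepted
        validate(ConsumeMode.CONCURRENTLY, MessageModel.BROADCASTING);   // accepted
        System.out.println(resolveNameServer("", "10.0.0.1:9876"));
        System.out.println(resolveNameServer("10.0.0.2:9876", "10.0.0.1:9876"));
        try {
            validate(ConsumeMode.ORDERLY, MessageModel.BROADCASTING);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The addresses are placeholder values chosen for the example. In the real class the fallback value comes from rocketMQProperties.getNameServer().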
