千家信息网

RocketMQTransactionAnnotationProcessor的原理和用法

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,本篇内容介绍了"RocketMQTransactionAnnotationProcessor的原理和用法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如
千家信息网最后更新 2025年01月31日RocketMQTransactionAnnotationProcessor的原理和用法

本篇内容介绍了"RocketMQTransactionAnnotationProcessor的原理和用法"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本文主要研究一下RocketMQTransactionAnnotationProcessor

RocketMQTransactionAnnotationProcessor

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java

public class RocketMQTransactionAnnotationProcessor    implements BeanPostProcessor, Ordered, ApplicationContextAware {    private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class);    private ApplicationContext applicationContext;    private final Set> nonProcessedClasses =        Collections.newSetFromMap(new ConcurrentHashMap, Boolean>(64));    private TransactionHandlerRegistry transactionHandlerRegistry;    public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) {        this.transactionHandlerRegistry = transactionHandlerRegistry;    }    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        this.applicationContext = applicationContext;    }    @Override    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {        return bean;    }    @Override    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {        if (!this.nonProcessedClasses.contains(bean.getClass())) {            Class targetClass = AopUtils.getTargetClass(bean);            RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class);            this.nonProcessedClasses.add(bean.getClass());            if (listener == null) { // for quick search                log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass());            } else {                try {                    processTransactionListenerAnnotation(listener, bean);                } catch (MQClientException e) {                    log.error("Failed to process annotation " + listener, e);                    throw new BeanCreationException("Failed to process annotation " + listener, e);                }            }        }        return bean;    }    private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean)        throws MQClientException {        if (transactionHandlerRegistry == null) {            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +                "the class must work with RocketMQTemplate", null);        }        if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) {            throw new MQClientException("Bad usage of @RocketMQTransactionListener, " +                "the class must implement interface RocketMQLocalTransactionListener",                null);        }        TransactionHandler transactionHandler = new TransactionHandler();        transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory());        transactionHandler.setName(listener.txProducerGroup());        transactionHandler.setBeanName(bean.getClass().getName());        transactionHandler.setListener((RocketMQLocalTransactionListener) bean);        transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(),                listener.keepAliveTime(), listener.blockingQueueSize());        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),            listener.accessKey(), listener.secretKey());        if (Objects.nonNull(rpcHook)) {            transactionHandler.setRpcHook(rpcHook);        } else {            log.debug("Access-key or secret-key not configure in " + listener + ".");        }        transactionHandlerRegistry.registerTransactionHandler(transactionHandler);    }    @Override    public int getOrder() {        return LOWEST_PRECEDENCE;    }}
  • RocketMQTransactionAnnotationProcessor实现了BeanPostProcessor, Ordered, ApplicationContextAware接口

  • postProcessAfterInitialization方法会查找标记有RocketMQTransactionListener注解的bean,然后执行processTransactionListenerAnnotation方法

  • processTransactionListenerAnnotation方法会创建transactionHandler,然后执行transactionHandlerRegistry.registerTransactionHandler进行注册

TransactionHandler

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java

class TransactionHandler {    private String name;    private String beanName;    private RocketMQLocalTransactionListener bean;    private BeanFactory beanFactory;    private ThreadPoolExecutor checkExecutor;    private RPCHook rpcHook;    public String getBeanName() {        return beanName;    }    public void setBeanName(String beanName) {        this.beanName = beanName;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public RPCHook getRpcHook() {        return rpcHook;    }    public void setRpcHook(RPCHook rpcHook) {        this.rpcHook = rpcHook;    }    public BeanFactory getBeanFactory() {        return beanFactory;    }    public void setBeanFactory(BeanFactory beanFactory) {        this.beanFactory = beanFactory;    }    public void setListener(RocketMQLocalTransactionListener listener) {        this.bean = listener;    }    public RocketMQLocalTransactionListener getListener() {        return this.bean;    }    public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) {        this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,            keepAliveTime, TimeUnit.MILLISECONDS,            new LinkedBlockingDeque<>(blockingQueueSize));    }    public ThreadPoolExecutor getCheckExecutor() {        return checkExecutor;    }}
  • TransactionHandler包含了name、beanName、bean、beanFactory、checkExecutor、rpcHook属性

TransactionHandlerRegistry

rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java

public class TransactionHandlerRegistry implements DisposableBean {    private RocketMQTemplate rocketMQTemplate;    private final Set listenerContainers = new ConcurrentSet<>();    public TransactionHandlerRegistry(RocketMQTemplate template) {        this.rocketMQTemplate = template;    }    @Override    public void destroy() throws Exception {        listenerContainers.clear();    }    public void registerTransactionHandler(TransactionHandler handler) throws MQClientException {        if (listenerContainers.contains(handler.getName())) {            throw new MQClientException(-1,                String                    .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(),                        handler.getBeanName()));        }        listenerContainers.add(handler.getName());        rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook());    }}
  • TransactionHandlerRegistry实现了DisposableBean接口,其clear方法直接清空listenerContainers;registerTransactionHandler方法会往listenerContainers添加该handler的name,然后执行rocketMQTemplate.createAndStartTransactionMQProducer来创建并启动TransactionMQProducer

小结

  • RocketMQTransactionAnnotationProcessor实现了BeanPostProcessor, Ordered, ApplicationContextAware接口

  • postProcessAfterInitialization方法会查找标记有RocketMQTransactionListener注解的bean,然后执行processTransactionListenerAnnotation方法

  • processTransactionListenerAnnotation方法会创建transactionHandler,然后执行transactionHandlerRegistry.registerTransactionHandler进行注册

"RocketMQTransactionAnnotationProcessor的原理和用法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

方法 接口 原理 内容 更多 标记 注解 知识 实用 学有所成 接下来 困境 实际 小结 属性 情况 文章 案例 编带 网站 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库 序列创建 数据库最后一位 大学生校园网络安全教育平台 王者荣耀能卡服务器么 手机为什么无法连接服务器 软件开发java知识点 智能家居软件开发哪家好 was怎么修改数据库密码 计算机网络技术职场环境认知 商丘市保密局网络安全项目 可以保护网络安全的行为是 局域网服务器怎么连接电脑 网络安全参加培训怎么写简历 数据库连接运算能连接几个 六维空间数据库维护 数据库连接查询过程异常 最佳网络安全手抄报 漂亮 r710服务器装系统 如何保护服务器托管主机安全 网络安全赛四川 科技园区互联网 服务 德州公安网络安全保卫支队高峰 软件测试和网络安全挂钩吗 无锡雷华网络技术产品 智嵌串口服务器plc 服务器内存不够项目会突然挂掉 扫脸支付软件开发 薪酬设计软件开发 太原网警检查网络安全 我的世界服务器卡顿怎么解决方案
0