千家信息网

Java Spring的@Async原理是什么

发表于:2025-01-17 作者:千家信息网编辑
千家信息网最后更新 2025年01月17日,本篇内容介绍了"Java Spring的@Async原理是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学
千家信息网最后更新 2025年01月17日Java Spring的@Async原理是什么

本篇内容介绍了"Java Spring的@Async原理是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

目录
  • 前言

  • 一、如何使用@Async

  • 二、源码解读

一、如何使用@Async

使用@Async注解主要分两步:

1.在配置类上添加@EnableAsync注解

@ComponentScan(value = "com.wang")@Configuration@EnableAsyncpublic class AppConfig {}

2.在想要异步执行的方法上面加上@Async

@Servicepublic class CycleService2 {        @Autowired        private CycleService1 cycleService1;        @Async        public void alsoDo() {                System.out.println("create cycleService2");        }}

二、源码解读

1.@EnableAsync的作用

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Documented@Import(AsyncConfigurationSelector.class)public @interface EnableAsync {        /**         * Indicate the 'async' annotation type to be detected at either class         * or method level.         * 

By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. *

This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. * 此处说明的是方法执行变成异步,扫描的是哪个注解,目前默认的是Async和Asynchronous,开发者也可以自定义 */ Class annotation() default Annotation.class; /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. *

Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}. *

The default is {@code false}. *

Note that setting this attribute to {@code true} will affect all * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another — for example, in tests. * 如何proxyTargetClass被设置成true,那么spring的所有proxy都会通过CGLIB方式实现,不再使用Java默认的基于接口的代理实现方式;而且此处如果设置,不仅仅是会影响添加了@Async注解的类的proxy方式,加了@Transactional的类也会变成CGLIB代理,不推荐修改;这个注解只有mode是默认的PROXY,才有意义 */ boolean proxyTargetClass() default false; /** * Indicate how async advice should be applied. *

The default is {@link AdviceMode#PROXY}. * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. * 代理方式的不同,默认的是使用Spring的proxy方式,也可以换成原生的AspectJ的proxy方式。 * 这两个的区别作用还是很明显的 */ AdviceMode mode() default AdviceMode.PROXY; /** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. *

The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. * 因为在beanPostProcessor执行的时候,会根据order值进行排序,此处设置为最低值,就是想让其最后执行 * 其实即使不设置这个值,因为AsyncAnnotationBeanPostProcessor继承了ProxyProcessorSupport,ProxyProcessorSupport中的order默认也是最小优先级 * */ int order() default Ordered.LOWEST_PRECEDENCE;}

2. AsyncConfigurationSelector的作用

public class AsyncConfigurationSelector extends AdviceModeImportSelector {        private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =                        "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";        /**         * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}         * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},         * respectively.         */        @Override        @Nullable        public String[] selectImports(AdviceMode adviceMode) {                switch (adviceMode) {                        case PROXY:                                return new String[] {ProxyAsyncConfiguration.class.getName()};                        case ASPECTJ:                                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};                        default:                                return null;                }        }}

看过我之前博客的同学应该知道,其实此处就是往Spring容器中增加一个新的需要扫描的类,很明显可以看到差别主要集中在adviceMode的差别上。

3. adviceMode:PROXY(默认值)

引入了ProxyAsyncConfiguration配置类

3.1 ProxyAsyncConfiguration

@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {        @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)        public AsyncAnnotationBeanPostProcessor asyncAdvisor() {                Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");                AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();                bpp.configure(this.executor, this.exceptionHandler);                Class customAsyncAnnotation = this.enableAsync.getClass("annotation");                if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {                        bpp.setAsyncAnnotationType(customAsyncAnnotation);                }                bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));                bpp.setOrder(this.enableAsync.getNumber("order"));                return bpp;        }}

作用也很明显,就是往spring容器中添加了AsyncAnnotationBeanPostProcessor类

3.2 AsyncAnnotationBeanPostProcessor

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {    // 删除了一些无关紧要,或者默认不会设置的属性        public AsyncAnnotationBeanPostProcessor() {                setBeforeExistingAdvisors(true);        }        /**         * 因为AsyncAnnotationBeanPostProcessor实现了BeanFactoryAware接口         * 所以在实例化的过程中执行到initializeBean步骤的时候,里面第一步就是执行各种实现了Aware接口的接口方法         * 在此处new了一个advisor。advisor简单理解就是:advice+pointcut         * @param beanFactory         */        @Override        public void setBeanFactory(BeanFactory beanFactory) {                super.setBeanFactory(beanFactory);                AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);                if (this.asyncAnnotationType != null) {                        advisor.setAsyncAnnotationType(this.asyncAnnotationType);                }                advisor.setBeanFactory(beanFactory);                this.advisor = advisor;        }}

其实可以看到最重要的方法,就是setBeanFactory了,该方法是在AsyncAnnotationBeanPostProcessor的生命周期最后一步initializeBean里面的第一小步,也就是执行所有Aware接口的时候执行。
对于AOP来说,其实最主要的就是advice+pointcut,也就是advisor,在生命周期的这一步,也创建了advisor。

3.3 AsyncAnnotationAdvisor

public AsyncAnnotationAdvisor(                        @Nullable Supplier executor, @Nullable Supplier exceptionHandler) {                Set> asyncAnnotationTypes = new LinkedHashSet<>(2);                /**                 * 这儿设置符合pointCut需要的注解                 * 此处的executor就是一个扩展点,如果不想用spring的默认单线程线程池,可以自定义一个线程池                 * exceptionHandler,顾名思义,就是我们的方法在线程池中执行时抛出exception该如何handle使用的                 * advice也就是咱们的interceptor                 * pointCut就不多解释了,就是把设置符合什么条件会进行interceptor的invoke方法                 */                asyncAnnotationTypes.add(Async.class);                try {                        asyncAnnotationTypes.add((Class)                                        ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));                }                catch (ClassNotFoundException ex) {                        // If EJB 3.1 API not present, simply ignore.                }                this.advice = buildAdvice(executor, exceptionHandler);                this.pointcut = buildPointcut(asyncAnnotationTypes);        }

可以看到最主要的工作就是buildAdvice和buildPointcut。advice的作用是定义在方法执行方面,该如何执行;pointcut的作用是定义方法的范围

3.3.1 buildAdvice

protected Advice buildAdvice(                        @Nullable Supplier executor, @Nullable Supplier exceptionHandler) {                // new了一个interceptor                AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);                interceptor.configure(executor, exceptionHandler);                return interceptor;        }

可以看到advice主要就是定义了一个烂机器interceptor,在方法执行的时候进行一些拦截,至于executor,是方法执行器,默认为null,exceptionHandler也默认是null。

3.3.1.1 AnnotationAsyncExecutionInterceptor,异步执行的原理

在AnnotationAsyncExecutionInterceptor的父类AsyncExecutionInterceptor中,实现了拦截器的接口方法invoke,也就是真实的方法执行逻辑。

/**         * Intercept the given method invocation, submit the actual calling of the method to         * the correct task executor and return immediately to the caller.         * @param invocation the method to intercept and make asynchronous         * @return {@link Future} if the original method returns {@code Future}; {@code null}         * otherwise.         */        @Override        @Nullable        public Object invoke(final MethodInvocation invocation) throws Throwable {                Class targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);                Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);                final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);                /**获取一个任务执行器                 * 1. 从@Async注解里面获取配置的任务执行器                 * 2. 从Spring容器中找TaskExecutor类的bean                 * 3. 从spring容器中获取名为"taskExecutor"的bean,                 * 4. 如果还没有,new SimpleAsyncTaskExecutor())                 */                AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);                if (executor == null) {                        throw new IllegalStateException(                                        "No executor specified and no default executor set on AsyncExecutionInterceptor either");                }                //将当前方法执行封装成一个callable对象,然后放入到线程池里                Callable task = () -> {                        try {                                Object result = invocation.proceed();                                if (result instanceof Future) {                                        return ((Future) result).get();                                }                        }                        catch (ExecutionException ex) {                                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());                        }                        catch (Throwable ex) {                                handleError(ex, userDeclaredMethod, invocation.getArguments());                        }                        return null;                };                //任务提交                return doSubmit(task, executor, invocation.getMethod().getReturnType());        }

可以看到主要做的事情是:

  1. 寻找任务执行器:

  2. 从@Async注解里面获取配置的任务执行器

  3. 从Spring容器中找TaskExecutor类的bean

  4. 从spring容器中获取名为"taskExecutor"的bean,

  5. 如果还没有,new SimpleAsyncTaskExecutor())可以看到其实我们是可以给@Async进行任务执行器的配置的。

  6. 将具体的方法封装成callable的对象,然后doSubmit

  7. 此处我们就看一下默认的doSumit,使用的SimpleAsyncTaskExecutor是如何实现的

  8. 最终会执行到下面这个doExecute方法,默认情况下threadFactory是null,所以默认情况下,我们的方法,每次都是被创建了一个新的守护线程来进行方法的执行。

       protected void doExecute(Runnable task) {                Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));                thread.start();        }

3.3.1.2 自定义任务执行器

  1. 可以在配置类里new SimpleAsyncTaskExecutor(),然后setThreadFactory,这样修改了默认线程的产生方式

  2. 比较主流的方式是,定义一个ThreadPoolTaskExecutor,也就是线程池任务执行器,可以进行线程复用

3.3.2 buildPointcut

/**         * Calculate a pointcut for the given async annotation types, if any.         * @param asyncAnnotationTypes the async annotation types to introspect         * @return the applicable Pointcut object, or {@code null} if none         */        protected Pointcut buildPointcut(Set> asyncAnnotationTypes) {                ComposablePointcut result = null;                for (Class asyncAnnotationType : asyncAnnotationTypes) {                        // 就是根据这两个匹配器进行匹配的                        // 检查类上是否有@Async注解                        Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);                        //检查方法上是否有@Async注解                        Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);                        if (result == null) {                                result = new ComposablePointcut(cpc);                        }                        else {                        // 取并集:类上加了@Async或者类的方法上加了@Async                                result.union(cpc);                        }                        result = result.union(mpc);                }                return (result != null ? result : Pointcut.TRUE);        }

主要方法就是定义了一个类匹配pointcut和一个方法匹配pointcut。

4 什么时候判断进行advice的添加呢

当然就是在对某个bean进行proxy的判断的时候,也就是bean的生命周期最后一步,也是initializeBean里最后的一步,对于BeanPostProcessor的执行

3.4.1 AsyncAnnotationBeanPostProcessor#postProcessAfterInitialization

要注意的是AsyncAnnotationBeanPostProcessor的postProcessAfterInitialization方法其实是继承的是父类AbstractAdvisingBeanPostProcessor的。

@Override        public Object postProcessAfterInitialization(Object bean, String beanName) {                // 没有通知,或者是AOP的基础设施类,那么不进行代理                if (this.advisor == null || bean instanceof AopInfrastructureBean) {                        // Ignore AOP infrastructure such as scoped proxies.                        return bean;                }                // 对已经被代理的类,不再生成代理,只是将通知添加到代理类的逻辑中                // 这里通过beforeExistingAdvisors决定是将通知添加到所有通知之前还是添加到所有通知之后                // 在使用@Async注解的时候,beforeExistingAdvisors被设置成了true,                // @Async注解之所以把beforeExistingAdvisors设置为true,是因为该advisor和其他的advisor差别太大了,从情理上讲,也应该第一个执行                // 意味着整个方法及其拦截逻辑都会异步执行                if (bean instanceof Advised) {                        Advised advised = (Advised) bean;                        // 判断bean是否符合该advisor的使用范围,通过pointcut来判断                        if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {                                // Add our local Advisor to the existing proxy's Advisor chain...                                if (this.beforeExistingAdvisors) {                                        advised.addAdvisor(0, this.advisor);                                }                                else {                                        advised.addAdvisor(this.advisor);                                }                                return bean;                        }                }                // 如果还不是一个代理类,也需要通过eligible来判断是否符合使用该advisor的条件                if (isEligible(bean, beanName)) {                        ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);                        if (!proxyFactory.isProxyTargetClass()) {                                evaluateProxyInterfaces(bean.getClass(), proxyFactory);                        }                        proxyFactory.addAdvisor(this.advisor);                        customizeProxyFactory(proxyFactory);                        return proxyFactory.getProxy(getProxyClassLoader());                }                // No proxy needed.                return bean;        }

而在isEligible中,就是判断当前执行生命周期的bean是否满足我们的@Async注解的使用范围,主要是通过其class来判断

protected boolean isEligible(Class targetClass) {                Boolean eligible = this.eligibleBeans.get(targetClass);                if (eligible != null) {                        return eligible;                }                if (this.advisor == null) {                        return false;                }                // 其实就是判断类是否可以进行添加该advisor,也就是判断是否符合该advisor的使用条件                // 就是把advisor的pointCut拿出来,pointCut里的classMatcher和methodMatcher拿出来对类及其方法进行判断                eligible = AopUtils.canApply(this.advisor, targetClass);                this.eligibleBeans.put(targetClass, eligible);                return eligible;        }

具体的AopUtils.canApply(this.advisor, targetClass)逻辑就不写了,就是根据pointcut里设置的classFilter和methodMatcher类判断当前bean的class是否需要进行该advisor的使用。

"Java Spring的@Async原理是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0