Spring @Async 原理

@EnableAsync

开启异步功能需要 @EnableAsync 注解,可以以此作为切入点。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// Imports AsyncConfigurationSelector, which is the entry point of the @Async machinery:
// it decides which configuration class gets registered based on mode().
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

    // A custom annotation type to detect instead of @Async; rarely used.
    // The default (Annotation.class) acts as "no custom annotation configured":
    // ProxyAsyncConfiguration only applies this value when it differs from the default.
	Class<? extends Annotation> annotation() default Annotation.class;

    // Whether to create CGLIB (class-based) proxies instead of interface-based ones.
    // NOTE(review): the original note states this setting affects every proxied bean,
    // not only @Async ones — confirm against the Spring reference documentation.
	boolean proxyTargetClass() default false;

    // How the async advice is applied: proxy-based (PROXY) or AspectJ weaving (ASPECTJ).
    // Usually left at the default.
	AdviceMode mode() default AdviceMode.PROXY;

	/**
	 * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor}
	 * should be applied.
	 * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run
	 * after all other post-processors, so that it can add an advisor to
	 * existing proxies rather than double-proxy.
	 */
    // In short: lowest precedence by default, so the async post-processor runs after the others.
	int order() default Ordered.LOWEST_PRECEDENCE;

}

AsyncConfigurationSelector

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    /** Name of the configuration class imported when {@code AdviceMode.ASPECTJ} is selected. */
    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";

    /**
     * Picks the single configuration class to import for the given advice mode.
     *
     * @param adviceMode the {@code mode} attribute read from {@code @EnableAsync}
     * @return a one-element array holding the configuration class name:
     *         {@code ProxyAsyncConfiguration} for PROXY, the AspectJ configuration for ASPECTJ
     */
    @Override
    @NonNull
    public String[] selectImports(AdviceMode adviceMode) {
        String configurationClassName = switch (adviceMode) {
            // PROXY mode registers ProxyAsyncConfiguration with the container.
            case PROXY -> ProxyAsyncConfiguration.class.getName();
            case ASPECTJ -> ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME;
        };
        return new String[] {configurationClassName};
    }

}

AsyncConfigurationSelector 继承于 AdviceModeImportSelector。AdviceModeImportSelector 会解析子类声明的泛型参数(一般是 EnableXXX 注解类型,比如 AsyncConfigurationSelector 继承的是 AdviceModeImportSelector<EnableAsync>,CachingConfigurationSelector 继承的是 AdviceModeImportSelector<EnableCaching>),从该注解中读取 AdviceMode,然后根据 AdviceMode 选择要 import 的配置。

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {

    /**
     * Default name of the annotation attribute that carries the {@code AdviceMode}
     * (e.g. the {@code mode} attribute of {@code @EnableAsync}).
     */
    public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

    /**
     * Name of the attribute to read the {@code AdviceMode} from.
     *
     * @return {@link #DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME} unless overridden
     */
    protected String getAdviceModeAttributeName() {
        return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
    }

    /**
     * Resolves the annotation type from the subclass's generic parameter, reads its
     * advice-mode attribute from the importing class, and delegates to
     * {@link #selectImports(AdviceMode)}.
     *
     * @param importingClassMetadata metadata of the class carrying the enabling annotation
     * @return the class names chosen by the subclass
     * @throws IllegalArgumentException if the annotation is missing on the importing class,
     *         or the subclass returns {@code null} for the resolved mode
     */
    @Override
    public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
        // The generic argument of the concrete subclass, e.g. EnableAsync.
        Class<?> annotationType =
                GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
        Assert.state(annotationType != null, "Unresolvable type argument for AdviceModeImportSelector");

        // Attributes of that annotation as declared on the importing class.
        AnnotationAttributes annotationAttributes =
                AnnotationConfigUtils.attributesFor(importingClassMetadata, annotationType);
        if (annotationAttributes == null) {
            String message = String.format(
                    "@%s is not present on importing class '%s' as expected",
                    annotationType.getSimpleName(), importingClassMetadata.getClassName());
            throw new IllegalArgumentException(message);
        }

        // Read the mode and let the subclass pick the imports for it.
        AdviceMode mode = annotationAttributes.getEnum(getAdviceModeAttributeName());
        String[] selected = selectImports(mode);
        if (selected != null) {
            return selected;
        }
        throw new IllegalArgumentException("Unknown AdviceMode: " + mode);
    }

    /**
     * Chooses the classes to import for the given advice mode.
     *
     * @param adviceMode the mode read from the enabling annotation
     * @return class names to import, or {@code null} if the mode is not supported
     */
    @Nullable
    protected abstract String[] selectImports(AdviceMode adviceMode);

}

ProxyAsyncConfiguration

主要就是注册一个 AsyncAnnotationBeanPostProcessor

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	/**
	 * Registers the {@link AsyncAnnotationBeanPostProcessor}, configured from the
	 * {@code @EnableAsync} attributes collected by the parent class.
	 *
	 * @return the fully configured post-processor
	 */
	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		// enableAsync is populated by AbstractAsyncConfiguration#setImportMetadata.
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");

		AsyncAnnotationBeanPostProcessor postProcessor = new AsyncAnnotationBeanPostProcessor();
		// Hand over the executor and the uncaught-exception handler suppliers.
		postProcessor.configure(this.executor, this.exceptionHandler);

		// Apply a custom async annotation only when the attribute differs from its declared default.
		Class<? extends Annotation> configuredAnnotation = this.enableAsync.getClass("annotation");
		Object defaultAnnotation = AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation");
		if (configuredAnnotation != defaultAnnotation) {
			postProcessor.setAsyncAnnotationType(configuredAnnotation);
		}

		// Proxy style (CGLIB or not) and ordering come straight from the annotation.
		boolean proxyTargetClass = this.enableAsync.getBoolean("proxyTargetClass");
		postProcessor.setProxyTargetClass(proxyTargetClass);
		Integer order = this.enableAsync.<Integer>getNumber("order");
		postProcessor.setOrder(order);
		return postProcessor;
	}
}

AbstractAsyncConfiguration

@Configuration(proxyBeanMethods = false)
public abstract class AbstractAsyncConfiguration implements ImportAware {

	/** Attributes of the {@code @EnableAsync} annotation found on the importing class. */
	@Nullable
	protected AnnotationAttributes enableAsync;

	/** Lazily resolved executor taken from the single {@link AsyncConfigurer}, if any. */
	@Nullable
	protected Supplier<Executor> executor;

	/** Lazily resolved uncaught-exception handler taken from the single {@link AsyncConfigurer}, if any. */
	@Nullable
	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

	/**
	 * Captures the {@code @EnableAsync} attributes of the importing class.
	 *
	 * @param importMetadata metadata of the class that imported this configuration
	 * @throws IllegalArgumentException if {@code @EnableAsync} is not present on that class
	 */
	@Override
	public void setImportMetadata(AnnotationMetadata importMetadata) {
		String annotationName = EnableAsync.class.getName();
		this.enableAsync = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(annotationName));
		if (this.enableAsync != null) {
			return;
		}
		throw new IllegalArgumentException(
				"@EnableAsync is not present on importing class " + importMetadata.getClassName());
	}

	/**
	 * Collect any {@link AsyncConfigurer} beans through autowiring.
	 * <p>At most one configurer may exist; it is looked up lazily, the first time one
	 * of the derived suppliers is asked for a value.
	 */
	@Autowired
	void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
		// Resolve the unique configurer on first access.
		Supplier<AsyncConfigurer> uniqueConfigurer = SingletonSupplier.of(() -> {
			List<AsyncConfigurer> found = configurers.stream().toList();
			if (CollectionUtils.isEmpty(found)) {
				return null;
			}
			if (found.size() <= 1) {
				return found.get(0);
			}
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		});
		// Derive the executor and exception-handler suppliers from it.
		this.executor = adapt(uniqueConfigurer, AsyncConfigurer::getAsyncExecutor);
		this.exceptionHandler = adapt(uniqueConfigurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
	}

	/**
	 * Wraps a configurer supplier so that it yields one property of the configurer,
	 * or {@code null} when no configurer is available.
	 */
	private <T> Supplier<T> adapt(Supplier<AsyncConfigurer> supplier, Function<AsyncConfigurer, T> provider) {
		return () -> {
			AsyncConfigurer resolved = supplier.get();
			if (resolved == null) {
				return null;
			}
			return provider.apply(resolved);
		};
	}

}

AsyncConfigurer

用于配置异步处理的线程池、异常处理器

/**
 * Callback interface for customizing async processing: supplies the executor
 * and the handler for uncaught exceptions. Both methods default to {@code null},
 * meaning "nothing configured".
 */
public interface AsyncConfigurer {
	// The executor (thread pool) to run async methods on; null by default.
	@Nullable
	default Executor getAsyncExecutor() {
		return null;
	}

    // The handler for exceptions thrown by async methods; null by default.
	@Nullable
	default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return null;
	}
}

AsyncAnnotationBeanPostProcessor

image-20240124151911662

setBeanFactory() 方法中将 AsyncAnnotationAdvisor 赋值给 this.advisor,AsyncAnnotationAdvisor 就是 AOP 的增强。

设置了自定义的异步注解。

设置了线程池、异常处理器。

postProcessAfterInitialization() 方法中创建了代理对象,代理对象中有 AsyncAnnotationAdvisor 增强,进而拥有了异步执行的功能。

public class AsyncAnnotationBeanPostProcessor extends AbstractBeanFactoryAwareAdvisingPostProcessor {

	/** Bean name of the default task executor; same value as the interceptor's constant. */
	public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME =
			AnnotationAsyncExecutionInterceptor.DEFAULT_TASK_EXECUTOR_BEAN_NAME;

	protected final Log logger = LogFactory.getLog(getClass());

	/** Supplier of the executor handed to the advisor. */
	@Nullable
	private Supplier<Executor> executor;

	/** Supplier of the uncaught-exception handler handed to the advisor. */
	@Nullable
	private Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;

	/** Custom async annotation type; {@code null} unless explicitly configured. */
	@Nullable
	private Class<? extends Annotation> asyncAnnotationType;

	/**
	 * Creates the post-processor and requests that its advisor be placed in front
	 * of any advisors already present on an existing proxy.
	 */
	public AsyncAnnotationBeanPostProcessor() {
		setBeforeExistingAdvisors(true);
	}

	/**
	 * Sets the executor and exception-handler suppliers in one call; invoked by
	 * {@code ProxyAsyncConfiguration#asyncAdvisor()}.
	 */
	public void configure(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		this.executor = executor;
		this.exceptionHandler = exceptionHandler;
	}

	/** Sets a fixed executor instance. */
	public void setExecutor(Executor executor) {
		this.executor = SingletonSupplier.of(executor);
	}

	/** Sets a fixed uncaught-exception handler instance. */
	public void setExceptionHandler(AsyncUncaughtExceptionHandler exceptionHandler) {
		this.exceptionHandler = SingletonSupplier.of(exceptionHandler);
	}

	/**
	 * Sets the custom async annotation type to detect.
	 *
	 * @param asyncAnnotationType the annotation type; must not be {@code null}
	 */
	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		this.asyncAnnotationType = asyncAnnotationType;
	}

	/**
	 * Stores the bean factory and builds the {@link AsyncAnnotationAdvisor} that carries
	 * the async behaviour; the advisor ends up in the inherited {@code advisor} field,
	 * which the parent post-processor applies to eligible beans.
	 */
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);

		// The advisor is the AOP advice + pointcut pair; the async logic lives inside it.
		AsyncAnnotationAdvisor asyncAdvisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customType = this.asyncAnnotationType;
		if (customType != null) {
			// Detect the custom annotation instead of the built-in ones.
			asyncAdvisor.setAsyncAnnotationType(customType);
		}
		asyncAdvisor.setBeanFactory(beanFactory);
		// Field declared in AbstractAdvisingBeanPostProcessor.
		this.advisor = asyncAdvisor;
	}
}

AbstractBeanFactoryAwareAdvisingPostProcessor:AsyncAnnotationBeanPostProcessor 的父类

public abstract class AbstractBeanFactoryAwareAdvisingPostProcessor extends AbstractAdvisingBeanPostProcessor
		implements BeanFactoryAware {

	/** The owning factory, kept only when it is a {@code ConfigurableListableBeanFactory}. */
	@Nullable
	private ConfigurableListableBeanFactory beanFactory;


	/**
	 * Remembers the bean factory if it is a {@code ConfigurableListableBeanFactory};
	 * any other factory type is recorded as {@code null}.
	 */
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		if (beanFactory instanceof ConfigurableListableBeanFactory listableFactory) {
			this.beanFactory = listableFactory;
		}
		else {
			this.beanFactory = null;
		}
	}

	/**
	 * Builds the proxy factory as the parent does, additionally exposing the target
	 * class on the bean definition and switching to target-class (CGLIB) proxying
	 * when the bean definition asks for it.
	 */
	@Override
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
		if (this.beanFactory != null) {
			AutoProxyUtils.exposeTargetClass(this.beanFactory, beanName, bean.getClass());
		}

		ProxyFactory factory = super.prepareProxyFactory(bean, beanName);
		// Already class-based, or no factory to consult: nothing more to decide.
		if (factory.isProxyTargetClass() || this.beanFactory == null) {
			return factory;
		}
		if (AutoProxyUtils.shouldProxyTargetClass(this.beanFactory, beanName)) {
			factory.setProxyTargetClass(true);
		}
		return factory;
	}

	/**
	 * A bean is eligible only if it is not flagged as an "original instance"
	 * and the parent's eligibility check passes.
	 */
	@Override
	protected boolean isEligible(Object bean, String beanName) {
		if (AutoProxyUtils.isOriginalInstance(beanName, bean.getClass())) {
			return false;
		}
		return super.isEligible(bean, beanName);
	}
}

AbstractAdvisingBeanPostProcessor:AbstractBeanFactoryAwareAdvisingPostProcessor 的父类,实现了 SmartInstantiationAwareBeanPostProcessor,在 postProcessAfterInitialization 方法中处理 AOP 增强的逻辑。

/**
 * Bean post-processor that applies a single {@link Advisor} to eligible beans,
 * either by adding it to an already existing proxy or by creating a new proxy.
 */
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport
		implements SmartInstantiationAwareBeanPostProcessor {
	// The AOP advisor to apply; for @Async this is the AsyncAnnotationAdvisor.
	@Nullable
	protected Advisor advisor;
	// When true, the advisor is inserted at the head of an existing proxy's advisor chain.
	// AsyncAnnotationBeanPostProcessor sets this to true in its constructor.
	protected boolean beforeExistingAdvisors = false;
	// Cache: target class -> whether the advisor can apply to it.
	private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);

	public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
		this.beforeExistingAdvisors = beforeExistingAdvisors;
	}


	// Predicts the final bean type: the proxy class if the advisor applies to beanClass,
	// otherwise beanClass itself.
	@Override
	public Class<?> determineBeanType(Class<?> beanClass, String beanName) {
		if (this.advisor != null && isEligible(beanClass)) {
			ProxyFactory proxyFactory = new ProxyFactory();
			proxyFactory.copyFrom(this);
			proxyFactory.setTargetClass(beanClass);

			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(beanClass, proxyFactory);
			}
			proxyFactory.addAdvisor(this.advisor);
			customizeProxyFactory(proxyFactory);

			// Use original ClassLoader if bean class not locally loaded in overriding class loader
			ClassLoader classLoader = getProxyClassLoader();
			if (classLoader instanceof SmartClassLoader smartClassLoader &&
					classLoader != beanClass.getClassLoader()) {
				classLoader = smartClassLoader.getOriginalClassLoader();
			}
			return proxyFactory.getProxyClass(classLoader);
		}

		return beanClass;
	}

	// Applies the advisor to the bean: reuses an existing proxy when possible,
	// creates a new proxy when the bean is eligible, otherwise returns the bean unchanged.
	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		// The bean is already a proxy: just add our advisor to its advisor chain.
		if (bean instanceof Advised advised) {
            // Only when the proxy configuration is not frozen and the advisor applies to the target class.
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain.
                // Put our advisor in front of all existing advisors when requested.
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else if (advised.getTargetSource() == AdvisedSupport.EMPTY_TARGET_SOURCE &&
						advised.getAdvisorCount() > 0) {
					// No target, leave last Advisor in place and add new Advisor right before.
					advised.addAdvisor(advised.getAdvisorCount() - 1, this.advisor);
					return bean;
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}
		// The bean is not a proxy yet (or could not be extended above): create one
        // if the advisor applies to it.
		if (isEligible(bean, beanName)) {
            // Create the proxy factory.
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
            // Unless class-based proxying is already requested, inspect the target's interfaces;
            // evaluateProxyInterfaces switches to class-based proxying if none is suitable.
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
            // Add the advisor (AsyncAnnotationAdvisor for @Async).
			proxyFactory.addAdvisor(this.advisor);
            // Extension hook; empty by default.
			customizeProxyFactory(proxyFactory);

			// Use original ClassLoader if bean class not locally loaded in overriding class loader
			ClassLoader classLoader = getProxyClassLoader();
			if (classLoader instanceof SmartClassLoader smartClassLoader &&
					classLoader != bean.getClass().getClassLoader()) {
				classLoader = smartClassLoader.getOriginalClassLoader();
			}
            // Create the proxy object.
			return proxyFactory.getProxy(classLoader);
		}
		
        // The advisor does not apply: return the original bean.
		// No proxy needed.
		return bean;
	}

	// Bean-level eligibility; by default it only looks at the bean's class.
	protected boolean isEligible(Object bean, String beanName) {
		return isEligible(bean.getClass());
	}

	/**
	 * Check whether the given class is eligible for advising with this
	 * post-processor's {@link Advisor}.
	 * <p>Implements caching of {@code canApply} results per bean target class.
	 * @param targetClass the class to check against
	 * @see AopUtils#canApply(Advisor, Class)
	 */
    // Checks whether this post-processor's advisor can apply to the given class.
	protected boolean isEligible(Class<?> targetClass) {
        // Consult the cache first.
		Boolean eligible = this.eligibleBeans.get(targetClass);
		if (eligible != null) {
			return eligible;
		}
        // No advisor configured: nothing to proxy.
		if (this.advisor == null) {
			return false;
		}
        // Ask AOP whether the advisor's pointcut matches targetClass (details not covered here).
        // For @Async the pointcut is built in AsyncAnnotationAdvisor#buildPointcut().
		eligible = AopUtils.canApply(this.advisor, targetClass);
        // Remember the result.
		this.eligibleBeans.put(targetClass, eligible);
		return eligible;
	}

    // Creates a ProxyFactory that copies this processor's proxy configuration and targets the bean.
	protected ProxyFactory prepareProxyFactory(Object bean, String beanName) {
		ProxyFactory proxyFactory = new ProxyFactory();
		proxyFactory.copyFrom(this);
		proxyFactory.setTarget(bean);
		return proxyFactory;
	}

	// Extension hook for subclasses; intentionally empty.
	protected void customizeProxyFactory(ProxyFactory proxyFactory) {
	}

}

ProxyProcessorSupport:AbstractAdvisingBeanPostProcessor 的父类

public class ProxyProcessorSupport extends ProxyConfig implements Ordered, BeanClassLoaderAware, AopInfrastructureBean {

	/** Ordering value of this processor; lowest precedence unless changed. */
	private int order = Ordered.LOWEST_PRECEDENCE;

	/** Class loader used to generate proxies. */
	@Nullable
	private ClassLoader proxyClassLoader = ClassUtils.getDefaultClassLoader();

	/** True once a non-null proxy class loader has been set explicitly. */
	private boolean classLoaderConfigured = false;

	public void setOrder(int order) {
		this.order = order;
	}

	@Override
	public int getOrder() {
		return this.order;
	}

	/**
	 * Sets the class loader for proxy generation. A non-null value is treated as an
	 * explicit choice that {@link #setBeanClassLoader} will not override.
	 */
	public void setProxyClassLoader(@Nullable ClassLoader classLoader) {
		this.proxyClassLoader = classLoader;
		this.classLoaderConfigured = classLoader != null;
	}

	@Nullable
	protected ClassLoader getProxyClassLoader() {
		return this.proxyClassLoader;
	}

	/** Adopts the bean class loader unless a proxy class loader was configured explicitly. */
	@Override
	public void setBeanClassLoader(ClassLoader classLoader) {
		if (this.classLoaderConfigured) {
			return;
		}
		this.proxyClassLoader = classLoader;
	}

	/**
	 * Decides between interface-based and class-based proxying for the given bean class:
	 * if at least one "reasonable" interface exists, all interfaces are added to the
	 * proxy factory; otherwise the factory is switched to target-class proxying.
	 *
	 * @param beanClass the class of the bean to proxy
	 * @param proxyFactory the factory to configure
	 */
	protected void evaluateProxyInterfaces(Class<?> beanClass, ProxyFactory proxyFactory) {
		Class<?>[] candidateInterfaces = ClassUtils.getAllInterfacesForClass(beanClass, getProxyClassLoader());
		if (!containsReasonableProxyInterface(candidateInterfaces)) {
			// Nothing worth proxying by interface: fall back to class-based proxying.
			proxyFactory.setProxyTargetClass(true);
			return;
		}
		// Must allow for introductions; can't just set interfaces to the target's interfaces only.
		for (Class<?> candidate : candidateInterfaces) {
			proxyFactory.addInterface(candidate);
		}
	}

	/**
	 * Returns whether any interface is neither a configuration callback nor an internal
	 * language interface and declares at least one method.
	 */
	private boolean containsReasonableProxyInterface(Class<?>[] candidateInterfaces) {
		for (Class<?> candidate : candidateInterfaces) {
			boolean excluded = isConfigurationCallbackInterface(candidate) || isInternalLanguageInterface(candidate);
			if (!excluded && candidate.getMethods().length > 0) {
				return true;
			}
		}
		return false;
	}

	/**
	 * Whether the interface is a container callback interface: one of the lifecycle /
	 * closing interfaces listed below, or one that directly extends {@code Aware}.
	 */
	protected boolean isConfigurationCallbackInterface(Class<?> ifc) {
		boolean lifecycleCallback = (InitializingBean.class == ifc || DisposableBean.class == ifc ||
				Closeable.class == ifc || AutoCloseable.class == ifc);
		return (lifecycleCallback || ObjectUtils.containsElement(ifc.getInterfaces(), Aware.class));
	}

	/** Whether the interface is a well-known internal interface, recognised by its name. */
	protected boolean isInternalLanguageInterface(Class<?> ifc) {
		String interfaceName = ifc.getName();
		return (interfaceName.equals("groovy.lang.GroovyObject") ||
				interfaceName.endsWith(".cglib.proxy.Factory") ||
				interfaceName.endsWith(".bytebuddy.MockAccess"));
	}
}

AsyncAnnotationAdvisor

/**
 * Advisor that pairs the async interceptor (advice) with a pointcut matching
 * the supported async annotations at class or method level.
 */
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
	// The advice: an AnnotationAsyncExecutionInterceptor (see buildAdvice), which holds the actual async logic.
	private final Advice advice;
	// The pointcut deciding which classes/methods are advised.
	private Pointcut pointcut;
    
	public AsyncAnnotationAdvisor() {
		this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
	}

	public AsyncAnnotationAdvisor(
			@Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {

		this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
	}

	// Main constructor: collects the supported annotation types, then builds advice and pointcut.
	@SuppressWarnings("unchecked")
	public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		// The async annotation types to detect.
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
        // Spring's own @Async is always supported.
		asyncAnnotationTypes.add(Async.class);

		ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
		try {
            // jakarta.ejb.Asynchronous, if that class can be loaded.
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If EJB API not present, simply ignore.
		}
		try {
            // jakarta.enterprise.concurrent.Asynchronous, if that class can be loaded.
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If Jakarta Concurrent API not present, simply ignore.
		}
		// Build the advice and the pointcut.
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

    // A custom async annotation replaces the (up to) three annotation types registered by the
    // constructor: the pointcut is rebuilt from the custom type alone.
	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
		asyncAnnotationTypes.add(asyncAnnotationType);
        // Rebuild the pointcut.
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	// Forwards the bean factory to the advice when the advice is BeanFactoryAware.
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		if (this.advice instanceof BeanFactoryAware beanFactoryAware) {
			beanFactoryAware.setBeanFactory(beanFactory);
		}
	}
    ...

	// Creates the interceptor that performs the asynchronous invocation and configures it
	// with the given executor and exception-handler suppliers.
	protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		// The AOP interceptor that executes the method asynchronously.
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}

	/**
	 * Calculate a pointcut for the given async annotation types, if any.
	 * @param asyncAnnotationTypes the async annotation types to introspect
	 * @return the applicable Pointcut object, or {@code null} if none
	 */
	// NOTE(review): the @return text above says null, but the code below returns Pointcut.TRUE
	// when no annotation types are given.
	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
            // Matches the annotation declared at class level.
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
            // Matches the annotation declared at method level.
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}

}

AnnotationAsyncExecutionInterceptor

public class AnnotationAsyncExecutionInterceptor extends AsyncExecutionInterceptor {

	/** Creates the interceptor with the given default executor (may be {@code null}). */
	public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
		super(defaultExecutor);
	}

	/** Creates the interceptor with the given default executor and exception handler. */
	public AnnotationAsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
		super(defaultExecutor, exceptionHandler);
	}

	/**
	 * Returns the executor qualifier, i.e. the {@code value} of {@code @Async}: taken from
	 * the method if annotated there, otherwise from the method's declaring class.
	 *
	 * @param method the method being invoked asynchronously
	 * @return the qualifier, or {@code null} if {@code @Async} is found on neither
	 */
	@Override
	@Nullable
	protected String getExecutorQualifier(Method method) {
		// Maintainer's note: changes made here should also be made in
		// AnnotationAsyncExecutionAspect#getExecutorQualifier
		Async methodLevel = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
		if (methodLevel != null) {
			return methodLevel.value();
		}
		Async typeLevel = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
		return (typeLevel == null ? null : typeLevel.value());
	}

}

AsyncExecutionInterceptor

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {

	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
		super(defaultExecutor);
	}

	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
		super(defaultExecutor, exceptionHandler);
	}

	@Override
	@Nullable
	public Object invoke(final MethodInvocation invocation) throws Throwable {
        // 获取原始的类型
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        // 获取要执行的方法
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
		
        // 获取线程池,实现在父类 AsyncExecutionAspectSupport 中
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		// 构建一个 Callable
		Callable<Object> task = () -> {
			try {
                // 执行真正的方法
				Object result = invocation.proceed();
                // 如果方法的返回值是 Future,则调用 get()
				if (result instanceof  <?> future) {
					return future.get();
				}
			}
			catch (ExecutionException ex) {
                // 使用异常处理器 AsyncUncaughtExceptionHandler 处理异常
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
                // 使用异常处理器 AsyncUncaughtExceptionHandler 处理异常
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		// 向线程池提交任务
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}
    
    // AnnotationAsyncExecutionInterceptor 重写了
	@Override
	@Nullable
	protected String getExecutorQualifier(Method method) {
		return null;
	}

	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
        // 获取默认的线程池,如何没有则创建一个 SimpleAsyncTaskExecutor,默认情况下会为每个任务创建一个线程
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

	@Override
	public int getOrder() {
		return Ordered.HIGHEST_PRECEDENCE;
	}

}
/**
 * Base class for async execution aspects: resolves the executor to use for a
 * method, submits tasks to it, and routes errors to the exception handler.
 */
public abstract class AsyncExecutionAspectSupport implements BeanFactoryAware {
	// Bean name of the default TaskExecutor.
	public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

	protected final Log logger = LogFactory.getLog(getClass());
	// The default executor, used when a method names no qualifier.
	private SingletonSupplier<Executor> defaultExecutor;
	// The handler for exceptions that cannot be propagated to the caller.
	private SingletonSupplier<AsyncUncaughtExceptionHandler> exceptionHandler;

	@Nullable
	private BeanFactory beanFactory;

	@Nullable
	private StringValueResolver embeddedValueResolver;
	// Cache: method -> the executor resolved for it.
	private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);
    ...

	/**
	 * Determine the specific executor to use when executing the given method.
	 * @return the executor to use (or {@code null}, but just if no default executor is available)
	 */
	@Nullable
	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // Try the cache first.
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
            // Cache miss: resolve the executor now.
			Executor targetExecutor;
            // The executor's bean name/qualifier; AnnotationAsyncExecutionInterceptor
            // overrides this to return the value of @Async.
			String qualifier = getExecutorQualifier(method);
			if (this.embeddedValueResolver != null && StringUtils.hasLength(qualifier)) {
				qualifier = this.embeddedValueResolver.resolveStringValue(qualifier);
			}
            // A qualifier was given.
			if (StringUtils.hasLength(qualifier)) {
                // Look up the matching executor in the bean factory.
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
                // Otherwise use the default executor.
                // NOTE(review): presumably set up via configure(...), called from
                // AsyncAnnotationAdvisor#buildAdvice() — that method is elided here; confirm.
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
            // Wrap the executor in an adapter when it is not already an AsyncTaskExecutor.
			executor = (targetExecutor instanceof AsyncTaskExecutor asyncTaskExecutor ?
					asyncTaskExecutor : new TaskExecutorAdapter(targetExecutor));
            // Fill the cache.
			this.executors.put(method, executor);
		}
		return executor;
	}

	// Returns the executor qualifier for the method; AnnotationAsyncExecutionInterceptor
	// overrides this to return the value of @Async.
	@Nullable
	protected abstract String getExecutorQualifier(Method method);

	// Looks up the Executor bean matching the qualifier; fails if no bean factory is available.
	@Nullable
	protected Executor findQualifiedExecutor(@Nullable BeanFactory beanFactory, String qualifier) {
		if (beanFactory == null) {
			throw new IllegalStateException("BeanFactory must be set on " + getClass().getSimpleName() +
					" to access qualified executor '" + qualifier + "'");
		}
		return BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, Executor.class, qualifier);
	}
	// Finds the default executor in the bean factory; returns null when none can be determined.
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		if (beanFactory != null) {
			try {
                // If the container has exactly one TaskExecutor bean, use it.
				// Search for TaskExecutor bean... not plain Executor since that would
				// match with ScheduledExecutorService as well, which is unusable for
				// our purposes here. TaskExecutor is more clearly designed for it.
				return beanFactory.getBean(TaskExecutor.class);
			}
			catch (NoUniqueBeanDefinitionException ex) {
				logger.debug("Could not find unique TaskExecutor bean. " +
						"Continuing search for an Executor bean named 'taskExecutor'", ex);
				try {
                    // Several TaskExecutor beans exist: fall back to the Executor bean named 'taskExecutor'.
					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
				}
				catch (NoSuchBeanDefinitionException ex2) {
					if (logger.isInfoEnabled()) {
						logger.info("More than one TaskExecutor bean found within the context, and none is named " +
								"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
								"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
					}
				}
			}
			catch (NoSuchBeanDefinitionException ex) {
				logger.debug("Could not find default TaskExecutor bean. " +
						"Continuing search for an Executor bean named 'taskExecutor'", ex);
				try {
                    // No TaskExecutor bean exists: fall back to the Executor bean named 'taskExecutor'.
					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
				}
				catch (NoSuchBeanDefinitionException ex2) {
					logger.info("No task executor bean found for async processing: " +
							"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
				}
				// Giving up -> either using local default executor or none at all...
			}
		}
		return null;
	}

    // Submits the task in the way that matches the method's declared return type.
    // Only CompletableFuture, ListenableFuture, Future and void are accepted.
	@Nullable
	@SuppressWarnings("deprecation")
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
            // The method returns a CompletableFuture.
			return executor.submitCompletable(task);
		}
		else if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(returnType)) {
            // The method returns a ListenableFuture.
            // NOTE(review): the original note describes ListenableFuture as a FutureTask-based
            // extension that runs callbacks on success or failure — not visible here; confirm.
			return ((org.springframework.core.task.AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
            // The method returns a plain Future.
			return executor.submit(task);
		}
		else if (void.class == returnType) {
            // The method returns nothing.
			executor.submit(task);
			return null;
		}
		else {
			throw new IllegalArgumentException(
					"Invalid return type for async method (only Future and void supported): " + returnType);
		}
	}
    
	// Handles an exception thrown while running an async method.
	protected void handleError(Throwable ex, Method method, Object... params) throws Exception {
        // For Future-returning methods the exception is rethrown (so it reaches the caller via the Future).
		if (Future.class.isAssignableFrom(method.getReturnType())) {
			ReflectionUtils.rethrowException(ex);
		}
		else {
            // Otherwise delegate to AsyncUncaughtExceptionHandler#handleUncaughtException().
			// Could not transmit the exception to the caller with default executor
			try {
				this.exceptionHandler.obtain().handleUncaughtException(ex, method, params);
			}
			catch (Throwable ex2) {
				logger.warn("Exception handler for async method '" + method.toGenericString() +
						"' threw unexpected exception itself", ex2);
			}
		}
	}

}

AsyncUncaughtExceptionHandler

用于处理异步执行时抛出的异常,但只处理返回值不为 Future 的方法抛出的异常,如上 AsyncExecutionAspectSupport#handleError() 所述。

/**
 * Strategy for handling exceptions thrown by asynchronous methods.
 * <p>As used by {@code AsyncExecutionAspectSupport#handleError}, it is only invoked for
 * methods whose return type is not a {@code Future}.
 */
@FunctionalInterface
public interface AsyncUncaughtExceptionHandler {

	/**
	 * Handle the given uncaught exception thrown from an asynchronous method.
	 * @param ex the exception thrown from the asynchronous method
	 * @param method the asynchronous method
	 * @param params the parameters used to invoke the method
	 */
	void handleUncaughtException(Throwable ex, Method method, Object... params);

}

SimpleAsyncUncaughtExceptionHandler 是其简单实现,打印 error 日志。

/**
 * Minimal {@link AsyncUncaughtExceptionHandler} that only logs the exception at error level.
 */
public class SimpleAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {

	private static final Log logger = LogFactory.getLog(SimpleAsyncUncaughtExceptionHandler.class);

	/**
	 * Logs the exception together with the async method that threw it.
	 *
	 * @param ex the exception thrown from the asynchronous method
	 * @param method the asynchronous method
	 * @param params the invocation arguments (not logged)
	 */
	@Override
	public void handleUncaughtException(Throwable ex, Method method, Object... params) {
		if (!logger.isErrorEnabled()) {
			return;
		}
		logger.error("Unexpected exception occurred invoking async method: " + method, ex);
	}

}