springboot异步线程(三)源码解析(二)

Image by Sa Ka from Pixabay

前言

上一篇文章主要讲了EnableAsync注解是如何创建aop并生效的,这一篇讲springboot是如何处理被拦截的方法的;

正文

1.0 AsyncExecutionInterceptor类

这里看AsyncExecutionInterceptor类中的invoke方法,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        //1 获取拦截的方法
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        //2 根据被拦截的方法来选取执行异步任务的执行器
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}

        //3 构建任务(添加异常的处理方式)
		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		//4 执行构建的任务并返回任务执行结果
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}

  1. 获取拦截的方法
  2. 根据被拦截的方法来选取执行异步任务的执行器
  3. 构建任务(添加异常的处理方式)
  4. 执行构建的任务并返回任务执行结果

2.0 AsyncExecutionAspectSupport类

该类的determineAsyncExecutor方法会返回一个AsyncTaskExecutor,也就是返回一个执行异步任务的线程池;

这里来看看上一个类中的determineAsyncExecutor方法源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        // 1根据方法选取`executors`缓存中的执行器
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			//2 获取方法上面的Async注解的value值
			String qualifier = getExecutorQualifier(method);
			if (StringUtils.hasLength(qualifier)) {
			    //3 根据获取的value值查找对应的执行器
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
			    //4 默认执行器
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			//5 类型转换
			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
			// 6 放入线程私有内存
			this.executors.put(method, executor);
		}
		//7
		return executor;
	}

  1. 根据方法选取executors缓存中的执行器
  2. 该方法是获取被拦截方法的Async注解的value,具体的源码在AnnotationAsyncExecutionInterceptor类中
  3. 如果注解有value值,则从beanFactory中获取该value值的bean(根据注解中的值选取Executor执行器)
  4. 获取默认执行器,该默认值是在构造方法时设置的,后面详解;
  5. 转换targetExecutorexecutor
  6. executor存入缓存
  7. 返回executor

2.1 getExecutorQualifier方法

该方法是返回参数方法上的Async注解的value值(executorbeanname),该方法的源码在AnnotationAsyncExecutionInterceptor类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

    protected String getExecutorQualifier(Method method) {
		// Maintainer's note: changes made here should also be made in
		// AnnotationAsyncExecutionAspect#getExecutorQualifier
		//1
		Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
		if (async == null) {
		//2
			async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
		}
		//3
		return (async != null ? async.value() : null);
	}

  1. 获取方法上的Async注解信息
  2. 若1为null,则根据方法的类来获取Async注解信息
  3. 返回Async的注解值

2.2 defaultExecutor值

在2.0中的第四步调用了defaultExecutor,获取默认的Executor:

1
2
3

targetExecutor = this.defaultExecutor.get();

那么这个defaultExecutor的值是什么时候初始化的呢??? 来看看AsyncExecutionAspectSupport的构造方法:

1
2
3
4
5
6

    public AsyncExecutionAspectSupport(@Nullable Executor defaultExecutor) {
		this.defaultExecutor = new SingletonSupplier<>(defaultExecutor, () -> getDefaultExecutor(this.beanFactory));
		this.exceptionHandler = SingletonSupplier.of(SimpleAsyncUncaughtExceptionHandler::new);
	}

先讲解这个方法:
这里是在给defaultExecutor属性和exceptionHandler属性赋值;

那么为什么要看这个构造方法呢??? 接下来返回到AsyncAnnotationAdvisor类的buildAdvice方法(参考上一篇文章:springboot异步线程(三)源码解析(一)),最终调用了上方的构造方法

1
2
3
4
5
6
7
8

    protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}
	

我们可以看见调用AnnotationAsyncExecutionInterceptor构造方法的参数为null;

接下来看看AsyncExecutionAspectSupport类的构造方法中给defaultExecutor属性赋值时的另一个方法getDefaultExecutor方法,该方法就是获取BeanFactory中的Executor,这里选关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13

public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

// Search for TaskExecutor bean... not plain Executor since that would
// match with ScheduledExecutorService as well, which is unusable for
// our purposes here. TaskExecutor is more clearly designed for it.
// 根据TaskExecutor类别来获取Executor

return beanFactory.getBean(TaskExecutor.class);

// 根据Executor类别且beanName为taskExecutor值来获取Executor
return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);

该方法被子类AsyncExecutionInterceptor重写:

1
2
3
4
5
6

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}
	

3.0 doSubmit方法

该方法是执行上边1.0的第四步:将拦截的方法封装任务并交由默认执行器执行,最后返回结果;

该方法的源码在AsyncExecutionAspectSupport类,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

    @Nullable
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else {
			executor.submit(task);
			return null;
		}
	}

总结

这里主要写一下异步任务选择执行器的大概流程吧:

  1. 检查AsyncConfigurer接口是否有实现类
  2. 查看Async注解是否带有指定执行器beanName,也就是Async注解的value值
  3. 根据TaskExecutor(专门为 异步任务设计的)类别查找beanFactory中的bean(执行器)
  4. 根据beanName为taskExecutor且类型为Executor来查找beanFactory中的bean(执行器)
  5. 新建一个 SimpleAsyncTaskExecutor执行器

在我的文章:springboot异步线程(二) 中有讲到:springboot2.1之后的版本与springboot2.1之前的版本的区别,这里也就不细说了;

最后

我还是我,我在努力,Take your time。

我的博客
我的GitHub

坚持原创技术分享,您的支持将鼓励我继续创作!