更新時(shí)間:2022-08-25 來源:黑馬程序員 瀏覽量:
目錄
- [Spring的Async注解線程池?cái)U(kuò)展方案]
- [目錄]
- [1. 擴(kuò)展目的]
- [2. 擴(kuò)展實(shí)現(xiàn)]
- [2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`]
- [2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`]
- [2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`]
- [2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`]
- [2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`]
- [2.6 擴(kuò)展異步啟動(dòng)注解`@EnableAsync`]
- [3. 額外擴(kuò)展:給`@Async`注解代理指定線程池]
擴(kuò)展目的
1. 異步調(diào)用,改用Spring提供的`@Aysnc`注解實(shí)現(xiàn),代替手寫線程池執(zhí)行。
2. 在實(shí)際場(chǎng)景中,可能會(huì)遇到需要將主線程的一些個(gè)性化參數(shù)、變量、數(shù)據(jù)傳遞到子線程中使用的需求。
3. `InheritableThreadLocal`可以解決子線程繼承父線程值的需求,但是它存在一些問題。
1. `SessionUser.SESSION_USER`是中臺(tái)提供,無法修改。
2. `InheritableThreadLocal`在線程池機(jī)制應(yīng)用中并不友好,不及時(shí)在子線程中清除的話,會(huì)造成線程安全問題。
實(shí)現(xiàn)思路有兩種:
1. 針對(duì)`ThreadLocal`進(jìn)行擴(kuò)展,并說服中臺(tái)統(tǒng)一改用擴(kuò)展后的`ThreadLocal`。
2. 針對(duì)`@EnableAsync`和`@Async`注解進(jìn)行擴(kuò)展,將手動(dòng)copy的代碼寫入到Spring代理類中。
第一種要跟中臺(tái)打交道,就很煩,能夠天平自己獨(dú)立解決,就自己解決。第二種會(huì)是一個(gè)不錯(cuò)的選擇,擴(kuò)展實(shí)現(xiàn)也并不困難。
2. 擴(kuò)展實(shí)現(xiàn)
2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`
類全名:`org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor`
從調(diào)試記錄可以分析得出`AnnotationAsyncExecutionInterceptor#invoke`方法,正是創(chuàng)建異步任務(wù)并且執(zhí)行異步任務(wù)的核心代碼所在,我們要做的就是重寫這個(gè)方法,將父線程的運(yùn)行參數(shù)手動(dòng)copy到子線程任務(wù)體中。
2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`
我們依靠追蹤`AnnotationAsyncExecutionInterceptor`的構(gòu)造方法調(diào)用,定位到了它。
全類名:`org.springframework.scheduling.annotation.AsyncAnnotationAdvisor`
> 補(bǔ)充說明:代理顧問(`Advisor`)、建議(`Advice`)以及Spring代理實(shí)現(xiàn)原理
>
> Spring `@EnableAsync`默認(rèn)的代理模式是 JDK 代理,代理機(jī)制如下:
>
> Spring 一個(gè) Bean 會(huì)在 `BeanPostProcessor#postProcessAfterInitialization()`這個(gè)生命周期環(huán)節(jié),遍歷所有的`BeanPostProcessor`實(shí)例,判斷Bean是否符合代理?xiàng)l件,如果符合代理?xiàng)l件,就給 Bean 代理對(duì)象中追加建議(`Advice`)對(duì)象,這樣就完成了代理。
>
> 而建議(`Advice`)對(duì)象是由顧問(`Advisor`)對(duì)象創(chuàng)建和提供。
>
> 上一小節(jié)提到的異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是實(shí)現(xiàn)了`Advice`接口的類。
在`@Async`注解的代理過程中,異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是通過`AsyncAnnotationAdvisor#buildAdvice`方法創(chuàng)建的。
所以,當(dāng)我們想要將擴(kuò)展的新的異步執(zhí)行攔截器`LibraAnnotationAsyncExecutionInterceptor`用起來,則需要相應(yīng)的,還要把`AsyncAnnotationAdvisor#buildAdvice`方法重寫。
2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`
我們依靠追蹤`AsyncAnnotationAdvisor`的構(gòu)造方法調(diào)用,定位到了它。
類全名:`org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor`
這個(gè)沒什么好說的,Spring Bean 的生命周期其中一環(huán)。是 Spring Bean 實(shí)現(xiàn)代理的起點(diǎn)。
開發(fā)人員可以自定義一個(gè)`BeanPostProcessor`類,把它注冊(cè)到 Bean 容器中,它就會(huì)自動(dòng)生效,并將后續(xù)的每一個(gè) Bean 實(shí)例進(jìn)行條件判斷以及進(jìn)行代理。
我們要重寫的方法是:`AsyncAnnotationBeanPostProcessor#setBeanFactory`。這個(gè)方法構(gòu)造了異步代理顧問`AsyncAnnotationAdvisor`對(duì)象。
2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`
`AsyncAnnotationBeanPostProcessor`不是一般的 Spring Bean。它有幾個(gè)限制,導(dǎo)致它不能直接通過`@Component`或者`@Configuration`來創(chuàng)建實(shí)例。
`AsyncAnnotationBeanPostProcessor`僅僅是實(shí)現(xiàn)了基于 JDK 代理,如果開發(fā)決定另外一種(基于ASPECTJ編織),那么它就應(yīng)該受到某種條件判斷來進(jìn)行 Bean 實(shí)例化。
2. `AsyncAnnotationBeanPostProcessor`還需要配置指定的線程池、排序等等屬性,所以無法直接使用`@Component`注解注冊(cè)為 Bean。
我們閱讀一下`@EnableAsync`注解源碼:
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { Class<? extends Annotation> annotation() default Annotation.class; boolean proxyTargetClass() default false; AdviceMode mode() default AdviceMode.PROXY; int order() default Ordered.LOWEST_PRECEDENCE; } ```
進(jìn)一步閱讀`AsyncConfigurationSelector`的源碼:
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * 分別為EnableAsync.mode()的PROXY和ASPECTJ值返回{@link ProxyAsyncConfiguration}或{@code AspectJAsyncConfiguration} 。 */ @Override @Nullable public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {ProxyAsyncConfiguration.class.getName()}; case ASPECTJ: return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } } } ```
謎底揭曉,`ProxyAsyncConfiguration`原來是在這里開始注冊(cè)到 Spring 容器中的。
Spring Boot 啟動(dòng)后,會(huì)根據(jù)`@EnableAsync`注解的`mode()`方法的具體值,來決定整個(gè)Spring的 Bean 代理機(jī)制。
既然 Spring 代理機(jī)制只會(huì)有一種,所以,也就只會(huì)在兩種機(jī)制的配置類中選擇其中一個(gè)來進(jìn)行實(shí)例化。
而默認(rèn)`EnableAsync$mode()`默認(rèn)值是`AdviceMode.PROXY`,所以默認(rèn)采用 JDK 代理機(jī)制。
2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`
類全名:`org.springframework.scheduling.annotation.AsyncConfigurationSelector`
2.6 擴(kuò)展異步啟動(dòng)注解`@EnableAsync`
類全名:`org.springframework.scheduling.annotation.EnableAsync`
3. 額外擴(kuò)展:給`@Async`注解代理指定線程池
`@Async`會(huì)自動(dòng)根據(jù)類型`TaskExecutor.class`從 Spring Bean 容器中找一個(gè)已經(jīng)實(shí)例化的異步任務(wù)執(zhí)行器(線程池)。如果找不到,則另尋他路,嘗試從 Spring Bean 容器中查找名稱為`taskExecutor`的`Executor.class`實(shí)例。最后都還是未找到呢,就默認(rèn)自動(dòng)`new`一個(gè)`SimpleAsyncTaskExecutor`來用。
> 補(bǔ)充說明:`TaskExecutor.class`是Spring定義的,而`Executor.class`JDK定義的。
場(chǎng)景:其他小伙伴、或者舊代碼已經(jīng)實(shí)現(xiàn)過了一個(gè)線程池,但是這個(gè)線程池,是個(gè)`Executor.class`類型,且 Bean 實(shí)例名稱不是`taskExecutor`(假設(shè)是`libraThreadPool`),正常情況下`@Async`根本無法找到它。
需求:通過配置,將`@Async`的默認(rèn)線程池,指定為名為`libraThreadPool`的`Executor.class`類型線程池。
我們只需要注冊(cè)一個(gè)實(shí)現(xiàn)`AsyncConfigurer`接口的配置類
`org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers`:
/** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer::getAsyncExecutor; this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler; } ```