首頁(yè)技術(shù)文章正文

Java培訓(xùn):Spring的Async注解線程池?cái)U(kuò)展方案

更新時(shí)間:2022-08-25 來源:黑馬程序員 瀏覽量:

  目錄

  - [Spring的Async注解線程池?cái)U(kuò)展方案]

  - [目錄]

  - [1. 擴(kuò)展目的]

  - [2. 擴(kuò)展實(shí)現(xiàn)]

  - [2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`]

  - [2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`]

  - [2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`]

  - [2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`]

  - [2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`]

  - [2.6 擴(kuò)展異步啟動(dòng)注解`@EnableAsync`]

  - [3. 額外擴(kuò)展:給`@Async`注解代理指定線程池]

  1. 擴(kuò)展目的


  1. 異步調(diào)用,改用Spring提供的`@Aysnc`注解實(shí)現(xiàn),代替手寫線程池執(zhí)行。

  2. 在實(shí)際場(chǎng)景中,可能會(huì)遇到需要將主線程的一些個(gè)性化參數(shù)、變量、數(shù)據(jù)傳遞到子線程中使用的需求。

  3. `InheritableThreadLocal`可以解決子線程繼承父線程值的需求,但是它存在一些問題。

  1. `SessionUser.SESSION_USER`是中臺(tái)提供,無法修改。

  2. `InheritableThreadLocal`在線程池機(jī)制應(yīng)用中并不友好,不及時(shí)在子線程中清除的話,會(huì)造成線程安全問題。

  實(shí)現(xiàn)思路有兩種:

  1. 針對(duì)`ThreadLocal`進(jìn)行擴(kuò)展,并說服中臺(tái)統(tǒng)一改用擴(kuò)展后的`ThreadLocal`。

  2. 針對(duì)`@EnableAsync`和`@Async`注解進(jìn)行擴(kuò)展,將手動(dòng)copy的代碼寫入到Spring代理類中。

  第一種要跟中臺(tái)打交道,就很煩,能夠天平自己獨(dú)立解決,就自己解決。第二種會(huì)是一個(gè)不錯(cuò)的選擇,擴(kuò)展實(shí)現(xiàn)也并不困難。

  2. 擴(kuò)展實(shí)現(xiàn)

  2.1 擴(kuò)展Async注解的執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`

  類全名:`org.springframework.scheduling.annotation.AnnotationAsyncExecutionInterceptor`

  從調(diào)試記錄可以分析得出`AnnotationAsyncExecutionInterceptor#invoke`方法,正是創(chuàng)建異步任務(wù)并且執(zhí)行異步任務(wù)的核心代碼所在,我們要做的就是重寫這個(gè)方法,將父線程的運(yùn)行參數(shù)手動(dòng)copy到子線程任務(wù)體中。

1661406736004_1.jpg

  2.2 擴(kuò)展Async注解的Spring代理顧問`AsyncAnnotationAdvisor`

  我們依靠追蹤`AnnotationAsyncExecutionInterceptor`的構(gòu)造方法調(diào)用,定位到了它。

  全類名:`org.springframework.scheduling.annotation.AsyncAnnotationAdvisor`

  > 補(bǔ)充說明:代理顧問(`Advisor`)、建議(`Advice`)以及Spring代理實(shí)現(xiàn)原理

  >

  > Spring `@EnableAsync`默認(rèn)的代理模式是 JDK 代理,代理機(jī)制如下:

  >

  > Spring 一個(gè) Bean 會(huì)在 `BeanPostProcessor#postProcessAfterInitialization()`這個(gè)生命周期環(huán)節(jié),遍歷所有的`BeanPostProcessor`實(shí)例,判斷Bean是否符合代理?xiàng)l件,如果符合代理?xiàng)l件,就給 Bean 代理對(duì)象中追加建議(`Advice`)對(duì)象,這樣就完成了代理。

  >

  > 而建議(`Advice`)對(duì)象是由顧問(`Advisor`)對(duì)象創(chuàng)建和提供。

  >

  > 上一小節(jié)提到的異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是實(shí)現(xiàn)了`Advice`接口的類。

  在`@Async`注解的代理過程中,異步執(zhí)行攔截器`AnnotationAsyncExecutionInterceptor`就是通過`AsyncAnnotationAdvisor#buildAdvice`方法創(chuàng)建的。

  所以,當(dāng)我們想要將擴(kuò)展的新的異步執(zhí)行攔截器`LibraAnnotationAsyncExecutionInterceptor`用起來,則需要相應(yīng)的,還要把`AsyncAnnotationAdvisor#buildAdvice`方法重寫。

  2.3 擴(kuò)展Async注解的 Spring Bean 后置處理器`AsyncAnnotationBeanPostProcessor`

  我們依靠追蹤`AsyncAnnotationAdvisor`的構(gòu)造方法調(diào)用,定位到了它。

  類全名:`org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor`

  這個(gè)沒什么好說的,Spring Bean 的生命周期其中一環(huán)。是 Spring Bean 實(shí)現(xiàn)代理的起點(diǎn)。

  開發(fā)人員可以自定義一個(gè)`BeanPostProcessor`類,把它注冊(cè)到 Bean 容器中,它就會(huì)自動(dòng)生效,并將后續(xù)的每一個(gè) Bean 實(shí)例進(jìn)行條件判斷以及進(jìn)行代理。

  我們要重寫的方法是:`AsyncAnnotationBeanPostProcessor#setBeanFactory`。這個(gè)方法構(gòu)造了異步代理顧問`AsyncAnnotationAdvisor`對(duì)象。

  2.4 擴(kuò)展代理異步配置類`ProxyAsyncConfiguration`

  `AsyncAnnotationBeanPostProcessor`不是一般的 Spring Bean。它有幾個(gè)限制,導(dǎo)致它不能直接通過`@Component`或者`@Configuration`來創(chuàng)建實(shí)例。

  1. `AsyncAnnotationBeanPostProcessor`僅僅是實(shí)現(xiàn)了基于 JDK 代理,如果開發(fā)決定另外一種(基于ASPECTJ編織),那么它就應(yīng)該受到某種條件判斷來進(jìn)行 Bean 實(shí)例化。

  2. `AsyncAnnotationBeanPostProcessor`還需要配置指定的線程池、排序等等屬性,所以無法直接使用`@Component`注解注冊(cè)為 Bean。

  我們閱讀一下`@EnableAsync`注解源碼:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
    Class<? extends Annotation> annotation() default Annotation.class;
    boolean proxyTargetClass() default false;
    AdviceMode mode() default AdviceMode.PROXY;
    int order() default Ordered.LOWEST_PRECEDENCE;
}
```

  進(jìn)一步閱讀`AsyncConfigurationSelector`的源碼:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

    private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
            "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


    /**
   * 分別為EnableAsync.mode()的PROXY和ASPECTJ值返回{@link ProxyAsyncConfiguration}或{@code AspectJAsyncConfiguration} 。
     */
    @Override
    @Nullable
    public String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {ProxyAsyncConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}
```

  謎底揭曉,`ProxyAsyncConfiguration`原來是在這里開始注冊(cè)到 Spring 容器中的。

  Spring Boot 啟動(dòng)后,會(huì)根據(jù)`@EnableAsync`注解的`mode()`方法的具體值,來決定整個(gè)Spring的 Bean 代理機(jī)制。

  既然 Spring 代理機(jī)制只會(huì)有一種,所以,也就只會(huì)在兩種機(jī)制的配置類中選擇其中一個(gè)來進(jìn)行實(shí)例化。

  而默認(rèn)`EnableAsync$mode()`默認(rèn)值是`AdviceMode.PROXY`,所以默認(rèn)采用 JDK 代理機(jī)制。

  2.5 擴(kuò)展異步代理配置選擇器`AsyncConfigurationSelector`

  類全名:`org.springframework.scheduling.annotation.AsyncConfigurationSelector`

  2.6 擴(kuò)展異步啟動(dòng)注解`@EnableAsync`

  類全名:`org.springframework.scheduling.annotation.EnableAsync`

  3. 額外擴(kuò)展:給`@Async`注解代理指定線程池

  `@Async`會(huì)自動(dòng)根據(jù)類型`TaskExecutor.class`從 Spring Bean 容器中找一個(gè)已經(jīng)實(shí)例化的異步任務(wù)執(zhí)行器(線程池)。如果找不到,則另尋他路,嘗試從 Spring Bean 容器中查找名稱為`taskExecutor`的`Executor.class`實(shí)例。最后都還是未找到呢,就默認(rèn)自動(dòng)`new`一個(gè)`SimpleAsyncTaskExecutor`來用。

  > 補(bǔ)充說明:`TaskExecutor.class`是Spring定義的,而`Executor.class`JDK定義的。

  場(chǎng)景:其他小伙伴、或者舊代碼已經(jīng)實(shí)現(xiàn)過了一個(gè)線程池,但是這個(gè)線程池,是個(gè)`Executor.class`類型,且 Bean 實(shí)例名稱不是`taskExecutor`(假設(shè)是`libraThreadPool`),正常情況下`@Async`根本無法找到它。

  需求:通過配置,將`@Async`的默認(rèn)線程池,指定為名為`libraThreadPool`的`Executor.class`類型線程池。

  我們只需要注冊(cè)一個(gè)實(shí)現(xiàn)`AsyncConfigurer`接口的配置類

  `org.springframework.scheduling.annotation.AbstractAsyncConfiguration#setConfigurers`:

/**
     * Collect any {@link AsyncConfigurer} beans through autowiring.
     */
    @Autowired(required = false)
    void setConfigurers(Collection<AsyncConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one AsyncConfigurer may exist");
        }
        AsyncConfigurer configurer = configurers.iterator().next();
        this.executor = configurer::getAsyncExecutor;
        this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
    }
```


分享到:
在線咨詢 我要報(bào)名
和我們?cè)诰€交談!