天天看點

動态線程池的簡單實作思路

作者:Java架構學習指南

什麼是動态線程池?

線上程池日常實踐中我們常常會遇到以下問題:

  • 代碼中建立了一個線程池卻不知道核心參數設定多少比較合适。
  • 參數設定好後,上線發現需要調整,改代碼重新開機服務非常麻煩。
  • 線程池相對于開發人員來說是個黑箱,運作情況再出現問題 前很難被感覺。

是以,動态可監控線程池一種針對以上痛點開發的線程池管理工具。主要可實作功能有:提供對 Spring 應用内線程池執行個體的全局管控、應用運作時動态變更線程池參數以及線程池資料采集和監控門檻值報警。

已經實作的優秀開源動态線程池

hippo4j、dynamic-tp.....

實作思路

核心管理類

需要能實作對線程池的

  • 服務注冊
  • 擷取已經注冊好的線程池
  • 以及對注冊号線程池參數的重新整理。

對于每一個線程池,我們使用一個線程池名字作為辨別每個線程池的唯一ID。

僞代碼實作
public class DtpRegistry {
    /**
     * 儲存線程池
     */
    private static final Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    
    /**
     * 擷取線程池
     * @param executorName 線程池名字
     */
    public static Executor getExecutor(String executorName) {
        return EXECUTOR_MAP.get(executorName);
    }
    
    /**
     * 線程池注冊
     * @param executorName 線程池名字
     */
    public static void registry(String executorName, Executor executor) {
        //注冊
        EXECUTOR_MAP.put(executorName, executorWrapper);
    }
    
    
    /**
     * 重新整理線程池參數
     * @param executorName 線程池名字
     * @param properties 線程池參數
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        Executor executor = EXECUTOR_MAP.get(executorName)
        //重新整理參數
        //.......
    }
	
}           

如何建立線程池?

STEP 1. 我們可以使用yml配置檔案的方式配置一個線程池,将線程池執行個體的建立交由Spring容器。

相關配置
public class DtpProperties {
    
    private List<ThreadPoolProperties> executors;
    
}

public class ThreadPoolProperties {
    /**
     * 辨別每個線程池的唯一名字
     */
    private String poolName;
    private String poolType = "common";

    /**
     * 是否為守護線程
     */
    private boolean isDaemon = false;

    /**
     * 以下都是核心參數
     */
    private int corePoolSize = 1;
    private int maximumPoolSize = 1;
    private long keepAliveTime;
    private TimeUnit timeUnit = TimeUnit.SECONDS;
    private String queueType = "arrayBlockingQueue";
    private int queueSize = 5;
    private String threadFactoryPrefix = "-td-";
    private String RejectedExecutionHandler;
}           

yml example:

spring:
  dtp:
    executors:
      # 線程池1
      - poolName: dtpExecutor1
        corePoolSize: 5
        maximumPoolSize: 10
      # 線程池2
      - poolName: dtpExecutor2
        corePoolSize: 2
        maximumPoolSize: 15           

STEP 2 根據配置資訊添加線程池的BeanDefinition

關鍵類
@Slf4j
public class DtpImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar, EnvironmentAware {
    private Environment environment;

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        log.info("注冊");
        //綁定資源
        DtpProperties dtpProperties = new DtpProperties();
        ResourceBundlerUtil.bind(environment, dtpProperties);
        List<ThreadPoolProperties> executors = dtpProperties.getExecutors();
        if (Objects.isNull(executors)) {
            log.info("未檢測本地到配置檔案線程池");
            return;
        }
        //注冊beanDefinition
        executors.forEach((executorProp) -> {
            BeanUtil.registerIfAbsent(registry, executorProp);
        });
    }


    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
}


/**
 *
 * 工具類
 *
 */
public class BeanUtil{
    public static void registerIfAbsent(BeanDefinitionRegistry registry, ThreadPoolProperties executorProp) {
        register(registry, executorProp.getPoolName(), executorProp);
    }

    public static void register(BeanDefinitionRegistry registry, String beanName, ThreadPoolProperties executorProp) {
        Class<? extends Executor> executorType = ExecutorType.getClazz(executorProp.getPoolType());
        Object[] args = assembleArgs(executorProp);
        register(registry, beanName, executorType, args);
    }

    public static void register(BeanDefinitionRegistry registry, String beanName, Class<?> clazz, Object[] args) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(clazz);
        for (Object arg : args) {
            builder.addConstructorArgValue(arg);
        }
        registry.registerBeanDefinition(beanName, builder.getBeanDefinition());
    }
    
    private static Object[] assembleArgs(ThreadPoolProperties executorProp) {
        return new Object[]{
                executorProp.getCorePoolSize(),
                executorProp.getMaximumPoolSize(),
                executorProp.getKeepAliveTime(),
                executorProp.getTimeUnit(),
                QueueType.getInstance(executorProp.getQueueType(), executorProp.getQueueSize()),
                new NamedThreadFactory(
                        executorProp.getPoolName() + executorProp.getThreadFactoryPrefix(),
                        executorProp.isDaemon()
                ),
                //先預設不做設定
                RejectPolicy.ABORT.getValue()
        };
    }
}           

下面解釋一下這個類的作用,environment執行個體中儲存着spring啟動時解析的yml配置,是以我們spring提供的Binder将配置綁定到我們前面定義的DtpProperties類中,友善後續使用。接下來的比較簡單,就是将線程池的BeanDefinition注冊到IOC容器中,讓spring去幫我們執行個體化這個bean。

STEP 3. 将已經執行個體化的線程池注冊到核心類 DtpRegistry 中

我們注冊了 beanDefinition 後,spring會幫我們執行個體化出來, 在這之後我們可以根據需要将這個bean進行進一步的處理,spring也提供了很多機制讓我們對bean的生命周期管理進行更多的擴充。對應到這裡我們就是将執行個體化出來的線程池注冊到核心類 DtpRegistry 中進行管理。

這裡我們使用 BeanPostProcessor 進行處理。
@Slf4j
public class DtpBeanPostProcessor implements BeanPostProcessor {
    private DefaultListableBeanFactory beanFactory;

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DtpExecutor) {
            //直接納入管理
            DtpRegistry.registry(beanName, (DtpExecutor) bean);
        }
        return bean;
    }
}           

這裡的邏輯很簡單, 就是判斷一下這個bean是不是線程池,是就統一管理起來。

STEP 4. 啟用 BeanDefinitionRegistrar 和 BeanPostProcessor

在springboot程式中,隻要加一個@MapperScan注解就能啟用mybatis的功能,我們可以學習其在spring中的啟用方式,自定義一個注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DtpImportSelector.class)
public @interface EnableDynamicThreadPool {
}           

其中,比較關鍵的是@Import注解,spring會導入注解中的類DtpImportSelector

而DtpImportSelector這個類實作了:

public class DtpImportSelector implements DeferredImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
        return new String[]{
                DtpImportBeanDefinitionRegistrar.class.getName(),
                DtpBeanPostProcessor.class.getName()
        };
    }
}           

這樣,隻要我們再啟動類或者配置類上加上@EnableDynamicThreadPool這個注解,spring就會将DtpImportBeanDefinitionRegistrar和DtpBeanPostProcessor這兩個類加入spring容器管理,進而實作我們的線程池的注冊。

@SpringBootApplication
@EnableDynamicThreadPool
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}           

如何實作線程池配置的動态重新整理

首先明确一點,對于線程池的實作類,例如:ThreadPoolExecutor等,都有提供核心參數對應的 set 方法,讓我們實作參數修改。是以,在核心類DtpRegistry中的refresh方法,我們可以這樣寫:

public class DtpRegistry {
    /**
     * 儲存線程池
     */
    private static final Map<String, ThreadPoolExecutor> EXECUTOR_MAP = new ConcurrentHashMap<>();
    /**
     * 重新整理線程池參數
     * @param executorName 線程池名字
     * @param properties 線程池參數
     */
    public static void refresh(String executorName, ThreadPoolProperties properties) {
        ThreadPoolExecutor executor = EXECUTOR_MAP.get(executorName)
        
        //設定參數
        executor.setCorePoolSize(...);
        executor.setMaximumPoolSize(...);
        ......
    }
	
}           

而這些新參數怎麼來呢?我們可以引入Nacos、Apollo等配置中心,實作他們的監聽器方法,在監聽器方法裡調用DtpRegistry的refresh方法重新整理即可。