Wrapping the Java thread pool: a source-code walkthrough of how Spring wraps the Java thread pool

Spring's ThreadPoolTaskExecutor class ultimately runs tasks by calling the void execute(Runnable task) or Future<?> submit(Runnable task) method of Java's ThreadPoolExecutor.

Below is the inheritance hierarchy of Spring's task execution interfaces and classes. Executor is the JDK's java.util.concurrent.Executor; the other interfaces belong to Spring.

interface Executor
    void execute(Runnable command);

interface TaskExecutor extends Executor
    void execute(Runnable task);

interface AsyncTaskExecutor extends TaskExecutor
    void execute(Runnable task, long startTimeout);
    Future<?> submit(Runnable task);
    <T> Future<T> submit(Callable<T> task);

interface SchedulingTaskExecutor extends AsyncTaskExecutor
    boolean prefersShortLivedTasks();
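The execute and submit signatures listed above mirror those of the JDK's ExecutorService, so their behavior can be tried without Spring. The following is a minimal sketch that uses only java.util.concurrent; the class name SubmitSketch and its run method are made up for illustration and are not part of Spring.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; not part of Spring or the JDK.
class SubmitSketch {

    // Returns the results of a submitted Runnable and a submitted Callable.
    static Object[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable noop = () -> { };
            Callable<Integer> product = () -> 6 * 7;

            // execute(...) is fire-and-forget: no handle on the result.
            pool.execute(noop);

            // submit(Runnable) returns a Future<?> whose get() yields null on success.
            Future<?> fromRunnable = pool.submit(noop);

            // submit(Callable<T>) returns a Future<T> carrying the computed value.
            Future<Integer> fromCallable = pool.submit(product);

            return new Object[] { fromRunnable.get(), fromCallable.get() };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SubmitSketch.run() yields an array whose first element is null and whose second element is the Integer 42.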

The task executor class

class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor

Member variable

private ThreadPoolExecutor threadPoolExecutor;

The method that executes a task

public void execute(Runnable task) {
    Executor executor = getThreadPoolExecutor();
    try {
        executor.execute(task);
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
    }
}

public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
    return this.threadPoolExecutor;
}
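The execute method above delegates to the underlying pool and translates the JDK's RejectedExecutionException into Spring's own exception type. Here is a rough sketch of that pattern using only the JDK. TaskRejectedDemoException and DelegatingExecutorSketch are hypothetical stand-ins, not the real Spring classes, and the tiny pool (one thread, one queue slot) is chosen only to force a rejection.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Spring's TaskRejectedException.
class TaskRejectedDemoException extends RuntimeException {
    TaskRejectedDemoException(String message, Throwable cause) {
        super(message, cause);
    }
}

// Hypothetical wrapper that mirrors the shape of the execute method shown above.
class DelegatingExecutorSketch {
    private final ThreadPoolExecutor delegate;

    DelegatingExecutorSketch(ThreadPoolExecutor delegate) {
        this.delegate = delegate;
    }

    void execute(Runnable task) {
        try {
            delegate.execute(task);
        } catch (RejectedExecutionException ex) {
            // Translate the JDK exception into the wrapper's own exception type.
            throw new TaskRejectedDemoException(
                    "Executor [" + delegate + "] did not accept task: " + task, ex);
        }
    }

    // Returns true if a saturated pool rejects a task with the wrapper's exception.
    static boolean rejectsWhenSaturated() {
        // One thread, one queue slot, and the JDK default AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        DelegatingExecutorSketch executor = new DelegatingExecutorSketch(pool);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.execute(blocker); // occupies the single worker thread
            executor.execute(blocker); // fills the single queue slot
            executor.execute(blocker); // no thread and no queue slot left
            return false;
        } catch (TaskRejectedDemoException ex) {
            return ex.getCause() instanceof RejectedExecutionException;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

The original exception is kept as the cause, so callers that catch the translated exception can still inspect what the pool reported.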

The base class ExecutorConfigurationSupport

abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
        implements BeanNameAware, InitializingBean, DisposableBean

Its own base class, CustomizableThreadFactory, is a customizable thread factory.

Member variable

private ExecutorService executor;

Lifecycle initialization

public void afterPropertiesSet() {
    initialize();
}

The initialization method

public void initialize() {
    if (logger.isInfoEnabled()) {
        logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}

The abstract method (implemented by the subclass ThreadPoolTaskExecutor)

protected abstract ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

The lifecycle destroy method

public void destroy() {
    shutdown();
}

ThreadPoolTaskExecutor implements the method ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);

protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);
    if (this.allowCoreThreadTimeOut) {
        executor.allowCoreThreadTimeOut(true);
    }
    this.threadPoolExecutor = executor;
    return executor;
}

Note that the method initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) above is called by the parent class ExecutorConfigurationSupport, which uses the return value to initialize its own member variable private ExecutorService executor;

ThreadPoolTaskExecutor, however, actually runs tasks through its own member variable private ThreadPoolExecutor threadPoolExecutor;

public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
    Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
    return this.threadPoolExecutor;
}

It is not clear to me why it is handled this way.
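One plausible reading, offered as a guess rather than as the Spring authors' stated intent: both fields point at the same object. The parent only needs the general ExecutorService view to handle shutdown, while the subclass keeps the concrete ThreadPoolExecutor type so that it can call pool-specific methods without a cast. The sketch below reproduces that arrangement with hypothetical class names (ConfigSupportSketch, PoolExecutorSketch) and made-up pool settings, using only the JDK.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical base class: it knows only the general ExecutorService contract.
abstract class ConfigSupportSketch {
    private ExecutorService executor;

    // Template method: the base class decides when the executor is created.
    void initialize() {
        this.executor = initializeExecutor();
    }

    // Subclasses decide which concrete executor to build.
    protected abstract ExecutorService initializeExecutor();

    // Lifecycle handling needs nothing beyond ExecutorService.
    void shutdown() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }

    ExecutorService getExecutor() {
        return this.executor;
    }
}

// Hypothetical subclass: keeps a second, more specific reference to the same object.
class PoolExecutorSketch extends ConfigSupportSketch {
    private ThreadPoolExecutor threadPoolExecutor;

    @Override
    protected ExecutorService initializeExecutor() {
        // Illustrative settings only.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        this.threadPoolExecutor = pool;
        return pool;
    }

    ThreadPoolExecutor getThreadPoolExecutor() {
        if (this.threadPoolExecutor == null) {
            throw new IllegalStateException("PoolExecutorSketch not initialized");
        }
        return this.threadPoolExecutor;
    }

    // Pool-specific accessor, available without casting the parent's field.
    int getCorePoolSize() {
        return getThreadPoolExecutor().getCorePoolSize();
    }
}
```

After initialize() runs, getExecutor() and getThreadPoolExecutor() return the same instance, so shutting down through the parent also shuts down the pool the subclass uses.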

Now for the remaining parts.

ExecutorConfigurationSupport configures the default rejection policy:

private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();

ThreadPoolTaskExecutor member variables

private int corePoolSize = 1;

private int maxPoolSize = Integer.MAX_VALUE; // default maximum pool size

private int keepAliveSeconds = 60;

private boolean allowCoreThreadTimeOut = false;

private int queueCapacity = Integer.MAX_VALUE; // default queue capacity
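These defaults have a consequence worth spelling out. A ThreadPoolExecutor only creates threads beyond corePoolSize once its queue refuses a task, so with a core size of 1 and an effectively unbounded queue the pool stays at a single thread, no matter how large maxPoolSize is. The sketch below checks this against a plain JDK pool built with the same values; DefaultSizingSketch and observe are hypothetical names used only for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spring.
class DefaultSizingSketch {

    // Submits taskCount blocking tasks and returns {poolSize, queuedTasks}.
    static int[] observe(int taskCount) {
        // corePoolSize = 1, maxPoolSize = Integer.MAX_VALUE, keepAlive = 60 s,
        // queue capacity = Integer.MAX_VALUE.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(Integer.MAX_VALUE));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the task running until released
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown(); // unblock all tasks
            pool.shutdown();
        }
    }
}
```

With five tasks, the first one occupies the single core thread and the other four wait in the queue, so observe(5) returns a pool size of 1 and a queue size of 4.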

The method that creates the blocking queue

protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
    if (queueCapacity > 0) {
        return new LinkedBlockingQueue<Runnable>(queueCapacity);
    }
    else {
        return new SynchronousQueue<Runnable>();
    }
}
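The choice of queue changes how the pool grows. A SynchronousQueue has no capacity, so a task is handed straight to a waiting thread or, failing that, triggers a new thread up to the maximum pool size. A LinkedBlockingQueue buffers tasks instead. The following sketch, which uses a hypothetical QueueSelectionSketch class and an illustrative pool of core size 1 and maximum size 2, compares the two cases with only the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Spring.
class QueueSelectionSketch {

    // Same decision as the createQueue method shown above.
    static BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<>(queueCapacity);
        }
        return new SynchronousQueue<>();
    }

    // Runs two blocking tasks on a pool (core 1, max 2) and reports the pool size.
    static int poolSizeAfterTwoTasks(int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, createQueue(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // starts the core thread
            pool.execute(blocker); // queued, or handed to a second thread
            return pool.getPoolSize();
        } finally {
            release.countDown(); // unblock the tasks
            pool.shutdown();
        }
    }
}
```

With a capacity of 0 the second task cannot be queued, so the pool grows to two threads; with a positive capacity such as 10 the second task waits in the queue and the pool stays at one thread.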