天天看点

ThreadPoolExecutor流程

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)      
用给定的初始参数创建新的  

ThreadPoolExecutor

参数:

corePoolSize

 - 池中所保存的线程数,包括空闲线程。

maximumPoolSize

 - 池中允许的最大线程数。

keepAliveTime

 - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

unit

 - keepAliveTime 参数的时间单位。

workQueue

 - 执行前用于保持任务的队列。此队列仅保持由  

execute

 方法提交的  

Runnable

 任务。

threadFactory

 - 执行程序创建新线程时使用的工厂。

handler

 - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
抛出:

IllegalArgumentException

 - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。

NullPointerException

 - 如果  

workQueue

、  

threadFactory

 或  

handler

 为 null。

FixThreadPool创建:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}      

ThreadPoolExecutor的corePoolSize和maximumPoolSize都被设置为nThreads,当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超出这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立刻终止。   ThreadPoolExecutor的  execute(): (1) 如果当前运行的线程数少于 corePoolSize,则创建新线程来执行任务。 (2) 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue。 (3) 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。    

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) //addWorker 创建任务并start
        return;
        c = ctl.get();
    }
if (isRunning(c) && workQueue.offer(command)) {  //等于corePoolSize后加入队列
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
            reject(command);
    else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}      

addWorker方法创建worker,即实际工作线程,当实际工作线程达到corePoolSize后,不再创建worker,后续的任务加入阻塞队列,现有的worker的 runWorker方法从队列中不断的获取任务。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
            w.lock();
// If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
try {
                beforeExecute(wt, task);
                Throwable thrown = null;
try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}      

FixThreadPool 使用无界队列LinkedBlockingQueue作为线程池的工作队列(容量为Integer.MAX_VALUE),使用无界队列会: (1) 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。 (2) 由于第一点,maximumPoolSize和 keepAliveTime是无效的参数。 (3) 由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow()) 不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution 方法 )