天天看點

ThreadPoolExecutor流程

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)      
用給定的初始參數建立新的  

ThreadPoolExecutor

參數:

corePoolSize

 - 池中所儲存的線程數,包括空閑線程。

maximumPoolSize

 - 池中允許的最大線程數。

keepAliveTime

 - 當線程數大于核心時,此為終止前多餘的空閑線程等待新任務的最長時間。

unit

 - keepAliveTime 參數的時間機關。

workQueue

 - 執行前用于保持任務的隊列。此隊列僅保持由  

execute

 方法送出的  

Runnable

 任務。

threadFactory

 - 執行程式建立新線程時使用的工廠。

handler

 - 由于超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程式。
抛出:

IllegalArgumentException

 - 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。

NullPointerException

 - 如果  

workQueue

、  

threadFactory

 或  

handler

 為 null。

FixThreadPool建立:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}      

ThreadPoolExecutor的corePoolSize和maximumPoolSize都被設定為nThreads,當線程池中的線程數大于corePoolSize時,keepAliveTime為多餘的空閑線程等待新任務的最長時間,超出這個時間後多餘的線程将被終止。這裡把keepAliveTime設定為0L,意味着多餘的空閑線程會被立刻終止。   ThreadPoolExecutor的  execute(): (1) 如果目前運作的線程數少于 corePoolSize,則建立新線程來執行任務。 (2) 線上程池完成預熱之後(目前運作的線程數等于corePoolSize),将任務加入LinkedBlockingQueue。 (3) 線程執行完1中的任務後,會在循環中反複從LinkedBlockingQueue擷取任務來執行。    

public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true)) //addWorker 建立任務并start
        return;
        c = ctl.get();
    }
if (isRunning(c) && workQueue.offer(command)) {  //等于corePoolSize後加入隊列
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
            reject(command);
    else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}      

addWorker方法建立worker,即實際工作線程,當實際工作線程達到corePoolSize後,不再建立worker,後續的任務加入阻塞隊列,現有的worker的 runWorker方法從隊列中不斷的擷取任務。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
            w.lock();
// If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
try {
                beforeExecute(wt, task);
                Throwable thrown = null;
try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}      

FixThreadPool 使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(容量為Integer.MAX_VALUE),使用無界隊列會: (1) 當線程池中的線程數達到corePoolSize後,新任務将在無界隊列中等待,是以線程池中的線程數不會超過corePoolSize。 (2) 由于第一點,maximumPoolSize和 keepAliveTime是無效的參數。 (3) 由于使用無界隊列,運作中的FixedThreadPool(未執行方法shutdown()或shutdownNow()) 不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution 方法 )