ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- 用給定的初始參數建立新的
。ThreadPoolExecutor
- 參數:
-
- 池中所儲存的線程數,包括空閑線程。corePoolSize
-
- 池中允許的最大線程數。maximumPoolSize
-
- 當線程數大于核心時,此為終止前多餘的空閑線程等待新任務的最長時間。keepAliveTime
-
- keepAliveTime 參數的時間機關。unit
-
- 執行前用于保持任務的隊列。此隊列僅保持由workQueue
方法送出的execute
任務。Runnable
-
- 執行程式建立新線程時使用的工廠。threadFactory
-
- 由于超出線程範圍和隊列容量而使執行被阻塞時所使用的處理程式。handler
抛出: -
- 如果 corePoolSize 或 keepAliveTime 小于 0,或者 maximumPoolSize 小于等于 0,或者 corePoolSize 大于 maximumPoolSize。IllegalArgumentException
-
- 如果NullPointerException
、workQueue
或threadFactory
為 null。handler
-
FixThreadPool建立:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ThreadPoolExecutor的corePoolSize和maximumPoolSize都被設定為nThreads,當線程池中的線程數大于corePoolSize時,keepAliveTime為多餘的空閑線程等待新任務的最長時間,超出這個時間後多餘的線程将被終止。這裡把keepAliveTime設定為0L,意味着多餘的空閑線程會被立刻終止。 ThreadPoolExecutor的 execute(): (1) 如果目前運作的線程數少于 corePoolSize,則建立新線程來執行任務。 (2) 線上程池完成預熱之後(目前運作的線程數等于corePoolSize),将任務加入LinkedBlockingQueue。 (3) 線程執行完1中的任務後,會在循環中反複從LinkedBlockingQueue擷取任務來執行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //addWorker 建立任務并start
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //等于corePoolSize後加入隊列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
addWorker方法建立worker,即實際工作線程,當實際工作線程達到corePoolSize後,不再建立worker,後續的任務加入阻塞隊列,現有的worker的 runWorker方法從隊列中不斷的擷取任務。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
FixThreadPool 使用無界隊列LinkedBlockingQueue作為線程池的工作隊列(容量為Integer.MAX_VALUE),使用無界隊列會: (1) 當線程池中的線程數達到corePoolSize後,新任務将在無界隊列中等待,是以線程池中的線程數不會超過corePoolSize。 (2) 由于第一點,maximumPoolSize和 keepAliveTime是無效的參數。 (3) 由于使用無界隊列,運作中的FixedThreadPool(未執行方法shutdown()或shutdownNow()) 不會拒絕任務(不會調用RejectedExecutionHandler.rejectedExecution 方法 )