在Java的線程池的使用會有比較多的地方,有比較多的應用場景,介紹一下Java線程池ThreadPoolExecutor。
線程是一個作業系統概念。作業系統負責這個線程的建立、挂起、運作、阻塞和終結操作。而作業系統建立線程、切換線程狀态、終結線程都要進行CPU排程----這是一個耗費時間和系統資源的事情。
大多數實際場景中是這樣的:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種背景下,如果我們為每一個請求都單獨建立一個線程,那麼實體機的所有資源基本上都被作業系統建立線程、切換線程狀态、銷毀線程這些操作所占用,用于業務請求處理的資源反而減少了。是以最理想的處理方式是,将處理請求的線程數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證實體機将足夠的資源用于請求處理本身。
1.ThreadPoolExecutor類
二話不多說,來看一下ThreadPoolExecutor類的具體實作源碼。
繼承抽象類AbstractExecutorService,它實作了ExecutorService 接口
public class ThreadPoolExecutor extends AbstractExecutorService{····}
public abstract class AbstractExecutorService implements ExecutorService {····}
public interface ExecutorService extends Executor {····}
public interface Executor {····}
ThreadPoolExecutor類中提供了四個構造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory和handler。
corePoolSize:線程池主要用于執行任務的是核心線程數量。預設情況下,在建立了線程池後,線程池中的線程數為0,當有任務來之後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中。
非核心線程:設定的大于corePoolSize參數小于maximumPoolSize參數的部分,就是線程池可以臨時建立的“非核心線程”的最大數量。這種情況下如果某個線程沒有運作任何任務,在等待keepAliveTime時間後,這個線程将會被銷毀,直到線程池的線程數量重新達到corePoolSize。
maximumPoolSize:目前線程池允許建立的最大線程數量。那麼如果設定的corePoolSize參數和設定的maximumPoolSize參數一緻時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。
keepAliveTime:線程沒有任務執行時最多保持多久時間會終止。預設情況下,隻有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,線上程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0。
timeUnit:參數keepAliveTime的時間機關。
workQueue:一個阻塞隊列,用來存儲等待執行的任務。一般來說,這裡的阻塞隊列有以下幾種選擇:ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。
threadFactory:線程工廠,主要用來建立線程。
handler:表示當拒絕處理任務時的政策。有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丢棄任務并抛出RejectedExecutionException異常
ThreadPoolExecutor.DiscardPolicy:也是丢棄任務,但是不抛出異常
ThreadPoolExecutor.DiscardOldestPolicy:丢棄隊列最前面的任務,然後重新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務
2.線程池實作原理
(1)任務的執行
先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀态鎖,對線程池狀态(比如線程池大小,runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集
private volatile long keepAliveTime; //線程存活時間
private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設定存活時間
private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大于這個參數時,送出的任務會被放進任務緩存隊列)
private volatile int maximumPoolSize; //線程池最大能容忍的線程數
private volatile int poolSize; //線程池中目前的線程數
private volatile RejectedExecutionHandler handler; //任務拒絕政策
private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程
private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount; //用來記錄已經執行完畢的任務個數
在ThreadPoolExecutor類中,最核心的任務送出方法是execute()方法,雖然通過submit也可以送出任務,但是實際上submit方法裡面最終調用的還是execute()方法,是以我們隻需要研究execute()方法的實作原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
·····························································································
<未完待續>