线程池是Java并发编程中的一个重要组件,它能够有效地管理线程的生命周期和执行,从而避免了频繁创建和销毁线程的开销。在本文中,我们将详细解读Java线程池的实现源码。
1. 线程池的基本实现
Java线程池的基本实现是通过ThreadPoolExecutor类来完成的。ThreadPoolExecutor是一个线程池的核心类,它实现了Executor接口并提供了线程池的完整功能。下面是ThreadPoolExecutor类的构造函数:
```
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代码 }
}
```
其中,corePoolSize是线程池的核心线程数,maximumPoolSize是线程池的最大线程数,keepAliveTime和unit是非核心线程的存活时间和时间单位,workQueue是任务队列,threadFactory用于创建线程,handler是拒绝策略,用于处理任务队列已满时的任务拒绝。
线程池的执行过程如下:
1. 当任务数量小于核心线程数时,创建核心线程来处理任务。
2. 当任务数量大于核心线程数时,将任务放入任务队列中。
3. 当任务队列已满且线程数小于最大线程数时,创建新的线程来处理任务。
4. 当线程数达到最大线程数时,执行拒绝策略。
下面我们来详细解读ThreadPoolExecutor类的实现源码。
2. 线程池的实现细节
ThreadPoolExecutor类实现了ExecutorService接口,它提供了submit、invokeAll、invokeAny等方法,用于提交任务和管理线程池。下面我们来看一下ThreadPoolExecutor类中的一些重要方法。
2.1 execute方法
execute方法是ThreadPoolExecutor类中最重要的方法之一,用于执行任务。下面是execute方法的源码:
```
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we add a new thread. This covers both cases when the * queue is full and when the queue is "small" (i.e., when * queuing would normally be allowed but construction has * not yet completed). We don't actually know whether the * task is going to be executed until workerThread * actually handles it, so we compensate for workers that * exit just after checking before blocking by rechecking * runState. */
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) reject(command);
else if (workerCountOf(recheck) == 0) addWorker(null, false);
}
else if (! addWorker(command, false)) reject(command);
}
```
可以看到,execute方法中主要做了以下几件事情:
1. 如果当前线程数小于核心线程数,尝试通过addWorker方法创建新的线程来处理任务。
2. 如果任务队列未满,则将任务加入任务队列中。
3. 如果任务队列已满但线程数小于最大线程数,尝试通过addWorker方法创建新的线程来处理任务。
4. 如果任务队列已满且线程数已达到最大线程数,执行拒绝策略。 其中,addWorker方法用于创建新的线程来处理任务。下面我们来看一下addWorker方法的实现。
2.2 addWorker方法 addWorker方法用于创建新的线程来处理任务,其源码如下:
```
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c); // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;
for (; ; ) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;
if (compareAndIncrementWorkerCount(c)) break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize) largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) addWorkerFailed(w);
}
return workerStarted;
}
```
可以看到,addWorker方法主要做了以下几件事情:
1. 检查线程池的状态,如果线程池的状态为SHUTDOWN或STOP,则返回false,不再创建新的线程。
2. 如果当前线程数小于最大线程数,则通过compareAndIncrementWorkerCount方法增加线程数,并继续执行下面的代码。
3. 创建新的Worker对象,并将其加入到workers集合中。
4. 如果worker添加成功,则启动新的线程。 其中,Worker类是线程池中最重要的类之一,我们接下来来看一下Worker类的实现。
2.3 Worker类 Worker类是线程池中真正执行任务的类,其源码如下:
```
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
* Delegates main run loop to outer runWorker
*/
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
```
可以看到,Worker类主要做了以下几件事情:
1. 实现了Runnable接口,用于执行任务。
2. 实现了AbstractQueuedSynchronizer类,用实现线程的同步,保证同一时间只有一个线程在执行任务。
3. 每个Worker对象都有一个firstTask属性,表示Worker执行的第一个任务。
4. 每个Worker对象都有一个completedTasks属性,表示Worker执行的任务数量。
在Worker类中,最重要的方法是runWorker方法,它是线程池中真正执行任务的方法,其源码如下:
```
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
```
可以看到,runWorker方法主要做了以下几件事情:
1. 获取Worker对象的firstTask属性,并将其置为null。
1. 允许中断,执行任务。
1. 如果线程池状态为STOP或SHUTDOWN,并且当前线程没有被中断,则中断当前线程。
1. 执行任务,并在任务执行前后调用beforeExecute和afterExecute方法。
1. 在任务执行完成后,更新Worker对象的completedTasks属性,并将Worker对象解锁。
1. 循环执行任务,直到线程池状态为STOP或SHUTDOWN。
通过以上源码解读,我们可以对Java线程池的实现原理有更加深入的理解。