天天看点

Java线程池源码解读

作者:我心永恒224077694

线程池是Java并发编程中的一个重要组件,它能够有效地管理线程的生命周期和执行,从而避免了频繁创建和销毁线程的开销。在本文中,我们将详细解读Java线程池的实现源码。

1. 线程池的基本实现

Java线程池的基本实现是通过ThreadPoolExecutor类来完成的。ThreadPoolExecutor是一个线程池的核心类,它实现了Executor接口并提供了线程池的完整功能。下面是ThreadPoolExecutor类的构造函数:

```

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代码 }

}

```

其中,corePoolSize是线程池的核心线程数,maximumPoolSize是线程池的最大线程数,keepAliveTime和unit是非核心线程的存活时间和时间单位,workQueue是任务队列,threadFactory用于创建线程,handler是拒绝策略,用于处理任务队列已满时的任务拒绝。

线程池的执行过程如下:

1. 当任务数量小于核心线程数时,创建核心线程来处理任务。

2. 当任务数量大于核心线程数时,将任务放入任务队列中。

3. 当任务队列已满且线程数小于最大线程数时,创建新的线程来处理任务。

4. 当线程数达到最大线程数时,执行拒绝策略。

下面我们来详细解读ThreadPoolExecutor类的实现源码。

2. 线程池的实现细节

ThreadPoolExecutor类实现了ExecutorService接口,它提供了submit、invokeAll、invokeAny等方法,用于提交任务和管理线程池。下面我们来看一下ThreadPoolExecutor类中的一些重要方法。

2.1 execute方法

execute方法是ThreadPoolExecutor类中最重要的方法之一,用于执行任务。下面是execute方法的源码:

```

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/*

* Proceed in 3 steps:

*

* 1. If fewer than corePoolSize threads are running, try to

* start a new thread with the given command as its first

* task. The call to addWorker atomically checks runState and

* workerCount, and so prevents false alarms that would add

* threads when it shouldn't, by returning false.

*

* 2. If a task can be successfully queued, then we still need

* to double-check whether we should have added a thread

* (because existing ones died since last checking) or that

* the pool shut down since entry into this method. So we

* recheck state and if necessary roll back the enqueuing if

* stopped, or start a new thread if there are none.

*

* 3. If we cannot queue task, then we add a new thread. This covers both cases when the * queue is full and when the queue is "small" (i.e., when * queuing would normally be allowed but construction has * not yet completed). We don't actually know whether the * task is going to be executed until workerThread * actually handles it, so we compensate for workers that * exit just after checking before blocking by rechecking * runState. */

int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {

if (addWorker(command, true)) return; c = ctl.get();

}

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

if (! isRunning(recheck) && remove(command)) reject(command);

else if (workerCountOf(recheck) == 0) addWorker(null, false);

}

else if (! addWorker(command, false)) reject(command);

}

```

可以看到,execute方法中主要做了以下几件事情:

1. 如果当前线程数小于核心线程数,尝试通过addWorker方法创建新的线程来处理任务。

2. 如果任务队列未满,则将任务加入任务队列中。

3. 如果任务队列已满但线程数小于最大线程数,尝试通过addWorker方法创建新的线程来处理任务。

4. 如果任务队列已满且线程数已达到最大线程数,执行拒绝策略。 其中,addWorker方法用于创建新的线程来处理任务。下面我们来看一下addWorker方法的实现。

2.2 addWorker方法 addWorker方法用于创建新的线程来处理任务,其源码如下:

```

private boolean addWorker(Runnable firstTask, boolean core) {

retry:

for (; ; ) {

int c = ctl.get();

int rs = runStateOf(c); // Check if queue empty only if necessary.

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;

for (; ; ) {

int wc = workerCountOf(c);

if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;

if (compareAndIncrementWorkerCount(c)) break retry;

c = ctl.get(); // Re-read ctl

if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop

}

}

boolean workerStarted = false;

boolean workerAdded = false;

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

int c = ctl.get();

int rs = runStateOf(c);

if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // precheck that t is startable

throw new IllegalThreadStateException();

workers.add(w);

int s = workers.size();

if (s > largestPoolSize) largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

t.start();

workerStarted = true;

}

}

} finally {

if (!workerStarted) addWorkerFailed(w);

}

return workerStarted;

}

```

可以看到,addWorker方法主要做了以下几件事情:

1. 检查线程池的状态,如果线程池的状态为SHUTDOWN或STOP,则返回false,不再创建新的线程。

2. 如果当前线程数小于最大线程数,则通过compareAndIncrementWorkerCount方法增加线程数,并继续执行下面的代码。

3. 创建新的Worker对象,并将其加入到workers集合中。

4. 如果worker添加成功,则启动新的线程。 其中,Worker类是线程池中最重要的类之一,我们接下来来看一下Worker类的实现。

2.3 Worker类 Worker类是线程池中真正执行任务的类,其源码如下:

```

private final class Worker

extends AbstractQueuedSynchronizer

implements Runnable {

/**

* This class will never be serialized, but we provide a

* serialVersionUID to suppress a javac warning.

*/

private static final long serialVersionUID = 6138294804551838833L;

/**

* Thread this worker is running in. Null if factory fails.

*/

final Thread thread;

/**

* Initial task to run. Possibly null.

*/

Runnable firstTask;

/**

* Per-thread task counter

*/

volatile long completedTasks;

/**

* Creates with given first task and thread from ThreadFactory.

*

* @param firstTask the first task (null if none)

*/

Worker(Runnable firstTask) {

setState(-1); // inhibit interrupts until runWorker

this.firstTask = firstTask;

this.thread = getThreadFactory().newThread(this);

}

/**

* Delegates main run loop to outer runWorker

*/

public void run() {

runWorker(this);

}

// Lock methods

//

// The value 0 represents the unlocked state.

// The value 1 represents the locked state.

protected boolean isHeldExclusively() {

return getState() != 0;

}

protected boolean tryAcquire(int unused) {

if (compareAndSetState(0, 1)) {

setExclusiveOwnerThread(Thread.currentThread());

return true;

}

return false;

}

protected boolean tryRelease(int unused) {

setExclusiveOwnerThread(null);

setState(0);

return true;

}

public void lock() {

acquire(1);

}

public boolean tryLock() {

return tryAcquire(1);

}

public void unlock() {

release(1);

}

public boolean isLocked() {

return isHeldExclusively();

}

void interruptIfStarted() {

Thread t;

if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {

try {

t.interrupt();

} catch (SecurityException ignore) {

}

}

}

}

```

可以看到,Worker类主要做了以下几件事情:

1. 实现了Runnable接口,用于执行任务。

2. 实现了AbstractQueuedSynchronizer类,用实现线程的同步,保证同一时间只有一个线程在执行任务。

3. 每个Worker对象都有一个firstTask属性,表示Worker执行的第一个任务。

4. 每个Worker对象都有一个completedTasks属性,表示Worker执行的任务数量。

在Worker类中,最重要的方法是runWorker方法,它是线程池中真正执行任务的方法,其源码如下:

```

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

w.unlock(); // allow interrupts

boolean completedAbruptly = true;

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x;

throw x;

} catch (Error x) {

thrown = x;

throw x;

} catch (Throwable x) {

thrown = x;

throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

}

```

可以看到,runWorker方法主要做了以下几件事情:

1. 获取Worker对象的firstTask属性,并将其置为null。

1. 允许中断,执行任务。

1. 如果线程池状态为STOP或SHUTDOWN,并且当前线程没有被中断,则中断当前线程。

1. 执行任务,并在任务执行前后调用beforeExecute和afterExecute方法。

1. 在任务执行完成后,更新Worker对象的completedTasks属性,并将Worker对象解锁。

1. 循环执行任务,直到线程池状态为STOP或SHUTDOWN。

通过以上源码解读,我们可以对Java线程池的实现原理有更加深入的理解。

继续阅读