1、概述
從本文開始,我将用兩篇文章的篇幅,為各位讀者呈現JAVA中原生的線程池技術。第一篇文章,我将講解JAVA原生線程池的基本使用,并由此延伸出JAVA中和線程管理相關的類結構體系,然後我們較長的描述JAVA原生線程池的結構和工作方式;第二篇文章,我們将繼續深入,講解JAVA原生線程池的進階特性,包括Thread工廠、隊列、拒絕原則、鈎子和相關工具類。
如果您是JAVA語言的初學者,請從本篇文章看起;如果您對線程池技術已有一定的了解,那麼可以隻看下一篇文章;如果您是高手,請繞行;如果您對我的觀點有任何意見和建議,請留言,謝謝。^-^
2、為什麼要使用線程池

前文我們已經講到,線程是一個作業系統概念。作業系統負責這個線程的建立、挂起、運作、阻塞和終結操作。而作業系統建立線程、切換線程狀态、終結線程都要進行CPU排程——這是一個耗費時間和系統資源的事情(《作業系統知識回顧—程序線程模型》)
另一方面,目前大多數生産環境我們所面臨問題的技術背景一般是:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨建立一個線程,那麼實體機的所有資源基本上都被作業系統建立線程、切換線程狀态、銷毀線程這些操作所占用,用于業務請求處理的資源反而減少了。是以最理想的處理方式是,将處理請求的線程數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證實體機将足夠的資源用于請求處理本身。
另外,一些作業系統是有最大線程數量限制的。當運作的線程數量逼近這個值的時候,作業系統會變得不穩定。這也是我們要限制線程數量的原因。
3、線程池的基本使用方式
JAVA語言為我們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實作了ExecutorService接口(注意,ExecutorService接口本身和“線程池”并沒有直接關系,它的定義更接近“執行器”,而“使用線程管理的方式進行實作”隻是其中的一種實作方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。
3-1、簡單使用
首先我們來看看ThreadPoolExecutor類的最簡單使用方式:
package test.thread.pool;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;
public class PoolThreadSimple {
static {
BasicConfigurator.configure();
}
public static void main(String[] args) throws Throwable {
/*
* corePoolSize:核心大小,線程池初始化的時候,就會有這麼大
* maximumPoolSize:線程池最大線程數
* keepAliveTime:如果目前線程池中線程數大于corePoolSize。
* 多餘的線程,在等待keepAliveTime時間後如果還沒有新的線程任務指派給它,它就會被回收
*
* unit:等待時間keepAliveTime的機關
*
* workQueue:等待隊列。這個對象的設定是本文将重點介紹的内容
* */
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(, , , TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
for(int index = ; index < ; index ++) {
poolExecutor.submit(new PoolThreadSimple.TestRunnable(index));
}
// 沒有特殊含義,隻是為了保證main線程不會退出
synchronized (poolExecutor) {
poolExecutor.wait();
}
}
/**
* 這個就是測試用的線程
* @author yinwenjie
*/
private static class TestRunnable implements Runnable {
/**
* 日志
*/
private static Log LOGGER = LogFactory.getLog(TestRunnable.class);
/**
* 記錄任務的唯一編号,這樣在日志中好做識别
*/
private Integer index;
public TestRunnable(int index) {
this.index = index;
}
/**
* @return the index
*/
public Integer getIndex() {
return index;
}
@Override
public void run() {
/*
* 線程中,就隻做一件事情:
* 等待60秒鐘的事件,以便模拟業務操作過程
* */
Thread currentThread = Thread.currentThread();
TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行===");
synchronized (currentThread) {
try {
currentThread.wait();
} catch (InterruptedException e) {
TestRunnable.LOGGER.error(e.getMessage(), e);
}
}
TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成");
}
}
}
随後的文章中我們将對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些常用/不常用的設定項進行逐一講解。
3-2、ThreadPoolExecutor邏輯結構和工作方式
在上面的代碼中,我們建立線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
構造函數中需要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明确了解這些參數(和後續将要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。
一定要注意一個概念,即存在于線程池中容器的一定是Thread對象,而不是您要求運作的任務(是以叫線程池而不叫任務池也不叫對象池,更不叫遊泳池);您要求運作的任務将被線程池配置設定給某一個空閑的Thread運作。
從上圖中,我們可以看到構成線程池的幾個重要元素:
- 等待隊列:顧名思義,就是您調用線程池對象的submit()方法或者execute()方法,要求線程池運作的任務(這些任務必須實作Runnable接口或者Callable接口)。但是出于某些原因線程池并沒有馬上運作這些任務,而是送入一個隊列等待執行(這些原因後文馬上講解)。
- 核心線程:線程池主要用于執行任務的是“核心線程”,“核心線程”的數量是您建立線程時所設定的corePoolSize參數決定的。如果不進行特别的設定,線程池中始終會保持corePoolSize數量的線程數(不包括建立階段)。
- 非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池将建立“非核心線程”臨時幫助運作任務。您設定的大于corePoolSize參數小于maximumPoolSize參數的部分,就是線程池可以臨時建立的“非核心線程”的最大數量。這種情況下如果某個線程沒有運作任何任務,在等待keepAliveTime時間後,這個線程将會被銷毀,直到線程池的線程數量重新達到corePoolSize。
- 要重點了解上一條描述中黑體字部分的内容。也就是說,并不是所謂的“非核心線程”才會被回收;而是誰的空閑時間達到keepAliveTime這個閥值,就會被回收。直到線程池中線程數量等于corePoolSize為止。
- maximumPoolSize參數也是目前線程池允許建立的最大線程數量。那麼如果您設定的corePoolSize參數和您設定的maximumPoolSize參數一緻時,線程池在任何情況下都不會回收空閑線程。keepAliveTime和timeUnit也就失去了意義。
- keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值機關。例如keepAliveTime=1,timeUnit為TimeUnit.MINUTES,代表空閑線程的回收閥值為1分鐘。
說完了線程池的邏輯結構,下面我們讨論一下線程池是怎樣處理某一個運作任務的。下圖描述了一個完整的任務處理過程:
1、首先您可以通過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務後,會有幾種處理情況:
1.1、如果目前線程池中運作的線程數量還沒有達到corePoolSize大小時,線程池會建立一個新的線程運作您的任務,無論之前已經建立的線程是否處于空閑狀态。
1.2、如果目前線程池中運作的線程數量已經達到設定的corePoolSize大小,線程池會把您的這個任務加入到等待隊列中。直到某一個的線程空閑了,線程池會根據您設定的等待隊列規則,從隊列中取出一個新的任務執行。
1.3、如果根據隊列規則,這個任務無法加入等待隊列。這時線程池就會建立一個“非核心線程”直接運作這個任務。注意,如果這種情況下任務執行成功,那麼目前線程池中的線程數量一定大于corePoolSize。
1.4、如果這個任務,無法被“核心線程”直接執行,又無法加入等待隊列,又無法建立“非核心線程”直接執行,且您沒有為線程池設定RejectedExecutionHandler。這時線程池會抛出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上抛出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個預設的RejectedExecutionHandler實作:AbortPolicy,這在後文會提到)
2、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(所有的等待任務都實作了BlockingQueue接口,按照接口字面上的了解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,并停留在哪裡。
3、當線程池中的線程超過您設定的corePoolSize參數,說明目前線程池中有所謂的“非核心線程”。那麼當某個線程處理完任務後,如果等待keepAliveTime時間後仍然沒有新的任務配置設定給它,那麼這個線程将會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等于您設定的corePoolSize參數時,回收過程才會停止。
3-3、不常用的設定
在ThreadPoolExecutor線程池中,有一些不常用的設定。我建議如果您在應用場景中沒有特殊的要求,就不需要使用這些設定:
3-3-1、 allowCoreThreadTimeOut:
前文我們讨論到,線程池回收線程隻會發生在目前線程池中線程數量大于corePoolSize參數的時候;當線程池中線程數量小于等于corePoolSize參數的時候,回收過程就會停止。
allowCoreThreadTimeOut設定項可以要求線程池:将包括“核心線程”在内的,沒有任務配置設定的任何線程,在等待keepAliveTime時間後全部進行回收:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(, , , TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>());
poolExecutor.allowCoreThreadTimeOut(true);
以下是設定前的效果:
以下是設定後的效果:
3-3-2 prestartAllCoreThreads
前文我們還讨論到,當線程池中的線程還沒有達到您設定的corePoolSize參數值的時候,如果有新的任務到來,線程池将建立新的線程運作這個任務,無論之前已經建立的線程是否處于空閑狀态。這個描述可以用下面的示意圖表示出來:
prestartAllCoreThreads設定項,可以線上程池建立,但還沒有接收到任何任務的情況下,先行建立符合corePoolSize參數值的線程數:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(, , , TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>());
poolExecutor.prestartAllCoreThreads();
================================
(後文預告:線程基礎:線程池(6)——進階特性(下))
ThreadPoolExecutor類結構體系
使用ThreadFactory
線程池任務隊列(重點講解)
拒絕任務
擴充ThreadPoolExecutor線程池
Hook methods
Queue maintenance
工具類和後記
Executors
Apache中的擴充
與spring結合