前沿
Java中的線程池在實際項目中使用場景很多,幾乎索引需要實作異步或者并發執行任務的程式都會使用到線程池,合理的使用線程池能夠帶來以下幾點好處。
- 降低資源的消耗: 通過出重複利用已建立的線程降低線程建立和銷毀帶來的性能消耗。
- 提高響應速度: 當任務到達時,任務可以不需要等待線程的建立就能立即執行。
- 提高線程的可管理性: 線程是稀缺資源,如果無限的建立,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一配置設定、調優和控制。
一、線程池的實作原理
二、線程池的核心對象參數說明
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
複制代碼
線程池的核心對象 | 說明 |
corePoolSize | 線程池核心線程數量,corePoolSize 是線程池中的一個最小的線程數量,即使這些線程處理空閑狀态,他們也不會被銷毀,除非設定了allowCoreThreadTimeOut。 |
maximumPoolSize | 線程池最大線程數量,線程池允許建立的最大線程數,如果隊列滿了,并且一建立的線程數小于最大線程數,則線程池會繼續建立新的線程來執行任務。 |
keepAliveTime | 線程保持活動的時間,如果任務很多,并且每個任務的執行時間比較短,可以調大時間,提高線程的使用率 |
workQueue | 任務隊列(阻塞隊列),有多種任務隊列,比如:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue等等 |
ThreadFactory | 用于建立線程的工廠,可以通過線程工廠給每個建立出來的線程設定更有意義的名字 |
RejectedExecutionHandler | 飽和拒絕政策,當隊列和線程池都滿了,說明線程池處于飽和狀态,那麼必須采用一種政策處理送出的新任務,預設政策是AbortPolicy,表示無法處理新任務時抛出異常,在JDK 1.5中線程池架構提供4中拒絕政策: 1.AbortPolicy:直接抛異常。 2.CallerRunsPolicy:隻用調用者所線上程來運作任務。 3.DiscardOldestPolicy:丢棄隊列裡最佳的一個任務,并執行目前任務。 4.DiscardPocily:不處理,直接丢棄掉。 |
三、Java中常用的四種線程池
在Java中使用線程池,可以用ThreadPoolExecutor的構造函數直接建立出線程池執行個體,在Executors類中,為我們提供了常用線程池的建立方法。
接下來我們就來了解常用的四種:
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
複制代碼
從構造方法可以看出,它建立了一個固定大小的線程池,每次送出一個任務就建立一個線程,直到線程達到線程池的最大值nThreads。線程池的大小一旦達到最大值後,再有新的任務送出時則放入無界阻塞隊列中,等到有線程空閑時,再從隊列中取出任務繼續執行。 那麼,如何使用newFixedThreadPool呢?我們來舉個例子:
public class ThreadPoolExecutorsChallenge {
static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
複制代碼
運作結果:
運作時間: 23:56:12 1
運作時間: 23:56:12 0
運作時間: 23:56:12 2
運作時間: 23:56:14 3
運作時間: 23:56:14 4
複制代碼
上面我建立的一個固定大小為3的線程池,然後線上程池送出個5個任務,從結果可以看到,一開始三個任務進來都是立即執行,在送出第4個任務時,由于線程池大小已經到達3并且前3個任務在運作中,是以第4個任務被放進了隊列,等待有空閑的線程時再被執行(前3個任務運作時間是一緻的,後兩個延遲了2秒才被執行)。
這裡僅僅是為了示範效果,正常的話手動建立線程池效果會更好,生産環境線程池不允許Executors建立,使用建議使用ThreadPoolExecutor方式建立,避免資源耗盡的風險。
說明:
Executors傳回的線程池對象弊端如下有:
1) FixedThreadPool和SingleThreadPool:允許的請求隊列長度為Integer.MAX_VALUE,可能對堆積大量的請求,進而導緻OOM。
2) CacheThreadPool和ScheduledThreadPool:允許的建立線程數量為Integer.MAX_VALUE,可能建立大量的線程,進而導緻OOM。
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複制代碼
使用newCachedThreadPool線程池:
public class ThreadPoolExecutorsChallenge {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
複制代碼
運作結果:
運作時間: 23:39:04 3
運作時間: 23:39:04 1
運作時間: 23:39:04 2
運作時間: 23:39:04 0
運作時間: 23:39:04 4
複制代碼
因為這種線程有新的任務送出,就會建立新的線程(線程池中沒有空閑線程時),不需要等待,是以送出的5個任務的運作時間是一樣的,通過Executors.newCachedThreadPool()建立線程池可能會建立大量的線程,導緻OOM。
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複制代碼
從構造方法可以看出,它建立了一個單線程化的線程池,它隻會用唯一的工作線程來執行任務,保證所有任務按照指定順序執行。 那麼,如何使用newSingleThreadExecutor呢?我們來舉個例子:
public class ThreadPoolExecutorsChallenge {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
try {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
System.out.println("運作時間: " + sdf.format(new Date()) + " " + index);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
}
複制代碼
因為該線程池類似于單線程執行,是以先執行完前一個任務後,每隔2秒,再順序執行下一個任務, 運作結果如下:
運作時間: 23:47:05 0
運作時間: 23:47:07 1
運作時間: 23:47:09 2
運作時間: 23:47:11 3
運作時間: 23:47:13 4
複制代碼
newScheduledThreadPool
這個方法建立了一個固定大小的線程池,支援定時及周期性任務執行。 首先看一下定時執行的例子:
public class ThreadPoolExecutorsChallenge {
public static void main(String[] args) {
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
System.out.println("任務送出時間:" + sdf.format(new Date()));
scheduledThreadPool.schedule(new Runnable() {
@Override
public void run() {
System.out.println("任務運作時間:" + sdf.format(new Date()));
}
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.shutdown();
}
}
複制代碼
使用該線程池的schedule方法,延遲3秒鐘後執行任務,運作結果如下:
任務送出時間:23:54:22
任務運作時間:23:54:25
複制代碼
同時使用newScheduledThreadPool可以實作周期執行的例子,實作延遲1秒後每個三秒執行一次任務:
public class ThreadPoolExecutorsChallenge {
public static void main(String[] args) {
final SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
System.out.println("送出時間: " + sdf.format(new Date()));
scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println("運作時間: " + sdf.format(new Date()));
}
}, 1, 3, TimeUnit.SECONDS);
}
}
複制代碼
運作結果:
送出時間: 00:03:15
運作時間: 00:03:16
運作時間: 00:03:19
運作時間: 00:03:22
運作時間: 00:03:25
運作時間: 00:03:28
複制代碼
四、推薦
避免耗盡的風險,推薦建立線程池方式:
//擷取系統處理器個數,作為線程池數量
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
System.out.println("系統處理器個數:" + nThreads);
ExecutorService pool = new ThreadPoolExecutor(nThreads , 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
複制代碼