目錄
線程池的分類
常用線程池(ThreadPoolExecutor)示例
CachedThreadPool
FixedThreadPool
SingleThreadExecutor
ThreadPoolExecutor類分析
三種線程池(傳回ThreadPoolExecutor)構造方法
ThreadPoolExecutor中的成員變量
三種線程池(傳回ThreadPoolExecutor類)分析
execute與submit方法
線程池的分類
所有實作了ExecutorService接口(Executor的子接口)的實作類都是線程池,可以分為三大類
- ForkJoinPool
- ScheduledThreadPoolExecutor
- ThreadPoolExecutor
具體的線程池,在工具類Executors中預建立了六小類
實作了ThreadPoolExecutor類:
- ExecutorService newCachedThreadPool():無界線程池
- ExecutorService newFixedThreadPool():有界線程池
- ExecutorService newSingleThreadExecutor():單一線程池
實作了ScheduledThreadPoolExecutor類:
- ScheduledExecutorService newSingleThreadScheduledExecutor()
- ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
實作了ForkJoinPool類:
- ExecutorService newWorkStealingPool()
當然我們也可以自定義線程池
常用線程池(ThreadPoolExecutor)示例
現在有一個任務WorkTask
public class WorkTask implements Runnable{
public void run() {
try {
int r = (int)(Math.random()*10);
Thread.sleep(r*1000);
System.out.println(Thread.currentThread().getId() + " is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CachedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
無界線程池,最多可建立Integer.MAX_VALUE個線程,運作結果沒有重複的線程号

FixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(3);
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
3個線程執行20個任務
SingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
始終1個線程執行所有任務
ThreadPoolExecutor類分析
三種線程池(傳回ThreadPoolExecutor)構造方法
以下3種線程池均實作了ThreadPoolExecutor類
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ThreadPoolExecutor中的成員變量
以上3中線程池都是使用了ThreadPoolExecutor中的一種構造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue)
- int corePoolSize:核心線程數,即使空閑也仍保留在池中的線程數
- int maximumPoolSize:最大線程數
- BlockingQueue<Runnable> workQueue:阻塞隊列,在執行任務之前用于儲存任務的隊列
- long keepAliveTime:保持激活時間,當線程數大于核心數時,這是多餘的空閑線程在終止之前等待新任務的最大時間
- TimeUnit unit:keepAliveTime的時間機關
核心線程是指線程池一啟動的時候配置設定的線程,隻要任務數量<核心線程數的任務都會使用核心線程去執行,并且執行完是不回收的;如果任務數量>核心線程數量,就會進入阻塞隊列,入隊之後隊列裡的任務就由非核心線程來執行;非核心線程如果在空閑後的keepAliveTime内還沒活就會被回收。
三種線程池(傳回ThreadPoolExecutor類)分析
newCachedThreadPool:核心線程數為0,最大線程數為Integer.MAX_VALUE,是以稱之為無界線程池;假設一次執行20個任務,由于corePoolSize為0,是以20個任務全會進入阻塞隊列BlockingQueue,啟動新線程執行隊列中的任務,最多可以啟動20個任務,如果20個任務都執行完畢,從線程閑置時開始倒計時60s,逾時則關閉線程。
FixedThreadPool:因為核心線程數是傳入且固定的,是以稱為有界線程池,一般在背景執行一些輔助性的任務,最大線程數與核心線程數相等;假設核心線程數為3,一次執行20個任務:先啟動3個線程,剩下17個任務會進入BlockingQueue排隊;因為核心線程數數=最大線程數,是以keepAliveTime這個參數是沒有意義的。
SingleThreadExecutor:線程池中隻有1個線程,超過1個任務進入阻塞隊列,與FixedThreadPool類似,隻不過nThread=1。比如說服務端需要有一個線程不斷監聽用戶端發送來的socket,一旦出現異常會建立一個線程來替換。
我們可以來模拟一下監聽任務ListeningTask并出現異常
import javax.management.RuntimeErrorException;
public class ListeningTask implements Runnable{
private int i;
public ListeningTask(int i){
this.i = i;
}
public void run() {
System.out.println("thread"+Thread.currentThread().getId()+" is started...task" + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
if(i==3){
throw new RuntimeErrorException(null,"exception...");
}
System.out.println("thread"+Thread.currentThread().getId()+" is over...task"+i);
}
}
主方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class test {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i=0;i<20;i++){
exec.execute(new ListeningTask(i));
}
exec.shutdown();
}
}
運作結果
可見一開始全由thread8來執行,出現異常後程式并沒有終止,由另一個線程thread10接管任務,這就是SingleThreadExecutor在監聽上的用武之地
execute與submit方法
Executor接口中有一個方法
而其子接口ExecutorService中有3個重載的submit方法
submit是基方法Executor.execute(Runnable)的延伸,通過建立并傳回一個Future類對象可用于取消執行和/或等待完成。
Future類中的方法
用submit方法舉個例子,擷取多線程運作結果
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class test {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<20;i++){
Future<String> f = exec.submit(new TaskResult(i));
try {
String v=f.get();
System.out.println(v);
} catch (Exception e) {
}
}
}
}
class TaskResult implements Callable<String>{
private int i;
public TaskResult(int i){
this.i=i;
}
public String call(){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getId()+"'s result is "+i;
}
}
運作結果,這是execute方法無法實作的