天天看點

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

目錄

線程池的分類

常用線程池(ThreadPoolExecutor)示例

CachedThreadPool

FixedThreadPool

SingleThreadExecutor

ThreadPoolExecutor類分析

三種線程池(傳回ThreadPoolExecutor)構造方法

ThreadPoolExecutor中的成員變量

三種線程池(傳回ThreadPoolExecutor類)分析

execute與submit方法

線程池的分類

所有實作了ExecutorService接口(Executor的子接口)的實作類都是線程池,可以分為三大類

  • ForkJoinPool
  • ScheduledThreadPoolExecutor
  • ThreadPoolExecutor

具體的線程池,在工具類Executors中預建立了六小類

實作了ThreadPoolExecutor類:

  • ExecutorService newCachedThreadPool():無界線程池
  • ExecutorService newFixedThreadPool():有界線程池
  • ExecutorService newSingleThreadExecutor():單一線程池

實作了ScheduledThreadPoolExecutor類:

  • ScheduledExecutorService newSingleThreadScheduledExecutor() 
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

實作了ForkJoinPool類: 

  • ExecutorService newWorkStealingPool()  

當然我們也可以自定義線程池

常用線程池(ThreadPoolExecutor)示例

現在有一個任務WorkTask

public class WorkTask implements Runnable{
	public void run() {
		try {
			int r = (int)(Math.random()*10);
			Thread.sleep(r*1000);
			System.out.println(Thread.currentThread().getId() + " is over");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
           

CachedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
	}
}
           

無界線程池,最多可建立Integer.MAX_VALUE個線程,運作結果沒有重複的線程号

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

FixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newFixedThreadPool(3);
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
		
	}
}
           

3個線程執行20個任務

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

SingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newSingleThreadExecutor();
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
	}
}
           

始終1個線程執行所有任務

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

ThreadPoolExecutor類分析

三種線程池(傳回ThreadPoolExecutor)構造方法

以下3種線程池均實作了ThreadPoolExecutor類

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           

ThreadPoolExecutor中的成員變量

以上3中線程池都是使用了ThreadPoolExecutor中的一種構造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                     long keepAliveTime, TimeUnit unit, 
                        BlockingQueue<Runnable> workQueue) 
           
  • int corePoolSize:核心線程數,即使空閑也仍保留在池中的線程數
  • int maximumPoolSize:最大線程數
  • BlockingQueue<Runnable> workQueue:阻塞隊列,在執行任務之前用于儲存任務的隊列
  • long keepAliveTime:保持激活時間,當線程數大于核心數時,這是多餘的空閑線程在終止之前等待新任務的最大時間
  • TimeUnit unit:keepAliveTime的時間機關

核心線程是指線程池一啟動的時候配置設定的線程,隻要任務數量<核心線程數的任務都會使用核心線程去執行,并且執行完是不回收的;如果任務數量>核心線程數量,就會進入阻塞隊列,入隊之後隊列裡的任務就由非核心線程來執行;非核心線程如果在空閑後的keepAliveTime内還沒活就會被回收。

三種線程池(傳回ThreadPoolExecutor類)分析

newCachedThreadPool:核心線程數為0,最大線程數為Integer.MAX_VALUE,是以稱之為無界線程池;假設一次執行20個任務,由于corePoolSize為0,是以20個任務全會進入阻塞隊列BlockingQueue,啟動新線程執行隊列中的任務,最多可以啟動20個任務,如果20個任務都執行完畢,從線程閑置時開始倒計時60s,逾時則關閉線程。

FixedThreadPool:因為核心線程數是傳入且固定的,是以稱為有界線程池,一般在背景執行一些輔助性的任務,最大線程數與核心線程數相等;假設核心線程數為3,一次執行20個任務:先啟動3個線程,剩下17個任務會進入BlockingQueue排隊;因為核心線程數數=最大線程數,是以keepAliveTime這個參數是沒有意義的。

SingleThreadExecutor:線程池中隻有1個線程,超過1個任務進入阻塞隊列,與FixedThreadPool類似,隻不過nThread=1。比如說服務端需要有一個線程不斷監聽用戶端發送來的socket,一旦出現異常會建立一個線程來替換。

我們可以來模拟一下監聽任務ListeningTask并出現異常

import javax.management.RuntimeErrorException;

public class ListeningTask implements Runnable{
	private int i;
	public ListeningTask(int i){
		this.i = i;
	}
	public void run() {
		System.out.println("thread"+Thread.currentThread().getId()+" is started...task" + i);
		try {
			Thread.sleep(1000);
		} catch (Exception e) {
		}
		if(i==3){
			throw new RuntimeErrorException(null,"exception...");
		}
		System.out.println("thread"+Thread.currentThread().getId()+" is over...task"+i);
	}
}
           

主方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newSingleThreadExecutor();
		for(int i=0;i<20;i++){
			exec.execute(new ListeningTask(i));
		}
		exec.shutdown();
	}
}
           

運作結果

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

可見一開始全由thread8來執行,出現異常後程式并沒有終止,由另一個線程thread10接管任務,這就是SingleThreadExecutor在監聽上的用武之地

execute與submit方法

Executor接口中有一個方法

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

而其子接口ExecutorService中有3個重載的submit方法

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

submit是基方法Executor.execute(Runnable)的延伸,通過建立并傳回一個Future類對象可用于取消執行和/或等待完成。

Future類中的方法

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

用submit方法舉個例子,擷取多線程運作結果

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class test {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for(int i=0;i<20;i++){
			Future<String> f = exec.submit(new TaskResult(i));
			try {
				String v=f.get();
				System.out.println(v);
			} catch (Exception e) {
			}
		}
	}
}

class TaskResult implements Callable<String>{
	private int i;
	public TaskResult(int i){
		this.i=i;
	}
	public String call(){
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return Thread.currentThread().getId()+"'s result is "+i;
	}
}
           

運作結果,這是execute方法無法實作的

Java并發程式設計:ThreadPoolExecutor常用線程池線程池的分類常用線程池(ThreadPoolExecutor)示例ThreadPoolExecutor類分析

繼續閱讀