天天看点

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

目录

线程池的分类

常用线程池(ThreadPoolExecutor)示例

CachedThreadPool

FixedThreadPool

SingleThreadExecutor

ThreadPoolExecutor类分析

三种线程池(返回ThreadPoolExecutor)构造方法

ThreadPoolExecutor中的成员变量

三种线程池(返回ThreadPoolExecutor类)分析

execute与submit方法

线程池的分类

所有实现了ExecutorService接口(Executor的子接口)的实现类都是线程池,可以分为三大类

  • ForkJoinPool
  • ScheduledThreadPoolExecutor
  • ThreadPoolExecutor

具体的线程池,在工具类Executors中预创建了六小类

实现了ThreadPoolExecutor类:

  • ExecutorService newCachedThreadPool():无界线程池
  • ExecutorService newFixedThreadPool():有界线程池
  • ExecutorService newSingleThreadExecutor():单一线程池

实现了ScheduledThreadPoolExecutor类:

  • ScheduledExecutorService newSingleThreadScheduledExecutor() 
  • ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

实现了ForkJoinPool类: 

  • ExecutorService newWorkStealingPool()  

当然我们也可以自定义线程池

常用线程池(ThreadPoolExecutor)示例

现在有一个任务WorkTask

public class WorkTask implements Runnable{
	public void run() {
		try {
			int r = (int)(Math.random()*10);
			Thread.sleep(r*1000);
			System.out.println(Thread.currentThread().getId() + " is over");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
           

CachedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
	}
}
           

无界线程池,最多可创建Integer.MAX_VALUE个线程,运行结果没有重复的线程号

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

FixedThreadPool

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newFixedThreadPool(3);
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
		
	}
}
           

3个线程执行20个任务

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

SingleThreadExecutor

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorTest {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newSingleThreadExecutor();
		for(int i=0;i<20;i++){
			exec.execute(new WorkTask());
		}
		exec.shutdown();
	}
}
           

始终1个线程执行所有任务

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

ThreadPoolExecutor类分析

三种线程池(返回ThreadPoolExecutor)构造方法

以下3种线程池均实现了ThreadPoolExecutor类

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
           
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
           

ThreadPoolExecutor中的成员变量

以上3中线程池都是使用了ThreadPoolExecutor中的一种构造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                     long keepAliveTime, TimeUnit unit, 
                        BlockingQueue<Runnable> workQueue) 
           
  • int corePoolSize:核心线程数,即使空闲也仍保留在池中的线程数
  • int maximumPoolSize:最大线程数
  • BlockingQueue<Runnable> workQueue:阻塞队列,在执行任务之前用于保存任务的队列
  • long keepAliveTime:保持激活时间,当线程数大于核心数时,这是多余的空闲线程在终止之前等待新任务的最大时间
  • TimeUnit unit:keepAliveTime的时间单位

核心线程是指线程池一启动的时候分配的线程,只要任务数量<核心线程数的任务都会使用核心线程去执行,并且执行完是不回收的;如果任务数量>核心线程数量,就会进入阻塞队列,入队之后队列里的任务就由非核心线程来执行;非核心线程如果在空闲后的keepAliveTime内还没活就会被回收。

三种线程池(返回ThreadPoolExecutor类)分析

newCachedThreadPool:核心线程数为0,最大线程数为Integer.MAX_VALUE,所以称之为无界线程池;假设一次执行20个任务,由于corePoolSize为0,所以20个任务全会进入阻塞队列BlockingQueue,启动新线程执行队列中的任务,最多可以启动20个任务,如果20个任务都执行完毕,从线程闲置时开始倒计时60s,超时则关闭线程。

FixedThreadPool:因为核心线程数是传入且固定的,所以称为有界线程池,一般在后台执行一些辅助性的任务,最大线程数与核心线程数相等;假设核心线程数为3,一次执行20个任务:先启动3个线程,剩下17个任务会进入BlockingQueue排队;因为核心线程数数=最大线程数,所以keepAliveTime这个参数是没有意义的。

SingleThreadExecutor:线程池中只有1个线程,超过1个任务进入阻塞队列,与FixedThreadPool类似,只不过nThread=1。比如说服务端需要有一个线程不断监听客户端发送来的socket,一旦出现异常会新建一个线程来替换。

我们可以来模拟一下监听任务ListeningTask并出现异常

import javax.management.RuntimeErrorException;

public class ListeningTask implements Runnable{
	private int i;
	public ListeningTask(int i){
		this.i = i;
	}
	public void run() {
		System.out.println("thread"+Thread.currentThread().getId()+" is started...task" + i);
		try {
			Thread.sleep(1000);
		} catch (Exception e) {
		}
		if(i==3){
			throw new RuntimeErrorException(null,"exception...");
		}
		System.out.println("thread"+Thread.currentThread().getId()+" is over...task"+i);
	}
}
           

主方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newSingleThreadExecutor();
		for(int i=0;i<20;i++){
			exec.execute(new ListeningTask(i));
		}
		exec.shutdown();
	}
}
           

运行结果

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

可见一开始全由thread8来执行,出现异常后程序并没有终止,由另一个线程thread10接管任务,这就是SingleThreadExecutor在监听上的用武之地

execute与submit方法

Executor接口中有一个方法

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

而其子接口ExecutorService中有3个重载的submit方法

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

submit是基方法Executor.execute(Runnable)的延伸,通过创建并返回一个Future类对象可用于取消执行和/或等待完成。

Future类中的方法

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

用submit方法举个例子,获取多线程运行结果

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class test {
	public static void main(String[] args) {
		ExecutorService exec = Executors.newCachedThreadPool();
		for(int i=0;i<20;i++){
			Future<String> f = exec.submit(new TaskResult(i));
			try {
				String v=f.get();
				System.out.println(v);
			} catch (Exception e) {
			}
		}
	}
}

class TaskResult implements Callable<String>{
	private int i;
	public TaskResult(int i){
		this.i=i;
	}
	public String call(){
		try {
			Thread.sleep(200);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return Thread.currentThread().getId()+"'s result is "+i;
	}
}
           

运行结果,这是execute方法无法实现的

Java并发编程:ThreadPoolExecutor常用线程池线程池的分类常用线程池(ThreadPoolExecutor)示例ThreadPoolExecutor类分析

继续阅读