目录
线程池的分类
常用线程池(ThreadPoolExecutor)示例
CachedThreadPool
FixedThreadPool
SingleThreadExecutor
ThreadPoolExecutor类分析
三种线程池(返回ThreadPoolExecutor)构造方法
ThreadPoolExecutor中的成员变量
三种线程池(返回ThreadPoolExecutor类)分析
execute与submit方法
线程池的分类
所有实现了ExecutorService接口(Executor的子接口)的实现类都是线程池,可以分为三大类
- ForkJoinPool
- ScheduledThreadPoolExecutor
- ThreadPoolExecutor
具体的线程池,在工具类Executors中预创建了六小类
实现了ThreadPoolExecutor类:
- ExecutorService newCachedThreadPool():无界线程池
- ExecutorService newFixedThreadPool():有界线程池
- ExecutorService newSingleThreadExecutor():单一线程池
实现了ScheduledThreadPoolExecutor类:
- ScheduledExecutorService newSingleThreadScheduledExecutor()
- ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
实现了ForkJoinPool类:
- ExecutorService newWorkStealingPool()
当然我们也可以自定义线程池
常用线程池(ThreadPoolExecutor)示例
现在有一个任务WorkTask
public class WorkTask implements Runnable{
public void run() {
try {
int r = (int)(Math.random()*10);
Thread.sleep(r*1000);
System.out.println(Thread.currentThread().getId() + " is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CachedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CachedThreadPoolTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
无界线程池,最多可创建Integer.MAX_VALUE个线程,运行结果没有重复的线程号

FixedThreadPool
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FixedThreadPoolTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newFixedThreadPool(3);
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
3个线程执行20个任务
SingleThreadExecutor
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SingleThreadExecutorTest {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i=0;i<20;i++){
exec.execute(new WorkTask());
}
exec.shutdown();
}
}
始终1个线程执行所有任务
ThreadPoolExecutor类分析
三种线程池(返回ThreadPoolExecutor)构造方法
以下3种线程池均实现了ThreadPoolExecutor类
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ThreadPoolExecutor中的成员变量
以上3中线程池都是使用了ThreadPoolExecutor中的一种构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue)
- int corePoolSize:核心线程数,即使空闲也仍保留在池中的线程数
- int maximumPoolSize:最大线程数
- BlockingQueue<Runnable> workQueue:阻塞队列,在执行任务之前用于保存任务的队列
- long keepAliveTime:保持激活时间,当线程数大于核心数时,这是多余的空闲线程在终止之前等待新任务的最大时间
- TimeUnit unit:keepAliveTime的时间单位
核心线程是指线程池一启动的时候分配的线程,只要任务数量<核心线程数的任务都会使用核心线程去执行,并且执行完是不回收的;如果任务数量>核心线程数量,就会进入阻塞队列,入队之后队列里的任务就由非核心线程来执行;非核心线程如果在空闲后的keepAliveTime内还没活就会被回收。
三种线程池(返回ThreadPoolExecutor类)分析
newCachedThreadPool:核心线程数为0,最大线程数为Integer.MAX_VALUE,所以称之为无界线程池;假设一次执行20个任务,由于corePoolSize为0,所以20个任务全会进入阻塞队列BlockingQueue,启动新线程执行队列中的任务,最多可以启动20个任务,如果20个任务都执行完毕,从线程闲置时开始倒计时60s,超时则关闭线程。
FixedThreadPool:因为核心线程数是传入且固定的,所以称为有界线程池,一般在后台执行一些辅助性的任务,最大线程数与核心线程数相等;假设核心线程数为3,一次执行20个任务:先启动3个线程,剩下17个任务会进入BlockingQueue排队;因为核心线程数数=最大线程数,所以keepAliveTime这个参数是没有意义的。
SingleThreadExecutor:线程池中只有1个线程,超过1个任务进入阻塞队列,与FixedThreadPool类似,只不过nThread=1。比如说服务端需要有一个线程不断监听客户端发送来的socket,一旦出现异常会新建一个线程来替换。
我们可以来模拟一下监听任务ListeningTask并出现异常
import javax.management.RuntimeErrorException;
public class ListeningTask implements Runnable{
private int i;
public ListeningTask(int i){
this.i = i;
}
public void run() {
System.out.println("thread"+Thread.currentThread().getId()+" is started...task" + i);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
if(i==3){
throw new RuntimeErrorException(null,"exception...");
}
System.out.println("thread"+Thread.currentThread().getId()+" is over...task"+i);
}
}
主方法
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class test {
public static void main(String[] args) {
ExecutorService exec = Executors.newSingleThreadExecutor();
for(int i=0;i<20;i++){
exec.execute(new ListeningTask(i));
}
exec.shutdown();
}
}
运行结果
可见一开始全由thread8来执行,出现异常后程序并没有终止,由另一个线程thread10接管任务,这就是SingleThreadExecutor在监听上的用武之地
execute与submit方法
Executor接口中有一个方法
而其子接口ExecutorService中有3个重载的submit方法
submit是基方法Executor.execute(Runnable)的延伸,通过创建并返回一个Future类对象可用于取消执行和/或等待完成。
Future类中的方法
用submit方法举个例子,获取多线程运行结果
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class test {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
for(int i=0;i<20;i++){
Future<String> f = exec.submit(new TaskResult(i));
try {
String v=f.get();
System.out.println(v);
} catch (Exception e) {
}
}
}
}
class TaskResult implements Callable<String>{
private int i;
public TaskResult(int i){
this.i=i;
}
public String call(){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Thread.currentThread().getId()+"'s result is "+i;
}
}
运行结果,这是execute方法无法实现的