天天看点

Java提高班(二)深入理解线程池ThreadPool

Java提高班(二)深入理解线程池ThreadPool

本文你将获得以下信息:

线程池源码解读

线程池执行流程分析

带返回值的线程池实现

延迟线程池实现

为了方便读者理解,本文会由浅入深,先从线程池的使用开始再延伸到源码解读和源码分析等高级内容,读者可根据自己的情况自主选择阅读顺序和需要了解的章节。

线程池能够更加充分的利用CPU、内存、网络、IO等系统资源,线程池的主要作用如下:

利用线程池可以复用线程,控制最大并发数;

实现任务缓存策略和拒绝机制;

实现延迟执行

阿里巴巴Java开发手册强制规定:线程资源必须通过线程池提供,如下图:

Java提高班(二)深入理解线程池ThreadPool

本节会介绍7种线程池的创建与使用,线程池的状态介绍,ThreadPoolExecutor参数介绍等。

线程池可以使用Executors和ThreadPoolExecutor,其中使用Executors有六种创建线程池的方法,如下图:

Java提高班(二)深入理解线程池ThreadPool

newSingleThreadExecutor(),它的特点在于工作线程数目被限制为 1,操作一个无界的工作队列,所以它保证了所有任务的都是被顺序执行,最多会有一个任务处于活动状态,并且不允许使用者改动线程池实例,因此可以避免其改变线程数目。

newCachedThreadPool(),它是一种用来处理大量短时间工作任务的线程池,具有几个鲜明特点:它会试图缓存线程并重用,当无缓存线程可用时,就会创建新的工作线程;如果线程闲置的时间超过 60 秒,则被终止并移出缓存;长时间闲置时,这种线程池,不会消耗什么资源。其内部使用 SynchronousQueue 作为工作队列。

newFixedThreadPool(int nThreads),重用指定数目(nThreads)的线程,其背后使用的是无界的工作队列,任何时候最多有 nThreads 个工作线程是活动的。这意味着,如果任务数量超过了活动队列数目,将在工作队列中等待空闲线程出现;如果有工作线程退出,将会有新的工作线程被创建,以补足指定的数目 nThreads。

newSingleThreadScheduledExecutor() 创建单线程池,返回 ScheduledExecutorService,可以进行定时或周期性的工作调度。

newScheduledThreadPool(int corePoolSize)和newSingleThreadScheduledExecutor()类似,创建的是个 ScheduledExecutorService,可以进行定时或周期性的工作调度,区别在于单一工作线程还是多个工作线程。

newWorkStealingPool(int parallelism),这是一个经常被人忽略的线程池,Java 8 才加入这个创建方法,其内部会构建ForkJoinPool,利用Work-Stealing算法,并行地处理任务,不保证处理顺序。

ThreadPoolExecutor是最原始的线程池创建,上面1-3创建方式都是对ThreadPoolExecutor的封装。

总结: 其中newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是对ThreadPoolExecutor的封装实现,newSingleThreadScheduledExecutor、newScheduledThreadPool则为ThreadPoolExecutor子类ScheduledThreadPoolExecutor的封装,用于执行延迟任务,newWorkStealingPool则为Java 8新加的方法。

从以上代码可以看出newSingleThreadExecutor和newSingleThreadScheduledExecutor创建的都是单线程池,那么单线程池的意义是什么呢?

虽然是单线程池,但提供了工作队列,生命周期管理,工作线程维护等功能。

ThreadPoolExecutor作为线程池的核心方法,我们来看一下ThreadPoolExecutor内部实现,以及封装类是怎么调用ThreadPoolExecutor的。

先从构造函数说起,构造函数源码如下:

参数说明:

corePoolSize:所谓的核心线程数,可以大致理解为长期驻留的线程数目(除非设置了 allowCoreThreadTimeOut)。对于不同的线程池,这个值可能会有很大区别,比如 newFixedThreadPool 会将其设置为 nThreads,而对于 newCachedThreadPool 则是为 0。

maximumPoolSize:顾名思义,就是线程不够时能够创建的最大线程数。同样进行对比,对于 newFixedThreadPool,当然就是 nThreads,因为其要求是固定大小,而 newCachedThreadPool 则是 Integer.MAX_VALUE。

keepAliveTime:空闲线程的保活时间,如果线程的空闲时间超过这个值,那么将会被关闭。注意此值生效条件必须满足:空闲时间超过这个值,并且线程池中的线程数少于等于核心线程数corePoolSize。当然核心线程默认是不会关闭的,除非设置了allowCoreThreadTimeOut(true)那么核心线程也可以被回收。

TimeUnit:时间单位。

BlockingQueue:任务丢列,用于存储线程池的待执行任务的。

threadFactory:用于生成线程,一般我们可以用默认的就可以了。

handler:当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定。有几种方式可供选择,像抛出异常、直接拒绝然后返回等,也可以自己实现相应的接口实现自己的逻辑。

来看一下线程池封装类对于ThreadPoolExecutor的调用:

newSingleThreadExecutor对ThreadPoolExecutor的封装源码如下:

newCachedThreadPool对ThreadPoolExecutor的封装源码如下:

newFixedThreadPool对ThreadPoolExecutor的封装源码如下:

ScheduledExecutorService对ThreadPoolExecutor的封装源码如下:

newSingleThreadScheduledExecutor使用的是ThreadPoolExecutor的子类ScheduledThreadPoolExecutor,如下图所示:

Java提高班(二)深入理解线程池ThreadPool

newScheduledThreadPool对ThreadPoolExecutor的封装源码如下:

newScheduledThreadPool使用的也是ThreadPoolExecutor的子类ScheduledThreadPoolExecutor。

查看ThreadPoolExecutor源码可知线程的状态如下:

Java提高班(二)深入理解线程池ThreadPool

线程状态解读(以下内容来源于:https://javadoop.com/post/java-thread-pool):

RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务;

SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务;

STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程;

TIDYING:所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated();

TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个;

RUNNING 定义为 -1,SHUTDOWN 定义为 0,其他的都比 0 大,所以等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断。

看了这几种状态的介绍,读者大体也可以猜到十之八九的状态转换了,各个状态的转换过程有以下几种:

RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的;

(RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚 shutDown() 和 shutDownNow() 的区别了;

SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING;

STOP -> TIDYING:当任务队列清空后,发生这个转换;

TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后;

说了那么多下来一起来看线程池的是怎么执行任务的,线程池任务提交有两个方法:

execute

submit

其中execute只能接受Runnable类型的任务,使用如下:

submit可以接受Runnable或Callable类型的任务,使用如下:

使用submit传递Callable类可以获取执行任务的返回值,Callable是JDK 1.5 添加的特性用于补充Runnable无返回的情况。

在线程池中newSingleThreadScheduledExecutor和newScheduledThreadPool返回的是ScheduledExecutorService,用于执行延迟线程池的,代码如下:

完整示例下载地址: https://github.com/vipstone/java-core-example

阅读线程池的源码有一个小技巧,可以按照线程池执行的顺序进行串连关联阅读,这样更容易理解线程池的实现。

源码阅读流程解读

我们先从线程池的任务提交方法execute()开始阅读,从execute()我们会发现线程池执行的核心方法是addWorker(),在addWorker()中我们发现启动线程调用了start()方法,调用start()方法之后会执行Worker类的run()方法,run里面调用runWorker(),运行程序的关键在于getTask()方法,getTask()方法之后就是此线程的关闭,整个线程池的工作流程也就完成了,下来一起来看吧(如果本段文章没看懂的话也可以看完源码之后,回过头来再看一遍)。

在这段代码可以看出,调用了t.start();

根据上面代码可知,调用了Worker的t.start()之后,紧接着会调用Worker的run()方法,run()源码如下:

runWorker()源码如下:

runWorker里面的有getTask(),来看下具体的实现:

线程池的执行流程如下图:

Java提高班(二)深入理解线程池ThreadPool

本文总结以问答的形式展示,引自《深度解读 java 线程池设计思想及源码实现》,最下方附参考地址。

corePoolSize 到 maximumPoolSize 之间的线程会被回收,当然 corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))。

workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务。

keepAliveTime 用于设置空闲时间,如果线程数超出了 corePoolSize,并且有些线程的空闲时间超过了这个值,会执行关闭这些线程的操作

rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况,默认有抛出 RejectedExecutionException 异常、忽略任务、使用提交任务的线程来执行此任务和将队列中等待最久的任务删除,然后提交此任务这四种策略,默认为抛出异常。

如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务;

如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务;

如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略。

如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它。

workers 的数量达到了 corePoolSize,任务入队成功,以此同时线程池被关闭了,而且关闭线程池并没有将这个任务出队,那么执行拒绝策略。这里说的是非常边界的问题,入队和关闭线程池并发执行,读者仔细看看 execute 方法是怎么进到第一个 reject(command) 里面的。

workers 的数量大于等于 corePoolSize,准备入队,可是队列满了,任务入队失败,那么准备开启新的线程,可是线程数已经达到 maximumPoolSize,那么执行拒绝策略。

书籍:《码出高效:Java开发手册》

Java核心技术36讲:http://t.cn/EwUJvWA

深度解读 java 线程池设计思想及源码实现:https://javadoop.com/post/java-thread-pool

Java线程池-ThreadPoolExecutor源码解析(基于Java8):https://www.imooc.com/article/42990

课程推荐:

Java提高班(二)深入理解线程池ThreadPool

关注下面二维码,订阅更多精彩内容。

Java提高班(二)深入理解线程池ThreadPool
Java提高班(二)深入理解线程池ThreadPool
Java提高班(二)深入理解线程池ThreadPool

关注公众号(加好友):

Java提高班(二)深入理解线程池ThreadPool

作者:

王磊的博客

出处:

http://vipstone.cnblogs.com/