天天看点

并发编程9 Java线程池 ThreadPoolExector干货博客思想原理JDK中的线程池

干货博客

https://www.jianshu.com/p/389b58856894

思想

池化技术

池化技术应用:线程池、数据库连接池、http连接池等等。

池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

通过将资源池化,统一管理,可以方便管控。例如提供资源监控,调度策略等能力

线程池的好处

降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执行。

提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,监控和调优。

原理

下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型

并发编程9 Java线程池 ThreadPoolExector干货博客思想原理JDK中的线程池

设计的关键点,即调度策略制定

  1. 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
  2. 线程池中的线程个数是固定的,还是动态变化的?
  3. 每次提交新任务,是放入队列?还是开新线程?
  4. 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?

JDK的实现

ThreadPoolExector/ScheduledThreadPoolExecutor

  • 基于阻塞队列实现(即设计关键点:4)
  • 通过构造方法入参,让我们定制具体的调度策略(即设计关键点:1,2,3)

JDK中的线程池

类图

并发编程9 Java线程池 ThreadPoolExector干货博客思想原理JDK中的线程池

种类

  • ThreadPoolExector:线程池
  • ScheduledThreadPoolExecutor:定时调度的线程池

ThreadPoolExector

概念与逻辑模型

本节的描述不是很严谨,适宜快速理解

概念
  • 核心线程是否允许关闭:默认为否
  • 核心线程数:若核心线程不允许关闭,则核心线程为线程池的最小线程数量,会一直存活。若核心线程允许关闭,则线程池最小线程数可以为0,不受核心线程数控制。
  • 最大线程数数:线程池中同时线程的最大上限。
  • 线程超时时间:当 本线程超过超时时间,若 (核心线程不允许关闭 并 线程数量大于核心线程数)或 核心线程允许关闭,本线程会被关闭
  • 任务缓冲队列:是一个阻塞队列,当前线程数大于核心线程数时,实时缓冲(体现在实时放任务)一定数量的(即缓冲队列的大小)任务。如果此时有空闲线程,则空闲线程直接执行任务(这个就是阻塞取任务的意义)。若不能实时缓冲(即放入任务后会超过缓冲队列的大小),就尝试新建任务执行。
模型
  • 线程池中有 核心线程数,最大线程数,线程保证存活时间,核心线程是否允许关闭,线程集合,任务缓冲-阻塞队列,拒绝策略 等 要素
  • 线程池负责调度线程
    • 注意在线程池中的线程都是等同的,并不区分某个线程是否是核心线程,
    • 线程池 是 依据 存活的线程数量 和 线程是否空闲,选择适宜的调度策略(即执行任务的方式)。
调度策略
  1. 线程池初始化时不初始化线程
  2. 我们将任务提交到线程池时,线程池的调度策略
    1. 任务执行策略
      • 当线程池的线程数量小于核心线程数时,新建线程执行(即使有空闲线程也不用)
      • 当线程池的线程数量大于等于核心线程数时
        • 优先使用空闲线程执行(即 可以实时放入缓冲队列,即 放入后不超过缓冲队列的大小)
        • 再尝试扩张线程池,新建线程执行(即 线程池的线程数小于最大线程数)
        • 扩张失败后,调用拒绝策略(即 线程池的线程数大于等于最大线程数)
    2. 线程超时策略

      本线程超时后,

      * 若核心线程允许关闭,则关闭本线程

      * 若核心线程不允许关闭 并 当前线程数大于核心线程数,则关闭本线程

  3. 线程池的关闭
    • shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
    • shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。

使用

  1. 根据具体场景构造合适的线程池。应使用ThreadPoolExecutor构造器构造线程池,而不是Executor的工厂方法,因为使用ThreadPoolExecutor构造器可以更清楚的知道线程池的细节,并且可以定制参数。
  2. 任务的提交
    • execute()提交无返回结果的任务
    • submit()提交有返回结果的任务
  3. 注意线程池的关闭
    • shutdown()会将线程池状态置为SHUTDOWN,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。
    • shutdownNow()会将线程池状态置为SHUTDOWN,对所有线程执行interrupt()操作,清空队列,并将队列中的任务返回回来。
      并发编程9 Java线程池 ThreadPoolExector干货博客思想原理JDK中的线程池
  4. 使用钩子方法定制功能,增强ThreadPoolExector。比如集成日志或者监控
// ThreadPoolExector的钩子方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
           

原理

源码的简要流程

只关注主干逻辑,忽略加锁,异常和线程池关闭等分支

  1. 构造一个线程池,只是设置线程池参数,没做其他处理。
  2. 使用线程池执行 任务
    • 当前线程数小于核心线程数,则开启新线程并执行任务
    • 当超过核心线程数时,
      • 先使用offer方法 将任务提交到 任务缓冲-阻塞队列,直接返回成功或失败。(注意不是阻塞当前线程等待消费)
        • 提交成功,即在 任务缓冲-阻塞队列 中等待 空闲线程消费
        • 提交失败
          • 在不大于最大线程数的条件下,启动线程并执行任务
            • 若启动线程失败,即超过最大线程数等情况,则调用拒绝策略,拒绝任务。
  3. 线程(这里指的是Worker)的逻辑
    • 线程启动后,会立即执行第一个任务
    • 执行完第一个任务后,会不断从缓存队列中获取任务,并执行,若没有获取到任务则终止线程

      获取任务的逻辑 :

      • 如果 线程数大于核心线程数 或 核心线程允许超时销毁,则使用带有超时时间poll方法“阻塞地”从缓存队列中获取任务,
        • 如果超时即获取不到任务
      • 否则使用take方法“阻塞地”从缓存队列中获取任务
  4. 关闭线程池:

    未完待续…

核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {
	//...
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	// 存放任务的阻塞队列
	private final BlockingQueue<Runnable> workQueue;
	// 对线程池内部各种变量进行互斥访问控制
	private final ReentrantLock mainLock = new ReentrantLock();
	// 线程集合
	private final HashSet<Worker> workers = new HashSet<Worker>();
	//...
}
           

每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
	// ...
	final Thread thread; // Worker封装的线程
	Runnable firstTask; // Worker接收到的第1个任务
	volatile long completedTasks; // Worker执行完毕的任务个数
	// ...
}
           

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。

这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。

构造器,即配置参数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler){
       if (corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
   }
           

构造方法有7个参数

  1. corePoolSize,线程池中的核心线程数
  2. maximumPoolSize,线程池中的最大线程数
  3. keepAliveTime,空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。
  4. unit,空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等
  5. workQueue,等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue类型的对象
  6. threadFactory,线程工厂,我们可以使用它来创建一个线程
  7. handler,拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理

而我们更关心的是workQueue、threadFactory和handler

  1. workQueue

    等待队列是BlockingQueue类型的,理论上只要是它的子类,我们都可以用来作为等待队列。

    jdk内部自带一些阻塞队列

    ArrayBlockingQueue,队列是有界的,基于数组实现的阻塞队列

    LinkedBlockingQueue,队列可以有界,也可以无界。基于链表实现的阻塞队列

    PriorityBlockingQueue,带优先级的无界阻塞队列

    SynchronousQueue,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。

    该队列也是Executors.newCachedThreadPool()的默认队列

  2. threadFactory

    ThreadFactory是一个接口,只有一个方法。用于生产线程。

    public interface ThreadFactory {

    Thread newThread(Runnable r);

    }

    可参考默认的线程工厂实现类:DefaultThreadFactory

  3. handler:拒绝策略

    jdk自带4种拒绝策略,我们来看看。

    AbortPolicy // 直接抛出RejectedExecutionException异常,默认的

    DiscardPolicy // 任务直接丢弃,不做任何处理

    DiscardOldestPolicy // 丢弃队列里最旧的那个任务,再尝试执行当前任务

    CallerRunsPolicy // 在调用者线程执行

    我们可以通过实现RejectedExecutionHandler接口的方式,定制自己的拒绝策略。

    如何制定需要看实际的场景,比如

    线上业务:如 秒杀,订单查询,下订单等,时效性要求高,在超过某个时间后,等待的意义不大了。

    离线业务:如 短信提醒批处理,时效性要求不高,但是在生产消费不平衡后,很容易积累大量的待处理任务。

任务提交
public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	int c = ctl.get();
	// 如果当前线程数小于corePoolSize,则启动新线程
	if (workerCountOf(c) < corePoolSize) {
		// 添加Worker,并将command设置为Worker线程的第一个任务开始执行。
		if (addWorker(command, true))
			return;
		c = ctl.get();
	} 
	// 如果当前的线程数大于或等于corePoolSize,则调用workQueue.offer放入队列
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		// 如果线程池正在停止,则将command任务从队列移除,并拒绝command任务请求。
		if (! isRunning(recheck) && remove(command))
			reject(command);
		// 放入队列中后发现没有线程执行任务,开启新线程
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	} 
	// 线程数大于maxPoolSize,并且队列已满,调用拒绝策略
	else if (!addWorker(command, false))
		reject(command);
} 
	
// 该方法用于启动新线程。boolean core代表是否是核心线程。
private boolean addWorker(Runnable firstTask, boolean core) {
	retry:
	for (int c = ctl.get();;) {
		// 如果线程池状态值起码是SHUTDOWN和STOP,或则第一个任务不是null,或者工作队列为空
		// 则添加worker失败,返回false
		if (runStateAtLeast(c, SHUTDOWN)
			&& (runStateAtLeast(c, STOP)
				|| firstTask != null
				|| workQueue.isEmpty()))
			return false;
		for (;;) {
			// 工作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败
			if (workerCountOf(c)>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
				return false;
			// 增加worker数量成功,返回到retry语句
			if (compareAndIncrementWorkerCount(c))
				break retry;
			c = ctl.get(); // Re-read ctl
			// 如果线程池运行状态起码是SHUTDOWN,则重试retry标签语句,CAS
			if (runStateAtLeast(c, SHUTDOWN))
				continue retry;
			// else CAS failed due to workerCount change; retry inner loop
		}
	} 
	// worker数量加1成功后,接着运行:
	boolean workerStarted = false;
	boolean workerAdded = false;
	Worker w = null;
	try {
		// 新建worker对象
		w = new Worker(firstTask);
		// 获取线程对象
		final Thread t = w.thread;
		if (t != null) {
			final ReentrantLock mainLock = this.mainLock;
			// 加锁
			mainLock.lock();
			try {
				// Recheck while holding lock.
				// Back out on ThreadFactory failure or if
				// shut down before lock acquired.
				int c = ctl.get();
				if (isRunning(c) 
					|| (runStateLessThan(c, STOP) && firstTask == null)) {
					// 由于线程已经在运行中,无法启动,抛异常
					if (t.isAlive()) // precheck that t is startable
						throw new IllegalThreadStateException();
					// 将线程对应的worker加入worker集合
					workers.add(w);
					int s = workers.size();
					if (s > largestPoolSize)
						largestPoolSize = s;
						workerAdded = true;
					}
				} finally {
					// 释放锁
					mainLock.unlock();
				} 
				// 如果添加worker成功,则启动该worker对应的线程
				if (workerAdded) {
					t.start();
					workerStarted = true;
				}
			}
		} finally {
			// 如果启动新线程失败
			if (! workerStarted)
			// workCount - 1
			addWorkerFailed(w);
		} 
		return workerStarted;
	}
           

线程池关闭

未完待续…