天天看点

Java多线程之ThreadPoolExecutor实现原理和源码分析(五)

章节概览、

  • Java多线程之章节概览

1、概述

线程池的顾名思义,就是线程的一个集合。需要用到线程,从集合里面取出即可。这样设计主要的作用是优化线程的创建和销毁而造成的资源浪费的情况。Java中的线程池的实现主要是JUC下面的ThreadPoolExecutor类完成的。下面我们做的源码分析都是基于ThreadPoolExecutor类进行分析。

2、线程池实现类图UML

Java多线程之ThreadPoolExecutor实现原理和源码分析(五)

从类继承图可以看到,ThreadPoolExecutor 继承 AbstractExecutorService 抽象类。而AbstractExecutorService 实现了ExecutorService接口。ExecutorService 接口又继承了Executor接口。下面分析下这几个接口。

2.1、 核心接口分析

2.1.1、 Executor接口源码分析

public interface Executor {
// 执行一个任务。任务都被封装成Runnable的实现
    void execute(Runnable command);
}
           

2.1.2 ExecutorService接口源码分析

public interface ExecutorService extends Executor {
	
// 启动有序的关闭,之前提交的任务将会被执行,但不会接受新的任务。
    void shutdown();

// 尝试停止所有正在执行的任务,停止等待处理的任务,病返回任务列表
    List<Runnable> shutdownNow();

// 判断线程池是否已经关闭
    boolean isShutdown();

// 如果关闭后所有任务都已完成。 但是前提是必须先执行:shutdown 或者 shutdownNow
    boolean isTerminated();

// 在开启shutdown之后,阻止所有的任务知道执行完成
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
        
// 提交任务,带返回结果的
    <T> Future<T> submit(Callable<T> task);

// 提交任务,封装返回结果为T
    <T> Future<T> submit(Runnable task, T result);

 // 提交一个普通任务,返回结果任意
    Future<?> submit(Runnable task);

// 执行一批任务,返回结果为 List<Future<T>>
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
           
2.2 核心内部类分析

从继承图来看,其具有5个核心的内部类。其中4内部类对应的是拒绝策略。Worker是核心的执行代码。下面我们看下拒绝策略类的结构以及策略的运用场景

2.2.1、 RejectedExecutionHandler 接口

public interface RejectedExecutionHandler {
// 拒绝执行策略
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
           

2.2.2、 AbortPolicy 策略

Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常。

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        // 直接抛出异常,描述前线程的基本信息
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
           

2.2.3、DiscardPolicy策略

空方法,不做任何处理

public static class DiscardPolicy implements RejectedExecutionHandler {
        public DiscardPolicy() { }
        
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
           

2.2.4、DiscardOldestPolicy 策略

从队列里面抛弃一个最老的任务,并再次execute 此task

public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }
       
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
            	// 从队列里面取出最老的一个任务
                e.getQueue().poll();
                // 手动调用execute方法执行,将任务添加到队列中
                e.execute(r);
            }
        }
    }
           

2.2.5、CallerRunsPolicy 策略

public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

      // 如果当前线程池没有关闭,则调用线程的run方法
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }
           

3、ThreadPoolExecutor构造函数核心成员变量分析

3.1、构造函数详解
public class ThreadPoolExecutor extends AbstractExecutorService {
	public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}
           

构造函数参数说明:

  • corePoolSize

    线程池中的核心线程数,空闲时候线程也不会回收,除非把allowCoreThreadTimeOut设置为 true,这时核心线程才会被回收。

  • maximumPoolSize

    线程池中可以创建的最大线程数,限定为2^29-1。

  • keepAliveTime

    当线程池中创建的线程超过了核心线程数的时候,在没有新任务加入的等待时间。

  • unit

    keepAliveTime的时间单位,可以是纳秒,微秒,毫秒,秒,分钟,小时,天。

  • workQueue

    存放任务的队列,只有当线程数 > 核心线程数,才会把其他的任务放入queue,一般常用的是queue就是ArrayBlockingQueue,LinkedBlockingQueue,SynchronousQueue。

  • threadFactory

    创建线程的工厂类。

  • handler

    当queue满了和线程数达到最大限制,对于继续到达的任务采取的策略。默认采取AbortPolicy , 也就是拒绝策略,直接抛出异常

3.2、核心成员变量分析

线程池中设计非常巧妙的一个地方是把线程池的状态和运行的线程数量用一个int类型进行存储。这样一来可以保持线程池状态和线程池活跃线程数量的一致性。因为AtomicInteger是线程安全的。

  1. workerCount:线程池中当前活动的线程数量,占据ctl的低29位;
  2. runState:线程池运行状态,占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态

为了将线程池的状态和线程池中的工作线程的数量放到一个int里面进行管理。他们利用了二进制数据进行位运算。其中int类型有4个字节,一个字节8位。总共有32位。其中高的3位表示线程的状态。低29位代表线程的数量。

其中32位中,高三位代表的是状态:

  • 111 > RUNNING
  • 000 > SHUTDOWN
  • 001 > STOP
  • 010 > TIDYING
  • 110 > TERMINATED

低29位代表线程的数量。所以最大的线程数为 2^29 -1 = 536870911

// 记录线程池状态和线程数量(总共32位,前三位表示线程池状态,后29位表示线程数量),保证线程安全性
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// int 字节32位,COUNT_BITS代表的是29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程的最大容量: 000 11111111111111111111111111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 运行状态: 111 00000000000000000000000000000
    private static final int RUNNING    = -1 << COUNT_BITS;
// 关闭状态: 000 00000000000000000000000000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 停止状态: 001 00000000000000000000000000000
    private static final int STOP       =  1 << COUNT_BITS;
// 整理状态: 010 00000000000000000000000000000
    private static final int TIDYING    =  2 << COUNT_BITS;
// 终止状态: 011 00000000000000000000000000000
    private static final int TERMINATED =  3 << COUNT_BITS;

/**
* 是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1,而~CAPACITY则表示高位的3个1,2低位的9个0,
* 然后再与入参c执行按位与操作,即高3位保持原样,低29位全部设置为0,也就获取了线程池的运行状态runState
*/
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
/**
* 返回当前线程的数量。其中c代表线程池的状态,即是高三位。:
* 而CAPACITY 代表的是线程的容量,即000 11111111111111111111111111111
* c & CAPACITY ,只有当都为1的时候,才为真,这样直接舍弃高位
*/
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
/**
* 传入的rs表示线程池运行状态runState,其是高3位有值,低29位全部为0的int,
* 而wc则代表线程池中有效线程的数量workerCount,其为高3位全部为0,而低29位有值得int,
* 将runState和workerCount做或操作|处理,即用runState的高3位,workerCount的低29位填充的数字,而默认传入的
*/
    private static int ctlOf(int rs, int wc) { return rs | wc; }
           

线程池的状态转换:

// 调用了shutdown()方法 
RUNNING -> SHUTDOWN 

// 调用了shutdownNow() 
(RUNNING 或 SHUTDOWN) -> STOP 

// 当队列和线程池为空 
SHUTDOWN -> TIDYING 

// 当线程池为空 
STOP -> TIDYING 

// 当terminated()钩子方法执行完成 
TIDYING -> TERMINATED 
           

4、执行流程核心源码分析

4.1、程序入口:execute 方法
/*
 * 我们以execute 方法作为程序的入口开始分析
 */
public void execute(Runnable command) {
// 判断当前任务是否为null,如果为null,直接抛出异常
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 有以下3个步骤
         *
         * 1.如果少于corePoolSize的线程在运行,那么试着启动一个新线程,其中用给定指令作为first task。
         * 这会调用addWorker去原子性得检查runState和workerCoune,因此可以防止错误报警,在错误报警不应该时通过返回false来添加线程
         * 2.如果任务被成功排队,我们任然应该第二次检查是否添加一个新线程(因为可能存在在最后一次检查后挂掉的情况)
         * 或者在进入这个方法期间线程池shutdown。所以我们再次检查状态,如果已关闭和有必要则退出队列,或者如果没有的话就开始一个新的线程。
         * 3.如果我们无法将task入队,那么我们试图添加新线程。如果失败,那么知道我们shutdown或者是饱和的并拒绝task。
         */
		
      // 获取ctl的初始值。其初始值是:rs | wc,即状态位和线程数量高低位互补  
        int c = ctl.get();
        // 获取当前线程的数量,初始化的时候数量为0,和当前 corePoolSize 比较
        if (workerCountOf(c) < corePoolSize) {
       	    // 如果条件成立,调用addWorker(command, true)
       	    // 源码分析请看:4.2、boolean addWorker(Runnable firstTask, boolean core)
       	    // 从addWorker源码分析有得出,只要当前的workerCountOf(c) < corePoolSize 条件成立,就会往线程池里面加入一个线程
            // 当前加入的线程会被初始化到Worker中,通过firstTask进行设置
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 当前线程池的数量达到corePoolSize的时候
        // 验证当前线程池是否处于运行状态。如果处于运行状态。将当前的任务添加到任务队列中。
        // offer方法添加一个元素并返回为true。如果队列已满,这返回false
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 添加任务到队列成功以后,在此判断当前线程池是否运行状态
            // 如果线程池没有处于运行状态,则从队列中移除当前任务,同时执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 由于存在核心线程的过期策略,可能这个时候当前线程池中的线程已经都过期清理了
            // 所以这里进一步的进行检测,获取当前线程的个数。如果线程个数为0的话,则新建一个线程worker
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果当前线程池运行正常,且添加任务到队列失败。这时重新启动一个worker线程去执行
        // 此时线程池的最多的线程数量,wc < maximumPoolSize
        // 尝试直接添一个新的worker线程。如果添加失败,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
           
4.2、 boolean addWorker(Runnable firstTask, boolean core)

addWorker 中的两个方法参数,firstTask代表当前需要执行的任务。

core的含义有如下:

  1. 如果满足workerCountOf© < corePoolSize ,则为true
  2. 如果不满足 workerCountOf© < corePoolSize 且添加任务workQueue.offer(command) 失败。这时候传入的为false。
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
           // 获取当前ctl的最新值
            int c = ctl.get();
            // 获取当前线程池的运行状态
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            // 判断当前线程是否是否已经结束。检查当前队列是否为null
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取当前线程的数量
                int wc = workerCountOf(c);
                // 判断当前线程的数量是否超过最大值
 				// 这里从core的细节上面已经说明。如果为true 则使用 corePoolSize,反之使用 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               // 通过CAS设置当前的workerCount:ctl.compareAndSet(expect, expect + 1);
               // 当前的线程数量 + 1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 初始化Worker对象,传入第一个需要执行的任务
           // Woker类的分析,请看: 4.3、Woker类源码分析
            w = new Worker(firstTask);
            // 获取 worker对象内部封装的thread线程
            final Thread t = w.thread;
            if (t != null) {
                // 同步代码块,保证当前线程池状态的一致性。因为workers是共享变量
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
          
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 检查当前线程是否被启动。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 将当前线程加入到线程池中
                        workers.add(w);
                        int s = workers.size();
                        // 重置当前线程池拥有的线程个数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 添加失败,做失败清理
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
           
4.3、Woker类源码分析
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        // 完成的任务数量
        volatile long completedTasks;
        // 构造函数
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            // 初始化成员变量 firstTask
            this.firstTask = firstTask;
            // 初始化当前线程,通过线程工场。具体源码请参考:4.4、ThreadFactory 源码分析
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            // 执行当前传入的任务,具体实现,请参考:4.5、void runWorker(Worker w) 源码分析
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

           
4.4、ThreadFactory 源码分析
static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
	
       // 创建线程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            // 设置当前线程为非后台线程
            if (t.isDaemon())
                t.setDaemon(false);
            // 设置当前线程的优先级为正常优先级
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
           
4.5、void runWorker(Worker w) 源码分析
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 当线程池中的线程数量未达到corePoolSize大小的时候,每次都会先创建一个Worker对象,把当前的任务复制给firstTask
        // 直到当前的线程池中的线程数量和corePoolSize大小相等,每次新加任务都会存入到任务队列中
        Runnable task = w.firstTask;
        // 置空当前的firstTask,主要是为了方便GC
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           // 当前的firstTask != null 
           // getTask 获取的任务也不为空,getTask方法详解,请参考:4.6、Runnable getTask() 源码分析
            while (task != null || (task = getTask()) != null) {
                // 获取当前锁资源
                w.lock();
                // 如果池正在停止,请确保线程被中断;
				// 如果没有,请确保线程不被中断。 这个
				// 需要在第二种情况下重新检查才能处理
				// shutdown在清除中断时正在比赛
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行当前的任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 置空当前的 task,方便GC
                    task = null;
                    // 当前的完成任务++
                    w.completedTasks++;
                    // 释放当前的锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
        	// 当判断条件 task != null || (task = getTask()) != null 不成立的时候,删除当前Woker
            processWorkerExit(w, completedAbruptly);
        }
    }
           
4.6、Runnable getTask() 源码分析
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判断当前的任务队列是否为null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            // 获取当前的线程数量
            int wc = workerCountOf(c);

            // Are workers subject to culling?
           
            // 如果当前的 allowCoreThreadTimeOut 设置为 ture,或者 wc > corePoolSize 的情况 为 ture
            // 当前线程队列已满,才会出现 wc > corePoolSize的
            // 通过这段设置,用于判断核心线程空闲时,是否需要清理
            // 其次当线程数高于核心线程数时,是否需要清理线程
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize 的情况

            // 判断当前的线程数是否大于最大的线程数
            // wc > 1 或者 workQueue为空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 减少当前的线程数量,通过CAS
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                // 根据  timed判断,通过哪种方式获取当前的任务
                Runnable r = timed ?
                    // timed 为false,说明当前线程池线程数量超过了核心数量
                    //  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 表示等待 keepAliveTime 的时间之后
                    // 没有任务的话,直接返回 null。而这个返回为null,直接影响当前线程是否被回收的前提条件。
                    // 线程循环条件:while (task != null || (task = getTask()) != null) 如果 返回 task = null。则直接跳出循环
                    // 通过 processWorkerExit(w, completedAbruptly) 进行线程的回收
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    // 移除并返回队列的头部元素,如果队列为空,则阻塞
                    workQueue.take();
                // 如果 r != null。 则返回
                if (r != null)
                    return r;
                // 如果获取失败,设置当前timeOut为超时,接着循环
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
           

ThreadPoolExecutor 的核心理财源码已经分析完了

5、总结

  1. 当往线程池中添加任务的时候,每次添加一个任务都回去新增一个线程。直到不满足 wc < corePoolSize
  2. 当前线程池的大小已经达到了corePoolSize的时候,每次添加任务会被存放到阻塞任务队列中。等待执行
  3. 等等待任务队列也满的时候,且添加失败。此时在来新的任务,就会接着增加线程的个数,直到满足:wc >= maximumPoolSize ,添加线程失败执行拒绝策略。
  4. 线程池中,把线程的状态和数量通过int类型进行维护,高三位表示状态,低29位表示线程数量。这样可以保证线程的状态和数量的一致性

继续阅读