天天看点

Java并发编程之线程池的关闭

对于一些定时任务或者网络请求服务将会使用线程池,当应用停机时需要正确安全的关闭线程池,并且要适当的处理已经提交给线程池而线程池没完成的任务,如果处理不当,可能造成数据丢失,业务异常等等问题。那么就需要我们对线程池的状态和关闭的方法要有一个比较清晰的了解和认识。

线程池的五种状态

线程池的5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

Java并发编程之线程池的关闭

RUNNING

  • 状态说明:线程池处在RUNNING状态时,能够接收新任务(线程池没有满的情况下),以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。

SHUTDOWN

  • 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但是会将工作队列中的任务执行结束
  • 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

STOP

  • 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理任务的线程。
  • 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

TIDYING

  • 状态说明:所有的任务终止之后,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列(任务队列)为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN

    ->TIDYING。 当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

TERMINATED

  • 状态说明:线程池彻底终止,就变成TERMINATED状态。
  • 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

测试

ShutDownTask类实现了Runnable接口,它的run()就是我们线程所要执行的任务

class ShutDownTask implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + "被中断了");
        }
    }

}      

shutdown()方法

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1000);
        executorService.shutdown();
    }
}      

输出,会一直执行完for循环中已经提交完毕的任务,直到执行完毕

Java并发编程之线程池的关闭

进入ThreadPoolExecutor的shutdown()方法

/*启动有序关闭,其中执行先前提交的任务,但不会接受新任务。 如果已经关闭,调用没有额外的效果。
此方法不等待先前提交的任务完成执行。 使用awaitTermination来做到这一点。
抛出:
SecurityException 
*/
  public void shutdown() {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
          // 检查权限
          checkShutdownAccess();
          // 设置线程池状态 为SHUTDOWN
          advanceRunState(SHUTDOWN);
         // 中断空闲线程
          interruptIdleWorkers();
          // 钩子函数,主要用于清理一些资源
          onShutdown(); 
      } finally {
          mainLock.unlock();
      }
      tryTerminate();
  }      

shutdown 方法首先加锁,其次先检查系统安装状态。接着就会将线程池状态变为 SHUTDOWN,在这之后线程池不再接受提交的新任务。此时如果还继续往线程池提交任务,将会使用线程池拒绝策略响应,默认情况下将会使用 ThreadPoolExecutor.AbortPolicy

private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();      

AbortPolicy是一个static 类,实现了RejectedExecutionHandler接口,

只有一个方法,那就是抛出 RejectedExecutionException 异常。

public static class AbortPolicy implements RejectedExecutionHandler {
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }      

interruptIdleWorkers 方法只会中断空闲的线程,不会中断正在执行任务的的线程。

调用shutdown()后线程池的后续操作:

  1. 停止接收新submit的任务。
  2. 已经提交的任务(包括正在执行的任务和队列中等待的任务),都会继续执行完成。
  3. 等到第2步完成后,才真正停止。

shutdown()之后我们继续提交任务试试

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1000);

        executorService.shutdown();
        executorService.execute(new ShutDownTask());
    }
}      

抛出RejectedExecutionException异常,并且我们可以看见,会打印出在排队的任务还有多少,完成的任务有多少,活跃的线程数有多少

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task threadpool.ShutDownTask@4b67cf4d rejected from java.util.concurrent.ThreadPoolExecutor@7ea987ac[Shutting down, pool size = 10, active threads = 10, queued tasks = 70, completed tasks = 20]
  at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
  at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
  at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
  at threadpool.ShutDown.main(ShutDown.java:21)      

shutdownNow()方法

让线程池马上停止执行线程和任务队列里面的所有任务

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1000);
        List<Runnable> runnableList = executorService.shutdownNow();
        
        for(Runnable runnable:runnableList){
            System.out.println(runnable.toString());
        }
        
    }
}      

输出, 在调用shutdownNow()方法之前成功执行了部分任务,但是在执行shutdownNow()方法后,其他任务被中断了,执行完这个方法后,会把没执行完的任务返回,便于继续处理

Java并发编程之线程池的关闭

shutdownNow()源码

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查状态
            checkShutdownAccess();
            // 将线程池状态变为 STOP
            advanceRunState(STOP);
            // 中断所有线程,包括工作线程以及空闲线程
            interruptWorkers();
            // 丢弃工作队列中存量任务
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }      

shutdownNow 方法将会把线程池状态设置为 STOP,然后中断所有线程,最后取出工作队列中所有未完成的任务返回给调用者。

调用shutdownNow()后线程池的后续操作:

  1. 跟shutdown()一样,先停止接收新submit的任务。
  2. 忽略任务队列里等待的任务。
  3. 尝试将正在执行的任务interrupt中断。
  4. 返回未执行的任务列表。

对比 shutdown 方法,shutdownNow 方法比较粗暴,直接中断工作线程。不过这里需要注意,中断线程并不代表线程立刻结束。这里需要线程主动配合线程中断响应。

线程中断机制: thread.interrupt() 只是设置一个中断标志,不会立即中断正常的线程。如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。如果想让中断立即生效,必须在线程 内调用 Thread.interrupted() 判断线程的中断状态。 对于阻塞的线程,调用中断时,线程将会立刻退出阻塞状态并抛出 InterruptedException 异常。所以对于阻塞线程需要正确处理InterruptedException 异常。

线程池 shutdown 与 shutdownNow 方法都不会主动等待执行任务的结束,如果需要等到线程池任务执行结束,需要调用 awaitTermination 主动等待任务调用结束。

awaitTermination()

等待一个时间,在这个时间里面如果线程池里面的线程都执行完毕了,会返回一个True,否则返回false。执行这个方法的时候,调用的线程是会被阻塞到这个方法这里,直到执行完毕返回一个boolean值

public class ShutDown {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);

        try {
            executorService.shutdown();
            while (!executorService.awaitTermination(3, TimeUnit.SECONDS)){
                System.out.println("线程池任务还未执行结束");
            }
            System.out.println("线程池关闭,所有任务已经结束");
        } catch (InterruptedException e) {
            System.err.println("线程池任务未正常执行结束");
            e.printStackTrace();
        }

    }
}      

输出:调用线程执行shutdown()方法之后,不断调用awaitTermination(),这里每次调用,都会阻塞三秒,然后返回一个boolean值表示是否执行完毕,没有就继续while,直到返回true,表示线程池里面所有的任务已经执行完毕

Java并发编程之线程池的关闭
Java并发编程之线程池的关闭

isShutdown()

我们先了解他是怎么保存线程池状态和线程数量的。

ThreadPoolExecutor源码里面使用了一个原子类AtomicInteger来保存数据

线程池运行状态保存在了原子类的高3位,有效线程数量保存在了原子类的低位

如下所示

//ctl一个变量同时存储runState和workerCount,其中runState占用高3位,workCount占用低29位
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
    //workerCount使用的位数:32-3=29位
    private static final int COUNT_BITS = Integer.SIZE - 3;
 
    //workerCount最大值:536870911,即0b00011111_11111111_11111111_11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
    // runState存储在高位,占用3位
    //0b11100000_00000000_00000000_00000000
    private static final int RUNNING    = -1 << COUNT_BITS;
 
    //0b00000000_00000000_00000000_00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
 
    //0b00100000_00000000_00000000_00000000
    private static final int STOP       =  1 << COUNT_BITS;
 
    //0b01000000_00000000_00000000_00000000
    private static final int TIDYING    =  2 << COUNT_BITS;
 
    //0b01100000_00000000_00000000_00000000
    private static final int TERMINATED =  3 << COUNT_BITS;
 
    // 获取runState,即保留ctl的高3位,后29位置0
    private static int runStateOf(int c)     {
      return c & ~CAPACITY;
    }
 
    //获取workerCount,即保留ctl的低29位,高3位置0
    private static int workerCountOf(int c)  { 
      return c & CAPACITY;
    }
    //设置ctl,或操作
    private static int ctlOf(int rs, int wc) {
      return rs | wc;
    }      

我们再看isShutdown() 方法,这个方法返回boolean值

public boolean isShutdown() {
     return ! isRunning(ctl.get());
 }      

如下所示,也就比较清晰了,就是直接比数的大小,而线程状态的数已经事先定义成了static final 字段

private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }      

所以这个方法就是判断线程池是不是处于RUNNING状态

isTerminated()

我们再看 isTerminated()方法

public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }      
private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }      

References: