天天看点

并发编程--JAVA线程池实现讲解及使用示例1. 线程池相关概念1.1.  什么是线程池2. java中的线程池3. 线程池使用建议参考

1. 线程池相关概念

1.1.  什么是线程池

线程池和数据库连接池的原理也差不多,创建线程去处理业务,可能创建线程的时间比处理业务的时间还长一些,如果系统能够提前创建好线程,需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程中中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。

简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。

1.2. 线程池实现原理

当向线程池提交一个任务之后,线程池的处理流程如下:

  1. 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
  2. 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
  3. 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。

流程如下图:

并发编程--JAVA线程池实现讲解及使用示例1. 线程池相关概念1.1.  什么是线程池2. java中的线程池3. 线程池使用建议参考

2. java中的线程池

2.1. 主要构造方法

jdk中提供了线程池的具体实现,实现类是:

java.util.concurrent.ThreadPoolExecutor

,主要构造方法:

  1. corePoolSize:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的

    prestartCoreThread

    方法,线程池会提前把核心线程都创造好,并启动
  2. maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了
  3. keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率
  4. unit:keepAliveTIme的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是时间单位的枚举

    java.util.concurrent.TimeUnit

  5. workQueue:工作队列,用于缓存待处理任务的阻塞队列,常见的有4种,详见2.4. 线程池中常见的工作队列
  6. threadFactory:线程池中创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
  7. handler:饱和策略,当线程池无法处理新来的任务了,那么需要提供一种策略处理提交的新任务,默认有4种策略,文章后面会提到
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

2.2. execute方法执行过程

  1. 判断线程池中运行的线程数是否小于corepoolsize,是:则创建新的线程来处理任务,否:执行下一步
  2. 试图将任务添加到workQueue指定的队列中,如果无法添加到队列,进入下一步
  3. 判断线程池中运行的线程数是否小于

    maximumPoolSize

    ,是:则新增线程处理当前传入的任务,否:将任务传递给

    handler

    对象

    rejectedExecution

    方法处理

2.3. 线程池的使用步骤

  1. 调用构造方法创建线程池
  2. 调用线程池的方法处理任务
  3. 关闭线程池

2.4. 线程池中常见的工作队列

任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的5种阻塞队列:

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序

LinkedBlockingQueue:是一个基于链表结构的无界阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法

Executors.newFixedThreadPool

使用了这个队列。

SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法

Executors.newCachedThreadPool

使用这个队列

PriorityBlockingQueue:优先级队列,无界阻塞队列,进入队列的元素按照优先级会进行排序

线程优先级代码示例如下: 从输出可以看到,除了第一个任务,线程按设置的优先级从高到低顺序执行。

public class ThreadPriorityTest {
    static class Task implements Runnable, Comparable<Task> {
        private int priority;
        private String taskName;

        public Task(int priority, String taskName) {
            this.priority = priority;
            this.taskName = taskName;
        }

        @Override
        public void run() {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "处理" + this.taskName);
        }

        @Override
        public int compareTo(Task o) {
            return Integer.compare(o.priority, this.priority);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new PriorityBlockingQueue());
        System.out.println("测试线程优先级");
        for (int i = 0; i < 10; i++) {
            String taskName = "任务" + i;
            executor.execute(new Task(i, taskName));
        }
        for (int i = 100; i >= 90; i--) {
            String taskName = "任务" + i;
            executor.execute(new Task(i, taskName));
        }
        executor.shutdown();
    }
}
           

2.5. 创建线程池的常见方法

使用Executors 返回的线程池对象的弊端如下:

1)FixedThreadPool 和 SingleThreadPool: 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。

2)CachedThreadPool 和 ScheduledThreadPool: 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

  public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
           

2.6. 常见饱和策略

当线程池中队列已满,并且线程池已达到最大线程数,线程池会将任务传递给饱和策略进行处理。这些策略都实现了

RejectedExecutionHandler

接口。接口中有个方法:

void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

2.6.1.  常见的4种饱和策略

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:在当前调用者的线程中运行任务,即随丢来的任务,由他自己去处理
  • DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务
  • DiscardPolicy:不处理,直接丢弃掉,方法内部为空

2.6.2.  自定义饱和策略

当向线程池提交的任务太多,线程池无法添加新的任务时,将任务传递给RejectedExecutionHandler对象的

rejectedExecution

方法处理。

需要实现

RejectedExecutionHandler

接口。任务无法处理的时候,如果想记录一下日志,示例代码如下:

创建一个核心线程及最大线程是2,且队列是2的线程池,当提交5个任务时,会导致第5个任务被拒绝执行。

public class ThreadTest {
    static class Task implements Runnable {
        String taskName;

        public Task(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.taskName);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        @Override
        public String toString(){
            return String.format("taskName:%s",taskName);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
                2,
                60L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                Executors.defaultThreadFactory(),
                (r, executors) -> {
                    System.out.println(String.format("当前任务数:%s,无法处理的任务:%s",executors.getActiveCount(),r));
                });
        for (int i = 0; i < 5; i++) {
            executor.execute(new Task("任务-" + i));
        }
        executor.shutdown();
    }
}
           

日志输出如下:

pool-1-thread-1处理任务-0

pool-1-thread-2处理任务-1

当前任务数:2,无法处理的任务:taskName:任务-4

pool-1-thread-2处理任务-2

pool-1-thread-1处理任务-3

2.7. 自定义创建线程的工厂

自定义创建线程工厂需要实现

java.util.concurrent.ThreadFactory

接口中的

Thread newThread(Runnable r)

方法,参数为传入的任务,需要返回一个工作线程。

自定义创建线程工厂的目的:给负责不同业务模块的线程起一个能代表业务功能且有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。

示例代码如下: 创建线程工厂 建议使用guava提供的ThreadFactoryBuilder。

public static void main(String[] args) {
        ThreadFactory factory = new ThreadFactoryBuilder().setNameFormat("stat-%d").setDaemon(true).build();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(10), factory);
        for (int i = 0; i < 5; i++) {
            String taskName = "任务-" + i;
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "处理" + taskName);
            });
        }
        executor.shutdown();
    }
           

2.8. 线程池关闭方法的区别

shutdown只是将线程池的状态设置为SHUTWDOWN状态,正在执行的任务会继续执行下去,没有被执行的则中断。

shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,没被执行任务的则返回。

        举个工人吃包子的例子:一个厂的工人(Workers)正在吃包子(可以理解为任务),假如接到shutdown的命令,拿到包子的工人会把手头上的包子给吃完,没有拿到包子的工人则不能吃(不能再从笼子里面拿)。如果接到shutdownNow的命令,则所有工人立刻停止吃包子,直接把手头上没吃完的包子放下,更别提笼子里的包子了。

shutdown:执行该该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 

shutdownNow:执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。 它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,但是这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。 

2.9. 如何扩展线程池

jdk提供了

ThreadPoolExecutor

这个高性能线程池,但是如果想在这个线程池上面做一些扩展,比如,监控每个任务执行的开始时间,结束时间,或者一些其他自定义的功能,应该怎么办?

ThreadPoolExecutor

内部提供了几个方法

beforeExecute

afterExecute

terminated

,可以由开发人员自己去这些方法

  • beforeExecute:任务执行之前调用的方法,有2个参数,第1个参数是执行任务的线程,第2个参数是任务  protected void beforeExecute(Thread t, Runnable r) { }
  • afterExecute:任务执行完成之后调用的方法,2个参数,第1个参数表示任务,第2个参数表示任务执行时的异常信息,如果无异常,第二个参数为null  protected void afterExecute(Runnable r, Throwable t) { }
  • terminated:线程池最终关闭之后调用的方法。所有的工作线程都退出了,最终线程池会退出,退出时调用该方法  protected void terminated() { }

   代码示例如下:

public class ThreadPoolExtTest {
    static class Task implements Runnable {
        String taskName;

        public Task(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "处理" + this.taskName);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return String.format("taskName:%s", taskName);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60L,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1), Executors.defaultThreadFactory()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(System.currentTimeMillis() + "," + t.getName() + ",开始执行任务:" + r.toString());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",任务:" + r.toString() + ",执行完毕!");
            }

            @Override
            protected void terminated() {
                System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ",关闭线程池!");
            }
        };
        for (int i = 0; i < 3; i++) {
            executor.execute(new Task("任务-" + i));
        }
        executor.shutdown();
    }
}
           

3. 线程池使用建议

3.1. 合理地配置线程池

需要从以下4个角度分析任务的特性,根据任务特性,全是配置线程池

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务
  • 任务的优先级:高、中、低
  • 任务的执行时间:长、中、短
  • 任务的依赖性:是否依赖其他的系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。

  • CPU密集型任务应该尽可能小的线程池,如配置cpu数量+1个线程的线程池。
  • IO密集型任务并不是一直在执行任务,不能让cpu闲着,则应配置尽可能多的线程,如:cup数量*2。
  • 混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这2个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。

可以通过

Runtime.getRuntime().availableProcessors()

方法获取cpu数量。优先级不同任务可以对线程池采用优先级队列来处理,让优先级高的先执行。

使用队列的时候建议使用有界队列,有界队列增加了系统的稳定性,如果采用无界队列,任务太多的时候可能导致系统OOM,直接让系统宕机。

3.2. 线程池中线程数量的配置

线程池中总线程大小对系统的性能有一定的影响,目标是希望系统能够发挥最好的性能,过多或者过小的线程数量无法有效的使用机器的性能。

在Java Concurrency in Practice书中给出了估算线程池大小的公式:

Ncpu = CUP的数量  Ucpu = 目标CPU的使用率,0<=Ucpu<=1   W/C = 等待时间与计算时间的比例

为保存处理器达到期望的使用率,最有的线程池的大小等于:Nthreads = Ncpu × Ucpu × (1+W/C)

3.3. 阿里巴巴Java开发手册中的规约

  • 创建线程或线程池时请指定有意义的线程名称,方便出错时回溯
  • 线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者 “过度切换”的问题
  • 线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样 的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

参考

阿里巴巴Java开发手册(纪念版).pdf

https://www.cnblogs.com/aspirant/p/10265863.html

java高并发系列 - 第18天:JAVA线程池,这一篇就够了