天天看点

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

二、Java内存模型(重要)

1. CPU缓存模型

1.1 CPU缓存

CPU缓存是为了解决 CPU处理速度和内存处理速度不对等的问题。(类比:缓存如Redis是为了解决程序处理速度和访问常规关系型数据库速度不对等的问题)

内存缓存是为了解决 内存处理速度和外存(硬盘)处理速度不对等的问题

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

1.2 内存缓存不一致问题

CPU 为了解决内存缓存不一致性问题可以通过制定缓存一致协议(比如 MESI 协议)或者其他手段来解决。

这个缓存一致性协议指的是在 CPU高速缓存与主内存交互的时候需要遵守的原则和规范。

操作系统通过内存模型(Memory Model) 定义一系列规范来解决这个问题。

2. 指令重排序

2.1 概念

为了提升执行速度/性能,计算机在执行程序代码的时候,会对指令进行重排序。

指令简单来说就是系统在执行代码的时候并不一定是按照你写的代码的顺序依次执行。

2.2 分类

(1)编译器优化重排

概念:编译器(包括 JVM、JIT 编译器等)在不改变单线程程序语义的前提下,重新安排语句的执行顺序。

编译器禁止重排序方式:通过禁止特定类型的编译器重排序的方式来禁止重排序。

(2)处理器级别指令重排序

处理器级别指令禁止重排序方式:通过插入内存屏障(Memory Barrier,或有时叫做内存栅栏,Memory Fence)的方式来禁止特定类型的处理器重排序。

&& 内存屏障是一种 CPU 指令,用来禁止处理器指令发生重排序(像屏障一样),从而保障指令执行的有序性。另外,为了达到屏障的效果,它也会使处理器写入、读取值之前,将将主内存的值写入高效缓存,清空无效队列,从而保障变量的可见性。

<1> 指令重排序

现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

<2> 内存系统重排

内存系统也会有“重排序”,但又不是真正意义上的重排序。在 JMM 里表现为主存和本地内存的内容可能不一致,进而导致程序在多线程下执行可能出现问题。

2.3 java源代码重排及问题

(1)java源代码重排过程

Java 源代码会经历 编译器优化重排 —> 指令并行重排 —> 内存系统重排 的过程,最终才变成操作系统可执行的指令序列。

(2)指令重排问题

指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。

3. JMM内存模型图

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

4. 主内存与本地内存(工作内存)

(1)主内存

所有线程创建的实例对象都存放在主内存中,不管该实例对象是成员变量还是方法中的本地变量(也称局部变量)

(2)本地内存

每个线程都有一个私有的本地内存来存储共享变量的副本,并且,每个线程只能访问自己的本地内存,无法访问其他线程的本地内存。本地内存是 JMM 抽象出来的一个概念,存储了主内存中的共享变量副本。

5. JMM8种同步操作

(1)锁定(lock)

作用于主内存中的变量,将他标记为一个线程独享变量。

(2)解锁(unlock)

作用于主内存中的变量,解除变量的锁定状态,被解除锁定状态的变量才能被其他线程锁定。

(3)read(读取)

作用于主内存的变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的 load 动作使用。

(4)load(载入)

把 read 操作从主内存中得到的变量值放入工作内存的变量的副本中。

(5)use(使用)

把工作内存中的一个变量的值传给执行引擎,每当虚拟机遇到一个使用到变量的指令时都会使用该指令。

(6)assign(赋值)

作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。

(7)store(存储)

作用于工作内存的变量,它把工作内存中一个变量的值传送到主内存中,以便随后的 write 操作使用。

(8)write(写入)

作用于主内存的变量,它把 store 操作从工作内存中得到的变量的值放入主内存的变量中。

6.happens-before原则

6.1 为什么要happens-before原则?

(1)happens-before 原则的诞生是为了程序员和编译器、处理器之间的平衡。

(2)程序员追求的是易于理解和编程的强内存模型,遵守既定规则编码即可。编译器和处理器追求的是较少约束的弱内存模型,让它们尽己所能地去优化性能,让性能最大化。

6.2 happens-before原则设计思想

(1)为了对编译器和处理器的约束尽可能少,只要不改变程序的执行结果(单线程程序和正确执行的多线程程序),编译器和处理器怎么进行重排序优化都行。

(2)对于会改变程序执行结果的重排序,JMM 要求编译器和处理器必须禁止这种重排序。

6.3 happens-before常见规则

(1)程序顺序规则

一个线程内,按照代码顺序,书写在前面的操作 happens-before 于书写在后面的操作;

(2)解锁规则

解锁 happens-before 于加锁;

(3)volatile 变量规则

对一个 volatile 变量的写操作 happens-before 于后面对这个 volatile 变量的读操作。

说白了就是对 volatile 变量的写操作的结果对于发生于其后的任何操作都是可见的。

(4)传递规则

如果 A happens-before B,且 B happens-before C,那么 A happens-before C;

(5)线程启动规则

Thread 对象的 start()方法 happens-before 于此线程的每一个动作。

&& 如果两个操作不满足上述任意一个 happens-before 规则,那么这两个操作就没有顺序的保障,JVM 可以对这两个操作进行重排序。

6.4 happens-before 和 JMM 什么关系?

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

7. 并发编程三个重要特性

7.1 原子性

(1)一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。

(2)Java 中,可以借助synchronized 、各种 Lock 以及各种原子类实现原子性。

(3)synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。各种原子类是利用CAS(compare and swap)操作(可能也会用到 volatile或者final关键字)来保证原子操作。

7.2 可见性

(1)当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。

(2)Java 中,可以借助synchronized 、volatile 以及各种 Lock 实现可见性。

(3)果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。

7.3 有序性

(1)由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。

(2)在 Java 中,volatile 关键字可以禁止指令进行重排序优化。

8. JMM内存模型总结

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

三、Java线程池(重要)

1.概念

提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中

1.线程池的作用

(1)提高响应速度:

减少了创建新线程的时间

(1)降低资源消耗:

重复利用线程池中线程,不需要每次创建

(1)便于线程管理:

可以进行线程的统一分配、调优和监控

2.Executor 框架

2.1 概念

(1)

工具类、线程池的工厂类框架,用于创建并返回不同类型的线程池

(2)

Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor 框架让并发编程变得更加简单。

2.2 三大组成

2.2.1 任务(Runnable /Callable)

执行任务需要实现的 Runnable 接口 或 Callable接口。Runnable 接口或 Callable 接口 实现类都

可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。

2.2.2 任务的执行(Executor)

包括任务执行机制的核心接口 Executor ,以及继承自 Executor 接口的 ExecutorService 接口。ThreadPoolExecutor 和 ScheduledThreadPoolExecutor 这两个关键类实现了 ExecutorService 接口。

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

2.2.3 异步计算的结果(Future)

(1)Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。

(2)当我们把 Runnable接口 或 Callable 接口 的实现类提交给 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行,调用 submit() 方法时会返回一个 FutureTask 对象。

2.3 Executor 框架的使用

如图:

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

2.3.1 主线程首先要创建实现Runnable或者Callable接口的任务对象。

2.3.2 把创建完成的实现 Runnable/Callable接口的对象直接交给ExcutorService执行。

ExecutorService.execute(Runnable command))或者也可以把 Runnable 对象或Callable 对象提交给 ExecutorService 执行(ExecutorService.submit(Runnable task)或 ExecutorService.submit(Callable task))

2.3.3 如果执行 ExecutorService.submit(…),ExecutorService 将返回一个实现Future接口的对象。

由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。

2.3.4 最后,主线程可以执行 FutureTask.get()方法来等待任务执行完成。

主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行

3.ThreadPoolExecutor 类(重要)

线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。

3.1 ThreadPoolExecutor 类分析

ThreadPoolExecutor 类中提供的四个构造方法。我们来看最长的那个,其余三个都是在这个构造方法的基础上产生(其他几个构造方法说白点都是给定某些默认参数的构造方法比如默认制定拒绝策略是什么)。

/**
     * 用给定的初始参数创建一个新的ThreadPoolExecutor。
     */
    public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                              int maximumPoolSize,//线程池的最大线程数
                              long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                              TimeUnit unit,//时间单位
                              BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                              ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                              RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                               ) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
           

3.2 ThreadPoolExecutor3个最重要的参数

3.2.1 corePoolSize:核心线程池大小

最小可以同时运行的线程数量。

3.2.2 maximumPoolSize :最大线程数

当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

3.2.3 workQueue: 阻塞队列

(1)当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

(2)用来存放待执行的任务,均为线程安全,如果队列满了,而任务还再持续进入,则会创建新的线程。

3.3 ThreadPoolExecutor 其他常见参数

3.3.1 keepAliveTime:最长存活时间

超出核心线程数之外的线程,没有新任务时最多保持多长时间后会终止

3.3.2 unit : keepAliveTime 参数的时间单位

3.3.3 threadFactory :executor 创建新线程的时候会用到。

用来生产线程执行任务的线程工厂,默认正常优先级、非守护线程

3.3.4 handler :饱和(拒绝)策略。

如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolTaskExecutor 定义如下策略(如:ThreadPoolExecutor.AbortPolicy):

(1)AbortPolicy

抛出 RejectedExecutionException异常来拒绝新任务的处理。

(1)CallerRunsPolicy

重试提交当前的任务,即再次调用运行该任务的execute()方法。

适用于:承受一定延迟并且要求任何一个任务请求都要被执行

(1)DiscardPolicy

不处理新任务,直接丢弃掉

(1)DiscardOldestPolicy

丢弃最早的未处理的任务请求。

&& 如图为常见参数图:

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

3.4 创建线程池方式

推荐使用 ThreadPoolExecutor 构造函数创建线程池

3.4.1 为什么推荐ThreadPoolExecutor

首先,如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

(1)导致OOM

通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

说白了就是:使用有界队列,控制线程创建数量。

如通过Executors 去创建线程池,返回线程池对象弊端如下:

<1> FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。

<2> CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

(2)根据具体业务去动态配置相关参数及策略等

<1> 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。

<2> 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。

3.4.2 推荐的创建线程池方式

(1)通过ThreadPoolExecutor构造函数实现(推荐)

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

(2)通过 Executor 框架的工具类 Executors 来实现我们可以创建三种类型的 ThreadPoolExecutor:FixedThreadPool、SingleThreadExecutor、CachedThreadPool

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践

4.ThreadPoolExecutor 原理分析

4.1 ThreadPoolExecutor的execute(worker)方法

该方法提交一个任务到线程池中去

Java之并发编程(二)二、Java内存模型(重要)三、Java线程池(重要)四、Java 线程池最佳实践
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
   private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {
        return c & CAPACITY;
    }
    //任务队列
    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {
        // 如果任务为null,则抛出异常。
        if (command == null)
            throw new NullPointerException();
        // ctl 中保存的线程池当前的一些状态信息
        int c = ctl.get();

        //  下面会涉及到 3 步 操作
        // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
        // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里
        // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
            if (!isRunning(recheck) && remove(command))
                reject(command);
                // 如果当前线程池为空就新创建一个线程并执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
        //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
        else if (!addWorker(command, false))
            reject(command);
    }

           

4.2 addWorker

该方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。

// 全局锁,并发操作必备
    private final ReentrantLock mainLock = new ReentrantLock();
    // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合
    private int largestPoolSize;
    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    //获取线程池状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //判断线程池的状态是否为 Running
    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }


    /**
     * 添加新的工作线程到线程池
     * @param firstTask 要执行
     * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
     * @return 添加成功就返回true否则返回false
     */
   private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //这两句用来获取线程池的状态
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
               //获取线程池中工作的线程的数量
                int wc = workerCountOf(c);
                // core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
               //原子操作将workcount的数量加1
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程的状态改变了就再次执行上述操作
                c = ctl.get();
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 标记工作线程是否启动成功
        boolean workerStarted = false;
        // 标记工作线程是否创建成功
        boolean workerAdded = false;
        Worker w = null;
        try {

            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
              // 加锁
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                   //获取线程池状态
                    int rs = runStateOf(ctl.get());
                   //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中
                  //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                   // firstTask == null证明只新建线程而不执行任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                       //更新当前工作线程的最大容量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 工作线程是否启动成功
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
                if (workerAdded) {
                    t.start();
                  /// 标记线程启动成功
                    workerStarted = true;
                }
            }
        } finally {
           // 线程启动失败,需要从工作线程中移除对应的Worker
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

           

4.3 示例代码:Runnable+ThreadPoolExecutor,如下:

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务中如果有任务被执行完了,线程池就会去拿新的任务执行。

4.3.1 MyRunnable.java

首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

import java.util.Date;

/**
 * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
 * @author shuang.kou
 */
public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {
        this.command = s;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
    }

    private void processCommand() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return this.command;
    }
}

           

4.3.2 ThreadPoolExecutorDemo.java

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;
    public static void main(String[] args) {

        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < 10; i++) {
            //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)
            Runnable worker = new MyRunnable("" + i);
            //执行Runnable
            executor.execute(worker);
        }
        //终止线程池
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        System.out.println("Finished all threads");
    }
}

           

上面的代码指定了:

corePoolSize: 核心线程数为 5。

maximumPoolSize : 最大线程数 10

keepAliveTime : 等待时间为 1L。

unit: 等待时间的单位为 TimeUnit.SECONDS。

workQueue: 任务队列为 ArrayBlockingQueue,并且容量为 100;

handler: 饱和策略为 CallerRunsPolicy。

四、Java 线程池最佳实践

1.使用 ThreadPoolExecutor 的构造函数声明线程池。

原因上面已经写过,点击下面跳转:

3.4.1 为什么推荐ThreadPoolExecutor

2.监测线程池运行状态

2.1 你可以通过一些手段来检测线程池的运行状态比如 SpringBoot 中的 Actuator 组件。

2.2 你也可以利用 ThreadPoolExecutor 的相关 API 做一个简陋的监控

如下API:

printThreadPoolStatus()会每隔一秒打印出线程池的线程数、活跃线程数、完成的任务数、以及队列中的任务数。

/**
     * 打印线程池的状态
     * @param threadPool 线程池对象
     */
    public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
        ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-images/thread-pool-status", false));
        scheduledExecutorService.scheduleAtFixedRate(() -> {
            log.info("=========================");
            log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
            log.info("Active Threads: {}", threadPool.getActiveCount());
            log.info("Number of Tasks : {}", threadPool.getCompletedTaskCount());
            log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
            log.info("=========================");
        }, 0, 1, TimeUnit.SECONDS);
    }

           

3.建议不同类别的业务使用不同的线程池

一般建议是不同的业务使用不同的线程池,配置线程池的时候根据当前业务的情况对当前线程池进行配置,因为不同的业务的并发以及对资源的使用情况都不同,重心优化系统性能瓶颈相关的业务。

4.别忘记给线程池命名

给线程池里的线程命名通常有下面两种方式:

4.1 利用 guava 的 ThreadFactoryBuilder

ThreadFactory threadFactory = new ThreadFactoryBuilder()
                        .setNameFormat(threadNamePrefix + "-%d")
                        .setDaemon(true).build();
ExecutorService threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory)

           

4.2 自己实现 ThreadFactor

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final ThreadFactory delegate;
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(ThreadFactory delegate, String name) {
        this.delegate = delegate;
        this.name = name; // TODO consider uniquifying this
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = delegate.newThread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }

}

           

5.正确配置线程池参数

5.1 线程池设置

5.1.1 过大过小引发的问题

(1)过小

如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。

(2)过大

如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。

5.1.2 线程池大小设置公式

(1) 标准公式

最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间)),其中 WT(线程等待时间)=线程运行总时间 - ST(线程计算时间)。

&& 我们可以通过 JDK 自带的工具 VisualVM 来查看 WT/ST 比例

(2) 简单适用的公式

<1> 先判断是CPU 密集任务还是 IO 密集任务

CPU 密集型:

简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。

IO 密集型:

涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

<2> 然后选择对应的公式

CPU 密集型任务: N+1

I/O 密集型任务: 2N

5.2 ThreadPoolExecutor3个最重要的参数

上面已经写过,点击下面跳转:

3.2 ThreadPoolExecutor3个最重要的参数

上一篇跳转—Java之并发编程(一)                 下一篇跳转—Java之并发编程(三)

本篇文章主要参考链接如下:

参考链接1-King说Java

参考链接2-JavaGuide

持续更新中…

随心所往,看见未来。Follow your heart,see light!

欢迎点赞、关注、留言,一起学习、交流!