天天看点

《Java 并发编程的艺术》

并发编程的挑战

1.1 上下文切换

CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个

任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这

个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。

如何减少上下文切换:

  • 无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一

    些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据。

  • CAS算法。Java的Atomic包使用CAS算法来更新数据,而不需要加锁。
  • 使用最少线程和使用协程。避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这

    样会造成大量线程都处于等待状态。

1.2 死锁

现在我们介绍避免死锁的几个常见方法。

  • 避免一个线程同时获取多个锁。
  • ·避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源。
  • 尝试使用定时锁,使用lock.tryLock(timeout)来替代使用内部锁机制。
  • 对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。

1.3 资源限制的挑战

  • 资源限制是指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源。
  • 在并发编程中,将代码执行速度加快的原则是将代码中串行执行的部分变成并发执行,但是如果将某段串行的代码并发执行,因为受限于资源,仍然在串行执行,这时候程序不仅不会加快执行,反而会更慢,因为增加了上下文切换和资源调度的时间。
  • 对于硬件资源限制,可以考虑使用集群并行执行程序。
  • 对于软件资源限制,可以考虑使用资源池将资源复用。比如使用连接池将数据库和Socket连接复用,或者在调用对方webservice接口获取数据时,只建立一个连接。

Java并发机制的底层实现原理

2.1 volatile的应用

  1. volatile的定义与实现原理

    下面来具体讲解volatile的两条实现原则。

    1)Lock前缀指令会引起处理器缓存回写到内存。

    2)一个处理器的缓存回写到内存会导致其他处理器的缓存无效。

2.2 synchronized的实现原理与应用

先来看下利用synchronized实现同步的基础:Java中的每一个对象都可以作为锁。具体表现

为以下3种形式。

·对于普通同步方法,锁是当前实例对象。

·对于静态同步方法,锁是当前类的Class对象。

·对于同步方法块,锁是Synchonized括号里配置的对象。

代码块同步是使用monitorenter和monitorexit指令实现的。

方法级的同步是隐式,即无需通过字节码指令来控制的,它实现在方法调用和返回操作之中。JVM可以从方法常量池中的方法表结构(method_info Structure) 中的 ACC_SYNCHRONIZED 访问标志区分一个方法是否同步方法。当方法调用时,调用指令将会 检查方法的ACC_SYNCHRONIZED 访问标志是否被设置,如果设置了,执行线程将先持有monitor(虚拟机规范中用的是管程一词),然后再执行方法,最后再方法完成(无论是正常完成还是非正常完成)时释放monitor。在方法执行期间,执行线程持有了monitor,其他任何线程都无法再获得同一个monitor。如果一个同步方法执行期间抛出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的monitor将在异常抛到同步方法之外时自动释放。

Java对象头

《Java 并发编程的艺术》

在运行期间,Mark Word里存储的数据会随着锁标志位的变化而变化。Mark Word可能变化为存储以下4种数据,如表2-4所示。

《Java 并发编程的艺术》

锁的升级与对比

在Java SE 1.6中,锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态。

偏向锁

当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单地测试一下对象头的Mark

Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark

Word中偏向锁的标识是否设置成1(表示当前是偏向锁):如果没有设置,则使用CAS竞争锁;如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。

轻量级锁

轻量级锁加锁:

线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。

轻量级锁解锁:

轻量级解锁时,会使用原子的CAS操作将Displaced Mark Word替换回到对象头,如果成功,则表示没有竞争发生。如果失败,表示当前锁存在竞争,锁就会膨胀成重量级锁。

锁的优缺点对比

《Java 并发编程的艺术》

2.3 原子操作的实现原理

原子(atomic)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为“不可被中断的一个或一系列操作”。在多处理器上实现原子操作就变得有点复杂。

术语定义

CPU术语定义

《Java 并发编程的艺术》

.处理器如何实现原子操作

  • 使用总线锁保证原子性

    处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。

  • 使用缓存锁保证原子性

    所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性。

但是有两种情况下处理器不会使用缓存锁定。

第一种情况是:当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行(cache line)时,则处理器会调用总线锁定。

第二种情况是:有些处理器不支持缓存锁定。对于Intel 486和Pentium处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。

Java如何实现原子操作

1.使用循环CAS实现原子操作

在Java中可以通过锁和循环CAS的方式来实现原子操作。

public class Counter {
    private AtomicInteger atomicI = new AtomicInteger(0);
    private int i = 0;
    public static void main(String[] args) {
        final Counter cas = new Counter();
        List<Thread> ts = new ArrayList<Thread>(600);
        long start = System.currentTimeMillis();
        for (int j = 0; j < 100; j++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int i = 0; i < 10000; i++) {
                        cas.count();
                        cas.safeCount();
                    }
                }
            });
            ts.add(t);
        }
        for (Thread t : ts) {
            t.start();
        }
// 等待所有线程执行完成
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println(cas.i);
        System.out.println(cas.atomicI.get());
        System.out.println(System.currentTimeMillis() - start);
    }
    /** * 使用CAS实现线程安全计数器 */
    private void safeCount() {
        for (;;) {
            int i = atomicI.get();
            boolean suc = atomicI.compareAndSet(i, ++i);
            if (suc) {
                break;
            }
        }
    }
    /**
     * 非线程安全计数器
     */
    private void count() {
        i++;
    }
}
           

从Java 1.5开始,JDK的并发包里提供了一些类来支持原子操作,如AtomicBoolean(用原子方式更新的boolean值)、AtomicInteger(用原子方式更新的int值)和AtomicLong(用原子方式更新的long值)。这些原子包装类还提供了有用的工具方法,比如以原子的方式将当前值自增1和自减1。

2.CAS实现原子操作的三大问题

  • ABA问题

    A→B→A 值已经被修改 ,但是实际检测没有变化。

    ABA问题的解决思路就是使用版本号。在变量前面追加上版本号,每次变量更新的时候把版本号加1,那么A→B→A就会变成1A→2B→3A。

  • 循环时间长开销大

    自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

  • 只能保证一个共享变量的原子操作

    当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。

    还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来

    操作。比如,有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。

  • 使用锁机制实现原子操作

    锁机制保证了只有获得锁的线程才能够操作锁定的内存区域。JVM内部实现了很多种锁机制,有偏向锁、轻量级锁和互斥锁。

    有意思的是除了偏向锁,JVM实现锁的方式都用了循环CAS,即当一个线程想进入同步块的时候使用循环CAS的方式来获取锁,当它退出同步块的时候使用循环CAS释放锁。

Java内存模型

3.1 Java内存模型的基础

3.1.1 并发编程模型的两个关键问题

在并发编程中,需要处理两个关键问题:线程之间如何通信及线程之间如何同步(这里的线程是指并发执行的活动实体)。通信是指线程之间以何种机制来交换信息。在命令式编程中,线程之间的通信机制有两种:共享内存和消息传递。

在共享内存的并发模型里,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信。在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息来显式进行通信。

同步是指程序中用于控制不同线程间操作发生相对顺序的机制。在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。

在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。

3.1.2 Java内存模型的抽象结构

在Java中,所有实例域、静态域和数组元素都存储在堆内存中,堆内存在线程之间共享(本章用“共享变量”这个术语代指实例域,静态域和数组元素)。

局部变量(Local Variables),方法定义参数(Formal Method Parameters)和异常处理器参数(Exception

Handler Parameters)不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型的影响。

Java线程之间的通信由Java内存模型(本文简称为JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。

Java内存模型的抽象结构示意图:

《Java 并发编程的艺术》

从上图来看,如果线程A与线程B之间要通信的话,必须要经历下面2个步骤。

1)线程A把本地内存A中更新过的共享变量刷新到主内存中去。

2)线程B到主内存中去读取线程A之前已更新过的共享变量。

《Java 并发编程的艺术》

从整体来看,这两个步骤实质上是线程A在向线程B发送消息,而且这个通信过程必须要经过主内存。JMM通过控制主内存与每个线程的本地内存之间的交互,来为Java程序员提供内存可见性保证。

3.1.3 从源代码到指令序列的重排序

在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分3种类型。

1)编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。

2)指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-LevelParallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

3)内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

《Java 并发编程的艺术》
上述的1属于编译器重排序,2和3属于处理器重排序。这些重排序可能会导致多线程程序出现内存可见性问题。对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要求Java编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers,Intel称之为Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。

JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。

3.1.4 并发编程模型的分类

虽然写缓冲区有这么多好处,但每个处理器上的写缓冲区,仅仅对它所在的处理器可见。

这个特性会对内存操作的执行顺序产生重要的影响:处理器对内存的读/写操作的执行顺序,不一定与内存实际发生的读/写操作顺序一致!

为了保证内存可见性,Java编译器在生成指令序列的适当位置会插入内存屏障指令来禁止特定类型的处理器重排序。JMM把内存屏障指令分为4类:

《Java 并发编程的艺术》

3.1.5 happens-before简介

在JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作之间必须要存在happens-before关系。这里提到的两个操作既可以是在一个线程之内,也可以是在不同线程之间。

  • 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。

    两个操作之间具有happens-before关系,并不意味着前一个操作必须要在后一个操作之前执行!happens-before仅仅要求前一个操作(执行的结果)对后一个操作可见。

  • 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
  • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。

3.2 重排序

重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。

3.2.1 数据依赖性

如果两个操作访问同一个变量,且这两个操作中有一个为写操作,此时这两个操作之间就存在数据依赖性。

数据依赖类型表:

《Java 并发编程的艺术》

编译器和处理器可能会对操作做重排序。编译器和处理器在重排序时,会遵守数据依赖性,编译器和处理器不会改变存在数据依赖关系的两个操作的执行顺序。

3.2.2 as-if-serial语义

as-if-serial语义的意思是:不管怎么重排序(编译器和处理器为了提高并行度),(单线程)程序的执行结果不能被改变。编译器、runtime和处理器都必须遵守as-if-serial语义。

double pi = 3.14; // A
double r = 1.0; // B
double area = pi * r * r; // C
           

依赖关系是:

《Java 并发编程的艺术》

A和B之间没有数据依赖关系,编译器和处理器可以重排序A和B之间的执行顺序。

A——B——C

B——A——C

as-if-serial语义把单线程程序保护了起来,遵守as-if-serial语义的编译器、runtime和处理器共同为编写单线程程序的程序员创建了一个幻觉:单线程程序是按程序的顺序来执行的。as-if-serial语义使单线程程序员无需担心重排序会干扰他们,也无需担心内存可见性问题。

3.2.3 程序顺序规则

根据happens-before的程序顺序规则,上面计算圆的面积的示例代码存在3个happens-

before关系。

1)A happens-before B。

2)B happens-before C。

3)A happens-before C。

这里的第3个happens-before关系,是根据happens-before的传递性推导出来的。这里A happens-before B,但实际执行时B却可以排在A之前执行(看上面的重排序后的执行顺序)。如果A happens-before B,JMM并不要求A一定要在B之前执行。JMM仅仅要求前一个操作(执行的结果)对后一个操作可见,且前一个操作按顺序排在第二个操作之前。这里操作A的执行结果不需要对操作B可见;而且重排序操作A和操作B后的执行结果,与操作A和操作B按happens-before顺序执行的结果一致。在这种情况下,JMM会认为这种重排序并不非法(notillegal),JMM允许这种重排序。

3.2.4 重排序对多线程的影响

class ReorderExample {
    int a = 0;
    boolean flag = false;

    public void writer() {
        a = 1; // 1
        flag = true; // 2
    }

    public void reader() {
        if (flag) { // 3
            int i = a * a; // 4
        }
    }
}
           

操作1和操作2做了重排序。程序执行时,线程A首先写标记变量flag,随后线程B读这个变量。由于条件判断为真,线程B将读取变量a。此时,变量a还没有被线程A写入,在这里多线程程序的语义被重排序破坏了!

《Java 并发编程的艺术》

操作3和操作4存在控制依赖关系。当代码中存在控制依赖性时,会影响指令序列执行的并行度。为此,编译器和处理器会采用猜测(Speculation)执行来克服控制相关性对并行度的影响。以处理器的猜测执行为例,执行线程B的处理器可以提前读取并计算a*a,然后把计算结果临时保存到一个名为重排序缓冲(Reorder Buffer,ROB)的硬件缓存中。当操作3的条件判断为真时,就把该计算结果写入变量i中。

猜测执行实质上对操作3和4做了重排序。重排序在这里破坏了多线程程序的语义!

《Java 并发编程的艺术》

3.3 顺序一致性

3.3.1 数据竞争与顺序一致性

当程序未正确同步时,就可能会存在数据竞争。Java内存模型规范对数据竞争的定义如下。

在一个线程中写一个变量,

在另一个线程读同一个变量,

而且写和读没有通过同步来排序。

JMM对正确同步的多线程程序的内存一致性做了如下保证。

如果程序是正确同步的,程序的执行将具有顺序一致性(Sequentially Consistent)——即程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同。

3.3.2 顺序一致性内存模型

顺序一致性内存模型有两大特性。

1)一个线程中的所有操作必须按照程序的顺序来执行。

2)(不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致性内存模型中,每个操作都必须原子执行且立刻对所有线程可见。

假设有两个线程A和B并发执行。其中A线程有3个操作,它们在程序中的顺序是:

A1→A2→A3。B线程也有3个操作,它们在程序中的顺序是:B1→B2→B3。

3.3.3 同步程序的顺序一致性效果

class SynchronizedExample {
    int a = 0;
    boolean flag = false;

    public synchronized void writer() { // 获取锁
        a = 1;
        flag = true;
    } // 释放锁

    public synchronized void reader() { // 获取锁
        if (flag) {
            int i = a;

        } // 释放锁
    }
}
           

两个内存模型中的执行时序对比图:

《Java 并发编程的艺术》

3.3.4 未同步程序的执行特性

对于未同步或未正确同步的多线程程序,JMM只提供最小安全性:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0,Null,False),JMM保证线程读操作读取到的值不会无中生有(Out Of Thin Air)的冒出来。

未同步程序在两个模型中的执行特性有如下几个差异。

1)顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证单线程内的操作会按程序的顺序执行(比如上面正确同步的多线程程序在临界区内的重排序)。

2)顺序一致性模型保证所有线程只能看到一致的操作执行顺序,而JMM不保证所有线程能看到一致的操作执行顺序。

3)JMM不保证对64位的long型和double型变量的写操作具有原子性,而顺序一致性模型保证对所有的内存读/写操作都具有原子性。

注意,在JSR-133之前的旧内存模型中,一个64位long/double型变量的读/写操作可以被拆分为两个32位的读/写操作来执行。从JSR-133内存模型开始(即从JDK5开始),仅仅只允许把一个64位long/double型变量的写操作拆分为两个32位的写操作来执行,任意的读操作在JSR-133中都必须具有原子性(即任意读操作必须要在单个读事务中执行)。

3.4 volatile的内存语义

3.4.1 volatile的特性

class VolatileFeaturesExample {
    volatile long vl = 0L; // 使用volatile声明64位的long型变量
    public void set(long l) {
        vl = l; // 单个volatile变量的写
    }
    public void getAndIncrement () {
        vl++; // 复合(多个)volatile变量的读/写
    }
    public long get() {
        return vl; // 单个volatile变量的读
    }
}
           

假设有多个线程分别调用上面程序的3个方法,这个程序在语义上和下面程序等价。

class VolatileFeaturesExample {
    long vl = 0L; // 64位的long型普通变量
    public synchronized void set(long l) { // 对单个的普通变量的写用同一个锁同步
        vl = l;
    }
    public void getAndIncrement () { // 普通方法调用
        long temp = get(); // 调用已同步的读方法
        temp += 1L; // 普通写操作
        set(temp); // 调用已同步的写方法
    }
    public synchronized long get() { // 对单个的普通变量的读用同一个锁同步
        return vl;
    }
}
           

锁的happens-before规则保证释放锁和获取锁的两个线程之间的内存可见性,这意味着对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。

锁的语义决定了临界区代码的执行具有原子性。这意味着,即使是64位的long型和double型变量,只要它是volatile变量,对该变量的读/写就具有原子性。如果是多个volatile操作或类似于volatile++这种复合操作,这些操作整体上不具有原子性。

volatile变量自身具有下列特性。

  • 可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写 入。
  • 原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不 具有原子性。

3.4.2 volatile写-读建立的happens-before关系

上面讲的是volatile变量自身的特性,对程序员来说,volatile对线程的内存可见性的影响比volatile自身的特性更为重要,也更需要我们去关注。

从JSR-133开始(即从JDK5开始),volatile变量的写-读可以实现线程之间的通信。

从内存语义的角度来说,volatile的写-读与锁的释放-获取有相同的内存效果:volatile写和锁的释放有相同的内存语义;volatile读与锁的获取有相同的内存语义。
class VolatileExample {
    int a = 0;
    volatile boolean flag = false;

    public void writer() {
        a = 1;              // 1
        flag = true;        // 2
    }

    public void reader() {
        if (flag) {         // 3
            int i = a;      // 4

        }
    }
}
           

happens-before关系:

《Java 并发编程的艺术》

3.4.3 volatile写-读的内存语义

volatile写的内存语义如下。

当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。

线程A执行volatile写后,共享变量的状态示意图。

《Java 并发编程的艺术》

volatile读的内存语义如下。

当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。

线程B读同一个volatile变量后,共享变量的状态示意图。

《Java 并发编程的艺术》

下面对volatile写和volatile读的内存语义做个总结。

  • 线程A写一个volatile变量,实质上是线程A向接下来将要读这个volatile变量的某个线程发出了(其对共享变量所做修改的)消息。
  • 线程B读一个volatile变量,实质上是线程B接收了之前某个线程发出的(在写这个volatile 变量之前对共享变量所做修改的)消息。
  • 线程A写一个volatile变量,随后线程B读这个volatile变量,这个过程实质上是线程A通过 主内存向线程B发送消息。

3.4.4 volatile内存语义的实现

为了实现volatile的内存语义,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。

volatile重排序规则表

《Java 并发编程的艺术》
  • ·当第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保 volatile写之前的操作不会被编译器重排序到volatile写之后。
  • 当第一个操作是volatile读时,不管第二个操作是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
  • 当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。

3.4.5 JSR-133为什么要增强volatile的内存语义

在JSR-133之前的旧Java内存模型中,虽然不允许volatile变量之间重排序,但旧的Java内存模型允许volatile变量与普通变量重排序。在旧的内存模型中,VolatileExample示例程序可能被重排序成下列时序来执行,如图3-23所示。

《Java 并发编程的艺术》

其结果就是:读线程B执行4时,不一定能看到写线程A在执行1时对共享变量的修改。

为了提供一种比锁更轻量级的线程之间通信的机制,JSR-133专家组决定增强volatile的内存语义:严格限制编译器和处理器对volatile变量与普通变量的重排序,确保volatile的写-读和锁的释放-获取具有相同的内存语义。

3.5 锁的内存语义

3.5.1 锁的释放-获取建立的happens-before关系

锁是Java并发编程中最重要的同步机制。锁除了让临界区互斥执行外,还可以让释放锁的线程向获取同一个锁的线程发送消息。

class MonitorExample {
    int a = 0;

    public synchronized void writer() {    // 1
        a++;          // 2
    }            // 3

    public synchronized void reader() {   // 4
        int i = a;        // 5
    }            // 6
}
           

假设线程A执行writer()方法,随后线程B执行reader()方法。根据happens-before规则,这个过程包含的happens-before关系可以分为3类。

《Java 并发编程的艺术》

在上图中,2 happens-before 5。因此,线程A在释放锁之前所有可见的共享变量,在线程B获取同一个锁之后,将立刻变得对B线程可见。

3.5.2 锁的释放和获取的内存语义

class MonitorExample {
    int a = 0;

    public synchronized void writer() {    // 1
        a++;          // 2
    }            // 3

    public synchronized void reader() {   // 4
        int i = a;        // 5
    }            // 6
}
           

当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。以上面的MonitorExample程序为例,A线程释放锁后,共享数据的状态示意图如图所示。

《Java 并发编程的艺术》

锁获取的状态示意图:

《Java 并发编程的艺术》

下面对锁释放和锁获取的内存语义做个总结。

  • 线程A释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A对共享变量所做修改的)消息。
  • 线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在释放这个锁之前对共享变量所做修改的)消息。
  • 线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发送消息。

3.5.3 锁内存语义的实现

class ReentrantLockExample {
    int a = 0;
    ReentrantLock lock = new ReentrantLock();

    public void writer() {
        lock.lock();        // 获取锁
        try {
            a++;
        } finally {
            lock.unlock();  // 释放锁
        }
    }

    public void reader() {
        lock.lock();        // 获取锁
        try {
            int i = a;
        } finally {
            lock.unlock();  // 释放锁
        }
    }
}
           

CAS是如何同时具有volatile读和volatile写的内存语义的。

lock前缀提供的内存屏障效果。

1)确保对内存的读-改-写操作原子执行。在Pentium及Pentium之前的处理器中,带有lock前 缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会 带来昂贵的开销。从Pentium 4、Intel Xeon及P6处理器开始,Intel使用缓存锁定(Cache Locking) 来保证指令执行的原子性。缓存锁定将大大降低lock前缀指令的执行开销。

2)禁止该指令,与之前和之后的读和写指令重排序。

3)把写缓冲区中的所有数据刷新到内存中。

上面的第2点和第3点所具有的内存屏障效果,足以同时实现volatile读和volatile写的内存 语义。

现在对公平锁和非公平锁的内存语义做个总结。

·公平锁和非公平锁释放时,最后都要写一个volatile变量state。

·公平锁获取时,首先会去读volatile变量。

·非公平锁获取时,首先会用CAS更新volatile变量,这个操作同时具有volatile读和volatile写的内存语义。

从本文对ReentrantLock的分析可以看出,锁释放-获取的内存语义的实现至少有下面两种方式。

1)利用volatile变量的写-读所具有的内存语义。

2)利用CAS所附带的volatile读和volatile写的内存语义。

3.5.4 concurrent包的实现

如果我们仔细分析concurrent包的源代码实现,会发现一个通用化的实现模式。

首先,声明共享变量为volatile。

然后,使用CAS的原子条件更新来实现线程之间的同步。

同时,配合以volatile的读/写和CAS所具有的volatile读和写的内存语义来实现线程之间的通信。

concurrent包的实现示意图:

《Java 并发编程的艺术》

3.6 final域的内存语义

3.6.6 final语义在处理器中的实现

上面我们提到,写final域的重排序规则会要求编译器在final域的写之后,构造函数return之前插入一个StoreStore障屏。读final域的重排序规则要求编译器在读final域的操作前面插入一个LoadLoad屏障。

3.6.7 JSR-133为什么要增强final的语义

在旧的Java内存模型中,一个最严重的缺陷就是线程可能看到final域的值会改变。

为了修补这个漏洞,JSR-133专家组增强了final的语义。通过为final域增加写和读重排序规则,可以为Java程序员提供初始化安全保证:只要对象是正确构造的(被构造对象的引用在构造函数中没有“逸出”),那么不需要使用同步(指lock和volatile的使用)就可以保证任意线程都能看到这个final域在构造函数中被初始化之后的值。

3.7 happens-before

3.7.1 JMM的设计

double pi = 3.14;  // A
double r = 1.0;    // B
double area = pi * r * r; // C
           

上面计算圆的面积的示例代码存在3个happens-before关系,如下。

·A happens-before B。

·B happens-before C。

·A happens-before C。

在3个happens-before关系中,2和3是必需的,但1是不必要的。因此,JMM把happens-before

要求禁止的重排序分为了下面两类。

·会改变程序执行结果的重排序。

·不会改变程序执行结果的重排序。

JMM对这两种不同性质的重排序,采取了不同的策略,如下。

·对于会改变程序执行结果的重排序,JMM要求编译器和处理器必须禁止这种重排序。

·对于不会改变程序执行结果的重排序,JMM对编译器和处理器不做要求(JMM允许这种

重排序)。

《Java 并发编程的艺术》

3.7.2 happens-before的定义

《JSR-133:Java Memory Model and Thread Specification》对happens-before关系的定义如下。

1)如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。

2)两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么这种重排序并不非法(也就是说,JMM允许这种重排序)。

上面的1)是JMM对程序员的承诺。从程序员的角度来说,可以这样理解happens-before关系:如果A happens-before B,那么Java内存模型将向程序员保证——A操作的结果将对B可见,且A的执行顺序排在B之前。注意,这只是Java内存模型向程序员做出的保证!

上面的2)是JMM对编译器和处理器重排序的约束原则。正如前面所言,JMM其实是在遵循一个基本原则:只要不改变程序的执行结果(指的是单线程程序和正确同步的多线程程序),编译器和处理器怎么优化都行。JMM这么做的原因是:程序员对于这两个操作是否真的被重排序并不关心,程序员关心的是程序执行时的语义不能被改变(即执行结果不能被改变)。因此,happens-before关系本质上和as-if-serial语义是一回事。

3.7.3 happens-before规则

1)程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。

2)监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。

3)volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的读。

4)传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。

5)start()规则:如果线程A执行操作ThreadB.start()(启动线程B),那么A线程的

ThreadB.start()操作happens-before于线程B中的任意操作。

6)join()规则:如果线程A执行操作ThreadB.join()并成功返回,那么线程B中的任意操作happens-before于线程A从ThreadB.join()操作成功返回。

3.8 双重检查锁定与延迟初始化

public class UnsafeLazyInitialization {
    private static Instance instance;

    public static Instance getInstance() {
        if (instance == null) // 1:A线程执行
            instance = new Instance(); // 2:B线程执行
        return instance;
    }
}
           

下面是使用双重检查锁定来实现延迟初始化的示例代码

public class DoubleCheckedLocking {         // 1
    private static Instance instance;       // 2

    public static Instance getInstance() { // 3
        if (instance == null) {             // 4:第一次检查
            synchronized (DoubleCheckedLocking.class) { // 5:加锁
                if (instance == null)       // 6:第二次检查
                    instance = new Instance(); // 7:问题的根源出在这里
            }                               // 8
        }                                   // 9
        return instance;                    // 10
    }                                       // 11
}
           

在线程执行到第4行,代码读取到instance不为null时,instance引用的对象有可能还没有完成初始化。

3.8.2 问题的根源

前面的双重检查锁定示例代码的第7行(instance=new Singleton();)创建了一个对象。这一行代码可以分解为如下的3行伪代码。

memory = allocate();  // 1:分配对象的内存空间
ctorInstance(memory); // 2:初始化对象
instance = memory;  // 3:设置instance指向刚分配的内存地址
           

上面3行伪代码中的2和3之间,可能会被重排序。

memory = allocate();  // 1:分配对象的内存空间
instance = memory;  // 3:设置instance指向刚分配的内存地址
// 注意,此时对象还没有被初始化!
ctorInstance(memory); // 2:初始化对象
           

多线程执行时序图:

《Java 并发编程的艺术》

在知晓了问题发生的根源之后,我们可以想出两个办法来实现线程安全的延迟初始化。

1)不允许2和3重排序。

2)允许2和3重排序,但不允许其他线程“看到”这个重排序。

3.8.3 基于volatile的解决方案

public class SafeDoubleCheckedLocking {
    private volatile static Instance instance;

    public static Instance getInstance() {
        if (instance == null) {
            synchronized (SafeDoubleCheckedLocking.class) {
                if (instance == null)
                    instance = new Instance(); // instance为volatile,现在没问题了
            }
        }
        return instance;
    }
}
           

当声明对象的引用为volatile后,3.8.2节中的3行伪代码中的2和3之间的重排序,在多线程

环境中将会被禁止。上面示例代码将按如下的时序执行,如图3-39所示。

《Java 并发编程的艺术》

这个方案本质上是通过禁止图3-39中的2和3之间的重排序,来保证线程安全的延迟初始化。

3.8.4 基于类初始化的解决方案

JVM在类的初始化阶段(即在Class被加载后,且被线程使用之前),会执行类的初始化。在执行类的初始化期间,JVM会去获取一个锁。这个锁可以同步多个线程对同一个类的初始化。

基于这个特性,可以实现另一种线程安全的延迟初始化方案(这个方案被称之为Initialization On Demand Holder idiom)。

public class InstanceFactory {
    private static class InstanceHolder {
        public static Instance instance = new Instance();
    }
    public static Instance getInstance() {
        return InstanceHolder.instance ;  // 这里将导致InstanceHolder类被初始化
    }
}
           
《Java 并发编程的艺术》

3.9 Java内存模型综述

处理器内存模型的特征表

《Java 并发编程的艺术》

各种内存模型之间的关系

《Java 并发编程的艺术》

JMM的内存可见性保证

  • 单线程程序。单线程程序不会出现内存可见性问题。
  • 正确同步的多线程程序。正确同步的多线程程序的执行将具有顺序一致性(程序的执行结果与该程序在顺序一致性内存模型中的执行结果相同)。
  • 未同步/未正确同步的多线程程序。JMM为它们提供了最小安全性保障:线程执行时读取到的值,要么是之前某个线程写入的值,要么是默认值(0、null、false)。

Java并发编程基础

4.1 线程简介

4.1.1 什么是线程

现代操作系统调度的最小单元是线程。也叫轻量级进程(Light

Weight Process),在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。

4.1.4 线程的状态

《Java 并发编程的艺术》

Java线程状态变迁图:

《Java 并发编程的艺术》

4.1.5 Daemon线程

Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。这意味着,当一个Java虚拟机中不存在非Daemon线程的时候,Java虚拟机将会退出。可以通过调用Thread.setDaemon(true)将线程设置为Daemon线程。

public class Daemon {
    public static void main(String[] args) {
        Thread thread = new Thread(new DaemonRunner(), "DaemonRunner");
        thread.setDaemon(true);
        thread.start();
    }
    static class DaemonRunner implements Runnable {
        @Override
        public void run() {
            try {
                SleepUtils.second(10);
            } finally {
                System.out.println("DaemonThread finally run.");
            }
        }
    }
}
           

Daemon线程被用作完成支持性工作,但是在Java虚拟机退出时Daemon线程中的finally块并不一定会执行。

4.2 启动和终止线程

线程对象在初始化完成之后,调用start()方法就可以启动这个线程。线程start()方法的含义是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,应立即启动调用start()方法的线程。

4.2.3 理解中断

中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行了中断操作。

中断好比其他线程对该线程打了个招呼,其他线程通过调用该线程的interrupt()方法对其进行中断操作。

线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupted()来进行判断是否被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位。如果该线程已经处于终结状态,即使该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返回false。

4.2.5 安全地终止线程

在4.2.3节中提到的中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个boolean变量来控制是否需要停止任务并终止该线程。

4.3 线程间通信

4.3.1 volatile和synchronized关键字

关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问的可见性。

关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性。

无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。

图4-2描述了对象、对象的监视器、同步队列和执行线程之间的关系。

《Java 并发编程的艺术》

4.3.2 等待/通知机制

等待/通知的相关方法

《Java 并发编程的艺术》

1)使用wait()、notify()和notifyAll()时需要先对调用对象加锁。

2)调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。

3)notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。

public class WaitNotify {
    static boolean flag = true;
    static Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Thread waitThread = new Thread(new Wait(), "WaitThread");
        waitThread.start();
        TimeUnit.SECONDS.sleep(1);
        Thread notifyThread = new Thread(new Notify(), "NotifyThread");
        notifyThread.start();
    }

    static class Wait implements Runnable {
        public void run() {
// 加锁,拥有lock的Monitor
            synchronized (lock) {
// 当条件不满足时,继续wait,同时释放了lock的锁
                while (flag) {
                    try {
                        System.out.println(Thread.currentThread() + " flag is true. wait @ "
                                + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                        lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
// 条件满足时,完成工作
                System.out.println(Thread.currentThread() + " flag is false. running @ "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
            }
        }
    }

    static class Notify implements Runnable {
        public void run() {
// 加锁,拥有lock的Monitor
            synchronized (lock) {
// 获取lock的锁,然后进行通知,通知时不会释放lock的锁,
// 直到当前线程释放了lock后,WaitThread才能从wait方法中返回
                System.out.println(Thread.currentThread() + " hold lock. notify @ " +
                        new SimpleDateFormat("HH:mm:ss").format(new Date()));
                lock.notifyAll();
                flag = false;
                SleepUtils.second(5);
            }
// 再次加锁
            synchronized (lock) {
                System.out.println(Thread.currentThread() + " hold lock again. [email protected] "
                        + new SimpleDateFormat("HH:mm:ss").format(new Date()));
                SleepUtils.second(5);
            }
        }
    }
}
           

4.3.5 Thread.join()的使用

如果一个线程A执行了thread.join()语句,其含义是:当前线程A等待thread线程终止之后才从thread.join()返回。

public class JoinThread {
    public static void main(String[] args) throws Exception {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate.");
    }
    static class Domino implements Runnable {
        private Thread thread;
        public Domino(Thread thread) {
            this.thread = thread;
        }
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate.");
        }
    }
}
           

4.3.6 ThreadLocal的使用

ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

Java中的锁

5.1 Lock接口

表5-1 Lock接口提供的synchronized关键字不具备的主要特性

《Java 并发编程的艺术》

5.2 队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获 取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

5.2.1 队列同步器的接口与示例

同步器的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的 方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些 模板方法将会调用使用者重写的方法。

重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

·getState():获取当前同步状态。

·setState(int newState):设置当前同步状态。

·compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性。

5.3 重入锁

重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。

1.实现重进入

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实 现需要解决以下两个问题。

1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再 次成功获取。

2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到 该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁 被释放时,计数自减,当计数等于0时表示锁已经成功释放。

ReentrantLock的nonfairTryAcquire方法

inal boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
        }
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
        return false;
    }
           

2.公平与非公平获取锁的区别

公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。

5.4 读写锁

之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线 程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读 线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写 锁,使得并发性相比一般的排他锁有了很大提升。

Java并发包提供读写锁的实现是 ReentrantReadWriteLock。

ReentrantReadWriteLock的特性

《Java 并发编程的艺术》

5.4.1 读写锁的接口与示例

public class Cache {
    static Map<String, Object> map = new HashMap<String, Object>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock r = rwl.readLock();
    static Lock w = rwl.writeLock();

    // 获取一个key对应的value
    public static final Object get(String key) {
        r.lock();
        try {
            return map.get(key);
        } finally {
            r.unlock();
        }
    }

    // 设置key对应的value,并返回旧的value
    public static final Object put(String key, Object value) {
        w.lock();
        try {
            return map.put(key, value);
        } finally {
            w.unlock();
        }
    }


    // 清空所有的内容
    public static final void clear() {
        w.lock();
        try {
            map.clear();
        } finally {
            w.unlock();
        }
    }
}

           

5.4.2 读写锁的实现分析

接下来分析ReentrantReadWriteLock的实现,主要包括:读写状态的设计、写锁的获取与释 放、读锁的获取与释放以及锁降级(以下没有特别说明读写锁均可认为是 ReentrantReadWriteLock)。

5.6 Condition接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、 wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以 实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等 待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

5.6.1 Condition接口与示例

Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() throws InterruptedException {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }
           

5.6.2 Condition的实现分析

ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要 获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队 列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

下面将分析Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition如 果不加说明均指的是ConditionObject。

Java并发容器和框架

6.1 ConcurrentHashMap的实现原理与使用

在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下,基于以上两个原因,便有了ConcurrentHashMap的登场机会。

ConcurrentHashMap的锁分段技术可有效提升并发访问率

HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么 当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并 发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。

6.2 ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规 则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元 素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在 Michael&Scott算法上进行了一些修改。

6.3 Java中的阻塞队列

6.3.2 Java里的阻塞队列

JDK 7提供了7个阻塞队列,如下。

·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 ·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。 ·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

·DelayQueue:一个使用优先级队列实现的无界阻塞队列。 ·SynchronousQueue:一个不存储元素的阻塞队列。

·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 ·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

6.3.3 阻塞队列的实现原理

6.4 Fork/Join框架

6.4.1 什么是Fork/Join框架

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

《Java 并发编程的艺术》

6.4.2 工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

6.4.4 使用Fork/Join框架

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;  // 阈值 private int start;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
// 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
// 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end); // 执行子任务
            leftTask.fork();
            rightTask.fork();
// 等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
// 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算1+2+3+4
        CountTask task = new CountTask(1, 100);
// 执行一个任务
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
        } catch (ExecutionException e) {
        }
    }
}
           

Java中的13个原子操作类

7.1 原子更新基本类型类

使用原子的方式更新基本类型,Atomic包提供了以下3个类。

·AtomicBoolean:原子更新布尔类型。

·AtomicInteger:原子更新整型。

·AtomicLong:原子更新长整型。

public class AtomicIntegerTest {
    static AtomicInteger ai = new AtomicInteger(1);

    public static void main(String[] args) {
        System.out.println(ai.getAndIncrement());
        System.out.println(ai.get());
    }
}

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法 会阻塞当前线程,直到N变成零。
           

Java中的并发工具类

8.1 等待多线程完成的CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}
           

8.2 同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数 量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

public class CyclicBarrierTest {
    static
    CyclicBarrier c = new CyclicBarrier(2);

    public static void main(String[] args) {
        try {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        c.await();
                    } catch (Exception e) {
                    }
                    System.out.println(1);
                }
            }).start();
            c.await();
        } catch (Exception e) {
        }
        System.out.println(2);
    }
}
           

8.2.3 CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重 置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数 器,并让线程重新执行一次。

8.3 控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假 如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程 并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这 时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连 接。这个时候,就可以使用Semaphore来做流量控制。

8.4 线程间交换数据的Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交 换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。

public class ExchangerTest {
    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水A";    // A录入银行流水数据
                    exgr.exchange(A);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水B";    // B录入银行流水数据
                    String A = exgr.exchange("B");
                    System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" + A + ",B录入是:" + B);

                } catch (InterruptedException e) {
                    threadPool.shutdown();
                }
            }
        });
    }
}
           

Java中的线程池

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用 线程池,必须对其实现原理了如指掌。

9.1 线程池的实现原理

线程池的主要处理流程:

《Java 并发编程的艺术》

9.2 线程池的使用

9.2.1 线程池的创建

new ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
           

创建一个线程池时需要输入几个参数,如下。

1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任 务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。

2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。

·ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。

·LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通 常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

·SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。

·PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并 且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如 果使用了无界的任务队列这个参数就没什么效果。

4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线 程设置有意义的名字,代码如下。

5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

·AbortPolicy:直接抛出异常。

·CallerRunsPolicy:只用调用者所在线程来运行任务。

·DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

·DiscardPolicy:不处理,丢弃掉。

当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录 日志或持久化存储不能处理的任务。

6)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

7)TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

9.2.2 向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。

execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。

submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功。

9.2.3 关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。

它们的原理是遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。但是它们存在一定的区别,shutdownNow首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而 shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

9.2.4 合理地配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

·任务的性质:CPU密集型任务、IO密集型任务和混合型任务。

·任务的优先级:高、中和低。

·任务的执行时间:长、中和短。

·任务的依赖性:是否依赖其他系统资源,如数据库连接。

9.2.5 线程池的监控

可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。

·taskCount:线程池需要执行的任务数量。

·completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。

·largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。

·getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。

·getActiveCount:获取活动的线程数。

Executor框架

从JDK 5开始,把工作单元与执行机制分离开 来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

10.1 Executor框架简介

Executor框架主要由3大部分组成如下。

·任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。

·任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的 ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口 (ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

·异步计算的结果。包括接口Future和实现Future接口的FutureTask类。

10.2 ThreadPoolExecutor详解

FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现。

public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

           

SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现。

public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
       (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
  }

           

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread-Pool的源代码。

public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
           

10.4 FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

《Java 并发编程的艺术》

Java并发编程实践