天天看点

Java Atomic 原子操作类

目录

1、基本类型的原子更新

2、数组类型的原子更新

3、引用类型的原子更新

4、对象属性原子修改器

5、LongAdder/DoubleAdder详解

6、LongAdder 原理

在并发编程中很容易出现并发安全的问题,有一个很简单的例子就是多线程更新变量 i=1 ,比如多个线程执行 i++ 操作,就有可能获取不到正确的值,而这个问题,最常用的方法是通过  Synchronized 进行控制来达到线程安全的目的。但是由于synchronized是采用的是悲观锁策略,并不是特别高效的一种解决方案。

实际上,在J.U.C下的 atomic 包提供了一系列的操作简单, 性能高效,并能保证线程安全的类去更新基本类型变量,数组元素,引用类型以及更新对象中的字段类型。atomic 包下的这些类都是采用乐观锁策略去更新数据,在 Java 中乐观锁策略的实现使用的是 CAS 。

java.util.concurrent.atomic 包里提供了一组原子操作类:

  • 基本类型:AtomicInteger、AtomicLong、AtomicBoolean;
  • 引用类型:AtomicReference、AtomicStampedRerence、AtomicMarkableReference;
  • 数组类型:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
  • 对象属性原子修改器:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicReferenceFieldUpdater
  • 原子类型累加器(jdk1.8增加的类):DoubleAccumulator、DoubleAdder、 LongAccumulator、LongAdder、Striped64

1、基本类型的原子更新

以 AtomicInteger 为例,总结常用的方法

// 以原子的方式将实例中的原值加 1,返回的是自增前的旧值;
public final int getAndIncrement() {// 先获取(get)后加(add)
    return unsafe.getAndAddInt(this, valueOffset, 1);
} 

// 将实例中的值更新为新值,并返回旧值; 
public final int getAndSet(int newValue) {
    return unsafe.getAndSetInt(this, valueOffset, newValue);
}

// 以原子的方式将实例中的原值进行加 1 操作,并返回最终相加后的结果;
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

// 以原子方式将输入的数值与实例中原本的值相加,并返回最后的结果;
public final int addAndGet(int delta) {// 先加(add)后获取(get)
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
           

AtomicInteger 使用示例

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerTest {

    static AtomicInteger sum = new AtomicInteger(0);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 原子自增  CAS
                    sum.incrementAndGet();
                }
            });
            thread.start();
        }
        try {
            Thread.sleep(3000); // 保证所有线程执行完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(sum.get());
    }
}
           
Java Atomic 原子操作类
Java Atomic 原子操作类

incrementAndGet() 方法通过CAS自增实现,如果CAS失败,自旋直到成功+1。

Java Atomic 原子操作类

2、数组类型的原子更新

AtomicIntegerArray 为例,总结常用的方法

// 以原子更新的方式将数组中索引为i的元素与输入值相加,返回相加后的值
public final int addAndGet(int i, int delta) {
    return getAndAdd(i, delta) + delta;
}
 
// 以原子更新的方式将数组中索引为i的元素自增加 1,返回原值
public final int getAndIncrement(int i) {
    return getAndAdd(i, 1);
}

// 将数组中索引为i的位置的元素进行更新,参数为:索引,期望值,修改值
public final boolean compareAndSet(int i, int expect, int update) {
    return compareAndSetRaw(checkedByteOffset(i), expect, update);
}
           

AtomicIntegerArray  使用示例

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicIntegerArrayTest {

    static int[] arr = new int[]{1, 2, 3, 4, 5};
    static AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(arr);

    public static void main(String[] args) throws InterruptedException {
        // 设置索引 0 的元素为 100
        atomicIntegerArray.set(0, 100);
        System.out.println(atomicIntegerArray.get(0));
        // 以原子更新的方式将数组中索引为 1 的元素与输入值相加,返回原值
        int andAdd = atomicIntegerArray.getAndAdd(1, 5);
        System.out.println(atomicIntegerArray);
    }
}
           
Java Atomic 原子操作类

3、引用类型的原子更新

AtomicReference 作用是对普通对象的封装,它可以保证在修改对象引用时的线程安全性。

import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {

    public static void main(String[] args) {
        User jerry = new User("jerry", 23);
        User Bob = new User("Bob", 25);
        User Smith = new User("Smith", 20);

        //初始化为 jerry
        AtomicReference<User> atomicReference = new AtomicReference<>();
        atomicReference.set(jerry);
        //把 Bob 赋给 atomicReference
        atomicReference.compareAndSet(jerry, Bob);
        System.out.println(atomicReference.get());
        //把 Smith 赋给 atomicReference
        atomicReference.compareAndSet(jerry, Smith); // 赋值失败,期望值不符合
        System.out.println(atomicReference.get());
    }
}

@Data
@AllArgsConstructor// 所有参数的构造器
class User {
    private String name;
    private Integer age;
}
           
Java Atomic 原子操作类

4、对象属性原子修改器

AtomicIntegerFieldUpdater 可以线程安全地更新对象中的整型变量。

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class AtomicIntegerFieldUpdaterTest {

    public static class Candidate { // 对象
        // 字段必须是volatile类型
        volatile int score = 0;
        AtomicInteger atomicInteger = new AtomicInteger();
    }

    // 定义属性修改器
    public static final AtomicIntegerFieldUpdater<Candidate> scoreUpdater =
            AtomicIntegerFieldUpdater.newUpdater(Candidate.class, "score");

    public static AtomicInteger realScore = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        final Candidate candidate = new Candidate();
        Thread[] threads = new Thread[10000]; // 开启10000个线程
        for (int i = 0; i < 10000; i++) {
            threads[i] = new Thread(new Runnable() {
                @Override
                public void run() {
                    if (Math.random() > 0.4) {
                        candidate.atomicInteger.incrementAndGet();
                        scoreUpdater.incrementAndGet(candidate); // 修改 candidate 的属性
                        realScore.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        for (int i = 0; i < 10000; i++) {
            threads[i].join();
        }
        System.out.println("AtomicIntegerFieldUpdater Score= " + candidate.score);
        System.out.println("AtomicInteger Score= " + candidate.atomicInteger.get());
        System.out.println("realScore= " + realScore.get());
    }
}
           
Java Atomic 原子操作类

对于 AtomicIntegerFieldUpdater 的使用有一些限制和约束,约束如下:

  • 属性字段必须是 volatile 类型的,在线程之间共享变量时保证立即可见(volatile int score = 0)
  • 字段的描述类型(修饰符 public / protected / default / private )与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。但是对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
  • 只能是实例变量,不能是类变量,也就是说不能加 static 关键字。
  • 只能是可修改变量,不能使 final 变量,因为 final 的语义就是不可修改。实际上 final 的语义和  volatile 是有冲突的,这两个关键字不能同时存在。
  • 对于 AtomicIntegerFieldUpdater 和 AtomicLongFieldUpdater 只能修改 int / long 类型的字段,不能修改其包装类型( Integer / Long )。如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

5、LongAdder/DoubleAdder详解

AtomicLong 是利用了底层的 CAS 操作来提供并发性的,比如 addAndGet() 方法:

Java Atomic 原子操作类
Java Atomic 原子操作类
Java Atomic 原子操作类

上述方法调用了 Unsafe 类的 getAndAddLong() 方法,该方法内部是个 native 方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。 在并发量较低的环境下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发环境下, N个线程同时进行自旋操作,会出现大量失败并不断自旋的情况,此时 AtomicLong 的自旋会成为性能瓶颈。

这就是 LongAdder 引入的初衷:解决高并发环境下 AtomicInteger, AtomicLong 的自旋瓶颈问题。 

性能测试对比

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderTest {

    public static void main(String[] args) {
        compareAtomicLongAndLongAdder(10, 100000);
        System.out.println("====================================");
        compareAtomicLongAndLongAdder(10, 200000);
        System.out.println("====================================");
        compareAtomicLongAndLongAdder(100, 200000);
    }

    static void compareAtomicLongAndLongAdder(final int threadCount, final int times) {
        try {
            System.out.println("测试条件->线程数:" + threadCount + ", 单线程操作计数:" + times);
            long startLongAdder = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            long endLongAdder = System.currentTimeMillis() - startLongAdder;
            System.out.println("LongAdder方式->结果,增加计数:" + (threadCount * times) + "次,共计耗时:" + endLongAdder);

            long startAtomicLong = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            long endAtomicLong = System.currentTimeMillis() - startAtomicLong;
            System.out.println("AtomicLong方式->结果,增加计数:" + (threadCount * times) + "次,共计耗时:" + endAtomicLong);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < threadCount; i++) { // 线程数量
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) { // 循环次数
                        atomicLong.incrementAndGet();
                    }
                    countDownLatch.countDown();
                }
            }, "My-thread" + i).start();
        }
        countDownLatch.await();
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(threadCount);
        LongAdder longAdder = new LongAdder();
        for (int i = 0; i < threadCount; i++) { // 线程数量
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < times; j++) {
                        longAdder.add(1); // 循环次数
                    }
                    countDownLatch.countDown();
                }
            }, "My-thread" + i).start();
        }
        countDownLatch.await();
    }
}
           

测试结果:线程数越多,并发操作数越大,LongAdder的优势越明显

Java Atomic 原子操作类

低并发、一般的业务场景下使用 AtomicLong 就足够了。如果并发量很多,存在大量写多读少的情况,LongAdder 可能更合适。

6、LongAdder 原理

设计思路:AtomicLong 中有个内部变量 value 保存着实际的 long 值(深入理解CAS有分析),所有的操作都是针对该变量进行。也就是说,高并发环境下,value 变量其实是一个热点,也就是N个线程竞争一个热点。

LongAdder 的基本思路就是分散热点,将 value 值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的 long 值,只要将各个槽中的变量值累加返回。

Java Atomic 原子操作类

LongAdder 的内部结构

LongAdder 内部有一个 base 变量,一个 Cell[] 数组:

base变量:非竞态条件下,直接累加到该变量上

Cell[]数组:竞态条件下,累加个各个线程自己的槽 Cell[i] 中

在 LongAdder 的父类 Striped64 中,有如下成员属性

Java Atomic 原子操作类
// CPU核数,用来决定槽数组的大小 
static final int NCPU = Runtime.getRuntime().availableProcessors(); 

// 数组槽,大小为2的次幂 
transient volatile Cell[] cells; 

// 基数,在两种情况下会使用: 
// 1. 没有遇到并发竞争时,直接使用base累加数值
// 2. 初始化cells数组时,必须要保证cells数组只能被初始化一次(即只有一个线程能对cell s初始化), // 其他竞争失败的线程会讲数值累加到base上 
transient volatile long base; 

// Spinlock (locked via CAS) used when resizing and/or creating Cells.
// CAS锁标记,Cells 扩容时会被使用
transient volatile int cellsBusy;
           

然后,定义了一个内部 Cell 类,这就是我们之前所说的槽,每个 Cell 对象存有一个 value 值,可以通过 Unsafe 来 CAS 操作它的值:

Java Atomic 原子操作类

接下来,分析下 LongAdder#add 方法的源码

public void add(long x) {
     Cell[] as; long b, v; int m; Cell a;
     // 第一次不会创建cells,第二次才会casBase()失败,出现并发冲突,使用Cell
     if ((as = cells) != null || !casBase(b = base, b + x)) { 
         boolean uncontended = true;
         if (as == null || (m = as.length - 1) < 0 || // 判断Cell数组是否存在
         (a = as[getProbe() & m]) == null ||  // 创建Hash下标,即Cell下标
         !(uncontended = a.cas(v = a.value, v + x))) // 对Cell中的Value值进行累加操作
         longAccumulate(x, null, uncontended); // 核心方法,创建Cell
     }
}
           

只有从未出现过并发冲突的时候,base 基数才会使用到,一旦出现了并发冲突,之后所有的操作 都只针对 Cell[] 数组中的单元 Cell 。 如果 Cell[] 数组未初始化,会调用父类的 longAccumelate 去初始化 Cell[] ,如果 Cell[] 已经初始化,但是冲突发生在 Cell 单元内,则也调用父类的 longAccumelate ,此时可能就需要对 Cell[] 扩容了。

这也是 LongAdder 设计的精妙之处:尽量减少热点冲突,不到最后万不得已,尽量将CAS操作延 迟。

Java Atomic 原子操作类

进一步分析 longAccumulate(x, null, uncontended) 源码

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {                // CAS 自旋
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) { // Cell数组不为空
                if ((a = as[(n - 1) & h]) == null) { // 取模定位,位置如果为空,创建Cell
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // 创建Cell
                        if (cellsBusy == 0 && casCellsBusy()) { // CAS锁casCellsBusy(),保证原子性
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;      // 把Cell放入Cell[]数组中
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0; // 还原CAS锁标记
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) { // CAS锁
                    try {
                        if (cells == as) {      // 扩容,Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // CAS锁
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
           

LongAccumulator

LongAccumulator 是 LongAdder 的增强版。LongAdder 只能针对数值的进行加减运算,而 LongAccumulator 提供了自定义的函数操作。其构造函数如下:

Java Atomic 原子操作类

通过 LongBinaryOperator ,可以自定义对入参的任意操作,并返回结果( LongBinaryOperator 接收 2 个 long 作为参数,并返回 1 个 long )。

LongAccumulator 内部原理和 LongAdder 几乎完全 一样,都是利用了父类 Striped64 的 longAccumulate() 方法。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

public class LongAccumulatorTest {
    public static void main(String[] args) throws InterruptedException {
        // 累加 x+y
        LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0);
        ExecutorService executor = Executors.newFixedThreadPool(8);
        // 1到9累加
        IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
        Thread.sleep(2000);
        System.out.println("获取最终结果:" + accumulator.getThenReset());
    }
}
           

继续阅读