天天看点

【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析

【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析

【1】LongAdder 性能优势与适用场景

AtomicLong VS LongAdder
 LongAdder适合的场景是统计求和计数的场景
 在竞争激烈的情况下,LongAdder的效率比AtomicLong高,但要消耗更多的空间

 LongAdder 性能优势
 LongAdder 的基本思路是分散热点,
 AtomicLong 中存在成员变量 value 保存 long 值,在高并发的情况下该 value 值是一个热点,由 N 个线程竞争;
 LongAdder 将 value 值分散到一个数组中,不同的线程命中到数组的不同的槽中,
 各线程仅仅对自己槽中的值进行 CAS 操作,从而分散热点减少冲突
           

【2】AtomicLong 源码分析

// 使用 Unsafe 对 long 类型变量提供 CAS 操作
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    public AtomicLong() {
    }

    public final long get() {
        return value;
    }

    public final void set(long newValue) {
        value = newValue;
    }

    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }

    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
    }

    public final long getAndDecrement() {
        return unsafe.getAndAddLong(this, valueOffset, -1L);
    }

    public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }

//    // Unsafe.java
//    public final long getAndAddLong(Object o, long offset, long delta) {
//        long v;
//        // 如果其它线程正在修改AtomicLong, 本线程只能是不断的重试, 直到成功
//        do {
//            v = getLongVolatile(o, offset);
//        } while (!compareAndSwapLong(o, offset, v, v + delta));
//        return v;
//    }

    public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
    }

    public final long addAndGet(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

    public final long getAndUpdate(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final long updateAndGet(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final long getAndAccumulate(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final long accumulateAndGet(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public String toString() {
        return Long.toString(get());
    }

    public int intValue() {
        return (int)get();
    }

    public long longValue() {
        return get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
           

【3】LongAdder 源码分析

【3.1】LongAdder -- public void add(long x)

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // 进入该分支场景
        // 两种可能场景:
        // 1. as != null, 说明已经使用了分散存储
        // 2. as == null, 但casBase失败了,说明另一线程正在修改base
        boolean uncontended = true;     // true表处于非竞争状态, false表示处于多线程竞争状态
        if (as == null  // 场景 2
                || (m = as.length - 1) < 0  // 场景 1 但 as 的长度为 0
                || (a = as[getProbe() & m]) == null ||  // 场景1 as长度>0,以线程hash值取出一个cell,但它还未初始化
            // 场景1, 取出了一个cell, 并且已经初始化了. 尝试直接将x加进去, 但另外一线程正在占用它
            !(uncontended = a.cas(v = a.value, v + x)))
            // 以上的判断语句尝试加x加到cell中去, 但失败了. 此时LongAdder与AtomicLong差不多, 都得不断的重试
            longAccumulate(x, null, uncontended);
    }
}
           

public void add(long x) 执行流程图

【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析

【3.2】Striped64 -- final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        // 获取当前线程的探针值,若为0
        // 则创建当前线程的 ThreadLocalRandom 变量
        // 线程的探针,可以理解为线程的hash值,首次使用需要初始化
        ThreadLocalRandom.current(); // force initialization
        // 获取当前线程的探针值
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {  // cells数组已经new过了
            if ((a = as[(n - 1) & h]) == null) {    // 按线程hash取出一个cell,但为null, 需要初始化这个cell
                // 以下代码目的为了实现: as[(n - 1) & h] = new Cell(x)
                // [加锁] cellsBusy充当了自旋锁的作用,阻止其它的线程同时执行以下代码

                // 当前没有线程在执行以下代码,此处判断并不安全,因此下面的代码还要再判断一次
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 使用cas再次确认cellsBusy的值,并将cellsBusy置为1,表明当前线程正在执行以下代码
                        // casCellsBusy失败,说明其它线程正在执行以下代码
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 加锁之前的那些if语句判断都是不安全的,于是,加锁之后,再判断一次
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                // 当前处于竞争状态,尝试跳过一个回合,再下回合再试
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                //尝试直接将x加到cell上
                break;
            else if (n >= NCPU || cells != as)
                // 另个一个线程正在扩容, 或者可能已经扩容到上限了
                // 跳过一个回合
                collide = false;            // At max size or stale
            else if (!collide)
                // 跳过一个回合
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                // 扩容操作,cells容量翻倍(只有当n小于CPU数量时)
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // 修改线程探针(hash值),尝试其它的cell
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 首次初始化
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
           

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) 执行流程图

【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析【JDK源码分析系列】AtomicLong 与 LongAdder 对比分析

【3.3】LongAdder -- public long sum()

// 取值操作
// 将base及所有的cell中的值加起来
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
           

参考致谢

本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。

【1】多线程编程:并发加/减操作优化, LongAdder原理,与AtomicLong比较

【2】Java多线程进阶(十七)—— J.U.C之atomic框架:LongAdder