天天看點

【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析

【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析

【1】LongAdder 性能優勢與适用場景

AtomicLong VS LongAdder
 LongAdder适合的場景是統計求和計數的場景
 在競争激烈的情況下,LongAdder的效率比AtomicLong高,但要消耗更多的空間

 LongAdder 性能優勢
 LongAdder 的基本思路是分散熱點,
 AtomicLong 中存在成員變量 value 儲存 long 值,在高并發的情況下該 value 值是一個熱點,由 N 個線程競争;
 LongAdder 将 value 值分散到一個數組中,不同的線程命中到數組的不同的槽中,
 各線程僅僅對自己槽中的值進行 CAS 操作,進而分散熱點減少沖突
           

【2】AtomicLong 源碼分析

// 使用 Unsafe 對 long 類型變量提供 CAS 操作
public class AtomicLong extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 1927816293512124184L;

    // setup to use Unsafe.compareAndSwapLong for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();

    private static native boolean VMSupportsCS8();

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicLong.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile long value;

    public AtomicLong(long initialValue) {
        value = initialValue;
    }

    public AtomicLong() {
    }

    public final long get() {
        return value;
    }

    public final void set(long newValue) {
        value = newValue;
    }

    public final void lazySet(long newValue) {
        unsafe.putOrderedLong(this, valueOffset, newValue);
    }

    public final long getAndSet(long newValue) {
        return unsafe.getAndSetLong(this, valueOffset, newValue);
    }

    public final boolean compareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    public final boolean weakCompareAndSet(long expect, long update) {
        return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
    }

    public final long getAndIncrement() {
        return unsafe.getAndAddLong(this, valueOffset, 1L);
    }

    public final long getAndDecrement() {
        return unsafe.getAndAddLong(this, valueOffset, -1L);
    }

    public final long getAndAdd(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta);
    }

    public final long incrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
    }

//    // Unsafe.java
//    public final long getAndAddLong(Object o, long offset, long delta) {
//        long v;
//        // 如果其它線程正在修改AtomicLong, 本線程隻能是不斷的重試, 直到成功
//        do {
//            v = getLongVolatile(o, offset);
//        } while (!compareAndSwapLong(o, offset, v, v + delta));
//        return v;
//    }

    public final long decrementAndGet() {
        return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
    }

    public final long addAndGet(long delta) {
        return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
    }

    public final long getAndUpdate(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final long updateAndGet(LongUnaryOperator updateFunction) {
        long prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsLong(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final long getAndAccumulate(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final long accumulateAndGet(long x,
                                       LongBinaryOperator accumulatorFunction) {
        long prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsLong(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public String toString() {
        return Long.toString(get());
    }

    public int intValue() {
        return (int)get();
    }

    public long longValue() {
        return get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
           

【3】LongAdder 源碼分析

【3.1】LongAdder -- public void add(long x)

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // 進入該分支場景
        // 兩種可能場景:
        // 1. as != null, 說明已經使用了分散存儲
        // 2. as == null, 但casBase失敗了,說明另一線程正在修改base
        boolean uncontended = true;     // true表處于非競争狀态, false表示處于多線程競争狀态
        if (as == null  // 場景 2
                || (m = as.length - 1) < 0  // 場景 1 但 as 的長度為 0
                || (a = as[getProbe() & m]) == null ||  // 場景1 as長度>0,以線程hash值取出一個cell,但它還未初始化
            // 場景1, 取出了一個cell, 并且已經初始化了. 嘗試直接将x加進去, 但另外一線程正在占用它
            !(uncontended = a.cas(v = a.value, v + x)))
            // 以上的判斷語句嘗試加x加到cell中去, 但失敗了. 此時LongAdder與AtomicLong差不多, 都得不斷的重試
            longAccumulate(x, null, uncontended);
    }
}
           

public void add(long x) 執行流程圖

【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析

【3.2】Striped64 -- final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        // 擷取目前線程的探針值,若為0
        // 則建立目前線程的 ThreadLocalRandom 變量
        // 線程的探針,可以了解為線程的hash值,首次使用需要初始化
        ThreadLocalRandom.current(); // force initialization
        // 擷取目前線程的探針值
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {  // cells數組已經new過了
            if ((a = as[(n - 1) & h]) == null) {    // 按線程hash取出一個cell,但為null, 需要初始化這個cell
                // 以下代碼目的為了實作: as[(n - 1) & h] = new Cell(x)
                // [加鎖] cellsBusy充當了自旋鎖的作用,阻止其它的線程同時執行以下代碼

                // 目前沒有線程在執行以下代碼,此處判斷并不安全,是以下面的代碼還要再判斷一次
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        // 使用cas再次确認cellsBusy的值,并将cellsBusy置為1,表明目前線程正在執行以下代碼
                        // casCellsBusy失敗,說明其它線程正在執行以下代碼
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // 加鎖之前的那些if語句判斷都是不安全的,于是,加鎖之後,再判斷一次
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                // 目前處于競争狀态,嘗試跳過一個回合,再下回合再試
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                //嘗試直接将x加到cell上
                break;
            else if (n >= NCPU || cells != as)
                // 另個一個線程正在擴容, 或者可能已經擴容到上限了
                // 跳過一個回合
                collide = false;            // At max size or stale
            else if (!collide)
                // 跳過一個回合
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                // 擴容操作,cells容量翻倍(隻有當n小于CPU數量時)
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            // 修改線程探針(hash值),嘗試其它的cell
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 首次初始化
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
           

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) 執行流程圖

【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析【JDK源碼分析系列】AtomicLong 與 LongAdder 對比分析

【3.3】LongAdder -- public long sum()

// 取值操作
// 将base及所有的cell中的值加起來
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
           

參考緻謝

本部落格為部落客的學習實踐總結,并參考了衆多部落客的博文,在此表示感謝,部落客若有不足之處,請批評指正。

【1】多線程程式設計:并發加/減操作優化, LongAdder原理,與AtomicLong比較

【2】Java多線程進階(十七)—— J.U.C之atomic架構:LongAdder