天天看點

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

LongAdder相比AtomicLong有哪些優勢?

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

上一節我們分享了原子類一些常用的工具類,除此之外還提供了另外4個原子類。

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

這4個原子類和我們之前提到的原子類的設計思想不太一樣,是以單開一節來分析

= new AtomicLong();
 // 1
 System.out.println(sum.incrementAndGet());


 LongAdder sum1 = new LongAdder();
 sum1.increment();
 // 1
 System.out.println(sum1);      

可以看到使用方式差不多,但是LongAdder的性能比較高,是以阿裡巴巴《Java開發手冊》中也有如下建議

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

那麼LongAdder是如何實作高性能的?

其實我們可以把對一個變量的cas操作,分攤到對多個變量的cas操作,這樣就可以提高并發度,想擷取最終的值時,隻需要把多個變量的值加在一起即可。這就是LongAdder實作高并發的秘密

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

除去LongAdder外,還新提供了一個類LongAccumulator。LongAccumulator比LongAdder的功能呢更強大

LongAdder隻能進行累加操作,并且初始值預設為0。而LongAccumulator可以自己定義一個二進制操作符,并且可以傳入一個初始值。

我們來看一下用LongAccumulator實作累乘的操作

LongAccumulator sum2 = new LongAccumulator((a, b) -> a * b, 1);
for (int i = 1; i < 5; i++) {
    sum2.accumulate(i);
}
// 24
// 4 * 3 * 2 * 1
System.out.println(sum2);      

除此之外還有DoubleAdder和DoubleAccumulator類

DoubleAdder的實作思路LongAdder類似,因為沒有double類型的cas函數,是以DoubleAdder底層也是用long實作的,隻不過多了long和double的互相轉換

DoubleAccumulator的實作思路和LongAccumulator類似,隻是多了一個二進制操作符

LongAdder如何實作高性能的?

LongAdder實作高并發的秘密就是用空間換時間,對一個值的cas操作,變成對多個值的cas操作,當擷取數量的時候,對這多個值加和即可。

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?

具體到源碼就是

  1. 先對base變量進行cas操作,cas成功後傳回
  2. 對線程擷取一個hash值(調用getProbe),hash值對數組長度取模,定位到cell數組中的元素,對數組中的元素進行cas

其中對base值的操作是由Striped64的實作類來實作的,而對cell數組的操作則由Striped64來實作

增加數量

// LongAdder
public void increment() {
    add(1L);
}      
// LongAdder
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 數組為空則先對base進行一波cas,成功則直接退出
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}      

當數組不為空,并且根據線程hash值定位到數組某個下标中的元素不為空,對這個元素cas成功則直接傳回,否則進入longAccumulate方法

并發原子類:都有了AtomicLong,為什麼還要提供LongAdder?
  1. cell數組已經初始化完成,主要是在cell數組中放元素,對cell數組進行擴容等操作
  2. cell數組沒有初始化,則對數組進行初始化
  3. cell數組正在初始化,這時其他線程利用cas對baseCount進行累加操作
// Striped64
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    // 往數組中放元素是否沖突
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                // 有線程在操作數組cellsBusy=1
                // 沒有線程在操作數組cellsBusy=0
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            // // 和單例模式的雙重檢測一個道理
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        // 成功在數組中放置元素
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            // cas baseCount失敗
            // 并且往CounterCell數組放的時候已經有值了
            // 才會重新更改wasUncontended為true
            // 讓線程重新生成hash值,重新找下标
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            // cas數組的值
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // 其他線程把數組位址改了(有其他線程正在扣哦榮)
            // 數組的數量>=CPU的核數
            // 不會進行擴容
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            // collide = true(collide = true會進行擴容)的時候,才會進入這個else if 
            // 上面2個else if 是用來控制collide的
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}      

擷取數量

base值+Cell數組中的值即可

// LongAdder
 public long sum() {
     Cell[] as = cells; Cell a;
     long sum = base;
     if (as != null) {
         for (int i = 0; i < as.length; ++i) {
             if ((a = as[i]) != null)
                 sum += a.value;
         }
     }
     return sum;
 }      

需要注意的是,調用sum()傳回的數量有可能并不是目前的數量,因為在調用sum()方法的過程中,可能有其他數組對base變量或者cell數組進行了改動

// AtomicLong
public final long getAndIncrement() {
    return unsafe.getAndAddLong(this, valueOffset, 1L);
}      

而AtomicLong#getAndIncrement方法則會傳回遞增之後的準确值,因為cas是一個原子操作

最後告訴大家一個小秘密,jdk1.8中ConcurrentHashMap對元素個數的遞增和統計操作的思想和LongAdder一摸一樣,代碼基本相差無幾,有興趣的可以看看。

計數用synchronized,AtomicLong,還是LongAdder?

在很多系統中都用到了計數的功能,那麼計數我們應該用synchronized,AtomicLong,LongAdder中的哪一個呢?來跑個例子

public class CountTest {
    
    private int count = 0;

    @Test
    public void startCompare() {
        compareDetail(1, 100 * 10000);
        compareDetail(20, 100 * 10000);
        compareDetail(30, 100 * 10000);
        compareDetail(40, 100 * 10000);
    }

    /**
     * @param threadCount 線程數
     * @param times 每個線程增加的次數
     */
    public void compareDetail(int threadCount, int times) {
        try {
            System.out.println(String.format("threadCount: %s, times: %s", threadCount, times));
            long start = System.currentTimeMillis();
            testSynchronized(threadCount, times);
            System.out.println("testSynchronized cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("testAtomicLong cost: " + (System.currentTimeMillis() - start));

            start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("testLongAdder cost: " + (System.currentTimeMillis() - start));
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void testSynchronized(int threadCount, int times) throws InterruptedException {
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    add();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }
    
    public synchronized void add() {
        count++;
    }

    public void testAtomicLong(int threadCount, int times) throws InterruptedException {
        AtomicLong count = new AtomicLong();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.incrementAndGet();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }

    public void testLongAdder(int threadCount, int times) throws InterruptedException {
        LongAdder count = new LongAdder();
        List<Thread> threadList = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            threadList.add(new Thread(()-> {
                for (int j = 0; j < times; j++) {
                    count.increment();
                }
            }));
        }
        for (Thread thread : threadList) {
            thread.start();
        }
        for (Thread thread : threadList) {
            thread.join();
        }
    }
}      
threadCount: 1, times: 1000000
testSynchronized cost: 187
testAtomicLong cost: 13
testLongAdder cost: 15

threadCount: 20, times: 1000000
testSynchronized cost: 829
testAtomicLong cost: 242
testLongAdder cost: 187

threadCount: 30, times: 1000000
testSynchronized cost: 232
testAtomicLong cost: 413
testLongAdder cost: 111

threadCount: 40, times: 1000000
testSynchronized cost: 314
testAtomicLong cost: 629
testLongAdder cost: 162      

并發量比較低的時候AtomicLong優勢比較明顯,因為AtomicLong底層是一個樂觀鎖,不用阻塞線程,不斷cas即可。但是在并發比較高的時候用synchronized比較有優勢,因為大量線程不斷cas,會導緻cpu持續飙高,反而會降低效率