天天看點

并發讀源碼——并發讀源碼Striped64/LongAdder/DoubleAdder/LongAccumulator/DoubleAccumulator1、LongAdder原理介紹2、LongAdder源碼介紹3、LongAdder并發時應用4、DoubleAdder分析5、LongAccumulator分析6、DoubleAccumulator分析

文章目錄

  • 1、LongAdder原理介紹
  • 2、LongAdder源碼介紹
  • 3、LongAdder并發時應用
  • 4、DoubleAdder分析
  • 5、LongAccumulator分析
  • 6、DoubleAccumulator分析

1、LongAdder原理介紹

LongAdder是JDK8中新增加的一個并發類,LongAdder主要用于對long型變量進行CAS(CAS可以參考前面系列文章)操作,但AtomicLong也是對long型變量進行CAS操作,兩者什麼差別呢?

AtomicLong内部維護了一個volatile long類型的變量,多個線程并發CAS操作該變量,而LongAdder把一個long變量拆分出多個long變量,采用分段鎖的方式對多個變量進行并發。

在LongAdder内部,一個long型變量被拆分成了一個base變量和一組Cell,每個Cell内部維護了一個long型變量,多個線程并發時,如果并發低,直接累加到base變量,并發高沖突大時,平攤到一組Cell中,最後取值時,再把base和全部的Cell中的value值進行 累加求和。

上述原理内容在Striped64元子類中的longAccumulate方法實作的,LongAdder繼承了Striped64,LongAdder隻是一個元子類并發的架構子,其分段鎖核心思想均在父類Striped64中longAccumulate方法實作。Striped64内部維護了一個base和一組Cell,當線程執行時會首先 更新base變量,如果更新失敗,認為目前線程有競争,就會對Cell進行初始化或擴容,然後根據目前線程的探針計算hash值映射到一組Cell的下标中,如果多個線程競争同一個Cell導緻更新Cell失敗,會擴容Cell的數組并重新hash數組的下标,不同的并發線程分散到不同的Cell中執行。

這種分段鎖的思想,在并發低的時候,不如AtomicLong的效率高,因為并發低時,LongAdder還要把base和各個Cell累加起來;當并發非常高時,LongAdder的效率就顯現出來了,是以LongAdder适合大規模并發統計的場景。

2、LongAdder源碼介紹

首先分析LongAdder的源碼

public class LongAdder extends Striped64 implements Serializable {
    private static final long serialVersionUID = 7249069246863182397L;

    public LongAdder() {
    }

    /*
     * LongAdder繼承了Striped64,是以同時繼承了Striped64中的屬性cells、base、cellsBusy
     * */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        /*
         **首次執行的時候,都是試着把x加到base上面,首次執行時cell都是null,如果casBase失敗,則産生了競争,進入longAccumulate處理
         **非首次執行,cells數組已經初始化過了,目前線程hash對應下标的cell為空,則進入longAccumulate建立cell,并加入cells;如果不為空,則把x值加到對應的cell商
         **如果x加到cell上失敗,說明多線程産生了競争,進入longAccumulate重試或者擴容
         *
         **在取cells下标時,是根據目前線程的hash,即getProbe()&m,getProbe為目前線程的探針值
         */
        if ((as = cells) != null || !casBase(b = base, b + x)) {	
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

    /**
     * 加1
     */
    public void increment() {
        add(1L);
    }

    /**
     * 減1
     */
    public void decrement() {
        add(-1L);
    }

    /**
     * 把cells中所有的value值累加到base上傳回結果
     * 該方法局限性:不能有競争線程存在
     */
    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * 該方法重置base以及cells中所有value的值為0,相當于建立了一個新的LongAdder
     * 但是該方法時有局限性的,在重置時不能有并發存在,否則會導緻base或cells中values未能有效置0
     */
    public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
    }

    /**
     * 先把cells中所有values累加到base中,然後重置base和cells中所有value值為0
     * 該方法局限性:不能有競争線程存在
     */
    public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

    public String toString() {
        return Long.toString(sum());
    }

    /**
     * 求出base以及cells中所有value總和
     */
    public long longValue() {
        return sum();
    }

    /**
     * 把sum總和向下轉為int類型,如果超出Int的最大範圍,則高位截斷
     */
    public int intValue() {
        return (int)sum();
    }

    /**
     * sum總和轉化為float類型
     */
    public float floatValue() {
        return (float)sum();
    }

    /**
     * sum總和轉化為double類型
     */
    public double doubleValue() {
        return (double)sum();
    }

    /*
     * 序列化代理内部類
     * 在對LongAdder序列化時,不再直接對LongAdder進行序列化,會暴露LongAdder中各種屬性,通過SerializationProxy類,隻把LongAdder中的sum總和進行序列化,隐藏了LongAdder的結構
     * */
    private static class SerializationProxy implements Serializable {
        private static final long serialVersionUID = 7249069246863182397L;

        /**
         * The current value returned by sum().
         * @serial
         */
        private final long value;

        SerializationProxy(LongAdder a) {
            value = a.sum();
        }

        /*
         * 反序列化時,讀取該方法進行 反序列化
         * */
        private Object readResolve() {
            LongAdder a = new LongAdder();
            a.base = value;
            return a;
        }
    }

    /*
     * 調用該方法,對LongAdder的序列化代理内部類SerializationProxy進行序列化
     * */
    private Object writeReplace() {
        return new SerializationProxy(this);
    }

    /*
     * 如果反序列結果與序列化結果不一緻,調用該方法抛出無效對象異常
     * */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.InvalidObjectException {
        throw new java.io.InvalidObjectException("Proxy required");
    }

}
           

可以看出LongAdder内部的核心方法是add,在并發比較高時,add方法調用了父類Strped64中的方法longAccumulate。下面分析下Strped64源碼

@SuppressWarnings("serial")
abstract class Striped64 extends Number {

	/*
	 * Cell為Striped64的内部類,Cell中維護了一個value變量值,當多線程并發時,把要競争的資源攤分到不同的Cell中進行執行,減少并發
	 * *後續對value操作都是對value在Cell中記憶體偏移量進行操作的
	 * */
    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /*擷取所在機器的CPU數量*/
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * cells數組中容量是2的倍數
     */
    transient volatile Cell[] cells;

    /**
     * *從最開始沒有競争時,對base進行累加或累減操作
     */
    transient volatile long base;

    /**
     * *相當于一個鎖标志,當對cells進行擴容或建立時,需要先擷取cellBusy鎖,如果擷取不到,說明有其他線程已經擷取了
     */
    transient volatile int cellsBusy;

    Striped64() {
    }

    /**
     * * 對base進行CAS操作
     */
    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

    /**
     * *CELLSBUSY是cellsBusy的記憶體偏移量
     * *對CELLSBUSY做CAS操作,修改成功,cellsBusy的值為1,表示拿到鎖
     */
    final boolean casCellsBusy() {
        return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
    }

    /**
     * *傳回目前線程的探針
     */
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

    /**
     **利用xorshift算法生成僞随機數,生成目前線程的探針值
     */
    static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

    /*
     * *該函數用于處理涉及cells初始化、擴容、更新情況,對于更新cells進行從試或擴容
     * */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
       /*如果目前線程還未hash,初始化線程的hash探針*/ 
    	int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;        //collide=false表示cells不會擴容,collide=true表示cells會擴容
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
            	/*
            	 * 當cells不為null時,對目前線程探針作hash作為cells的下标,确定下标後,x的值就要作用到該指定位置的Cell上
            	 * 但是當cells指定位置的Cell為null時,就要在該坐标位置建立Cell
            	 * 在建立Cell時一定要先擷取cellsBusy鎖
            	 * */
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                /*
                 * 在LongAdder的add方法中,在對cells做CAS時失敗,wasUncontended=false傳入到該longAccumulate方法,說明發生了線程競争
                 * 現在置wasUncontended=true,重新對cells執行
                 * */
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                /*
                 * 對線程探針hash後的值對應cells的坐标有值,則對該坐标處的Cell執行CAS操作
                 * 如果fn==null,則直接把x累加到a中value上;如果fn!=null,則對v和x執行功能函數fn,執行後的值更新到a中的value值上
                 * */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                /*
                 * 如果a做CAS失敗,說明線程發生了競争,多個線程在競争a資源
                 * 此時判斷是否對cells擴容:如果n>=CPU的個數,不擴容;或者其他線程已經修改了cells,也不擴容
                 * */
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                /*
                 * 如果判斷不對cells進行擴容,則把擴容标志collide設定true,從新進行自旋,也即執行for循環
                 * */
                else if (!collide)
                    collide = true;
                /*
                 * 如果在執行前面(a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))操作失敗後,即更新Cell失敗,
                 * 又滿足擴容條件,即n<NCPU,則對cells進行擴容一倍,并把原來cells中内容複制到新擴容後的cells中
                 * */
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                /*
                 * a.cas執行失敗,又不滿足擴容條件時,重新hash目前線程探針值,并從新執行自旋操作,即for循環
                 * */
                h = advanceProbe(h);
            }
            /*
             * cells為空時,在LongAdder中執行add方法時,首先加到base上,加到base上失敗,即有競争時,走該分支
             * *初始化cells,cells初始值容量是2,後續擴容時乘2,
             * 
             * *在初始化cells時,首先拿到cellsBusy鎖,即cellsBuys為0時,證明沒有其它線程競争,後續才可以初始化cells,因為其它線程此時也可能在初始化cells,
             * *隻能由一個線程初始化cells,是以要先判斷是否獲得鎖
             * */
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            /*
             * *如果前一步初始化cells失敗,即沒有拿到cellsBusy鎖,說明有其它線程已經拿到了cellsBusy鎖,正在初始化cells
             * *在此種情況下,會把x的值加到base上
             * */
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

    /**
     *  doubleAccumulate與longAccumulate操作一緻
     *  隻不過longAccumulate時處理long型分段鎖的,doubleAccumulate是執行double類型分段鎖的,
     *  但是doubleAccumulate中double類型也是轉換成long類型,然後用分段鎖的,最後再把執行結果轉換成double類型
     */
    final void doubleAccumulate(double x, DoubleBinaryOperator fn,
                                boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(Double.doubleToRawLongBits(x));
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value,
                               ((fn == null) ?
                                Double.doubleToRawLongBits
                                (Double.longBitsToDouble(v) + x) :
                                Double.doubleToRawLongBits
                                (fn.applyAsDouble
                                 (Double.longBitsToDouble(v), x)))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base,
                             ((fn == null) ?
                              Double.doubleToRawLongBits
                              (Double.longBitsToDouble(v) + x) :
                              Double.doubleToRawLongBits
                              (fn.applyAsDouble
                               (Double.longBitsToDouble(v), x)))))
                break;                          // Fall back on using base
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long BASE;
    private static final long CELLSBUSY;
    private static final long PROBE;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> sk = Striped64.class;
            BASE = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("base"));
            CELLSBUSY = UNSAFE.objectFieldOffset
                (sk.getDeclaredField("cellsBusy"));
            Class<?> tk = Thread.class;
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

}
           

3、LongAdder并發時應用

比如對銀行中熱點賬戶問題,一個火爆的理财賬戶對外進行搶售賣(假設不設額度上限),如果隻有一個線程對該賬戶執行,多個客戶同時進行購買理财時就會導緻其他幾個客戶購買失敗,造成非常壞的體驗,利用本節的LongAdder,把一個熱點賬戶拆分成幾個賬戶同時對外售賣,就可以大大減少并發失敗率。

package com.lzj.atomic.striped64;

import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {

	private static LongAdder adder = new LongAdder();  
	
	
	public static void main(String[] args) throws InterruptedException {
		
		Runnable run1 = new Runnable() {
			@Override
			public void run() {
				for(int i=0; i<100; i++) {
					adder.add(100); 	//假設線程1每次賣理财100,統計100次
				}
			}
		};
		
		Runnable run2 = new Runnable() {
			@Override
			public void run() {
				for(int i=0; i<100; i++) {
					adder.add(50);		//假設線程2每次賣理财50,統計100次
				}
			}
		};

		Thread thread1= new Thread(run1);
		Thread thread2 = new Thread(run2);
		thread1.start();
		thread2.start();
		thread1.join();
		thread2.join();
		System.out.println("理财賬戶總共賣出理财 :" + adder.sum());
	}

}
           

線程執行結束後,輸出如下,可見所有客戶購買理财成功

4、DoubleAdder分析

LongAdder是處理long型變量分段鎖并發的,而DoubleAdder内部是處理double類型分段鎖并發的,兩者原理完全一緻,用法也完全一緻,DoubleAdder内部的核心方法也是add方法,該方法并發高時,調用了父類Striped64中的doubleAccumulate方法。

并且DoubleAdder的代碼與LongAdder的代碼都一緻,兩者都繼承了Striped64基類,DoubleAdder内部就是利用long型分段鎖進行并發的,并發結束後,又把long型變量轉化了double類型,可見DoubleAdder内部也是利用LongAdder進行并發計算的。

5、LongAccumulator分析

LongAccumulator與LongAdder原理類似,都是調用了父類Striped64中longAccumulate方法完成分段鎖的更新,隻是LongAccumulator的功能更強大。

LongAdder隻能進行累加操作,并且初始值為0;LongAccumulator可以進行兩個二維元素的操作,初始值可以自定義。LongAdder在調用Striped64中longAccumulate中方法時,傳入的LongBinaryOperator對象為null,導緻LongAdder隻能進行累加減操作,因為LongAdder在執行CAS的時候執行的casBase(b, b+x),或者a.cas(v = a.value, v + x);而LongAccumulator在調用Striped64中longAccumulate中方法時,自定義了一個對二維元素操作的LongBinaryOperator對象,可以對這個二維元素進行任意操作,因為LongAccumulator在CAS時候執行的casBase(b,r),或者a.cas(v=a.value,r),其中r=LongBinaryOperator.applyAsLong(base,x),或者LongBinaryOperator(a.value,x)。

用法示例如下,假設對一個指定因子,用多個線程對其翻倍處理,示例中因子為1

public class LongAccumulatorDemo {

	public static void main(String[] args) throws InterruptedException  {
		LongBinaryOperator operator = (x, y) -> x*y;
		LongAccumulator accumulator = new LongAccumulator(operator, 1);
		
		Runnable runn1 = new Runnable() {
			@Override
			public void run() {
				for(int i=0; i<10; i++) {
					accumulator.accumulate(2);
				}
			}
		};
		
		Runnable runn2 = new Runnable() {
			@Override
			public void run() {
				for(int i=0; i<10; i++) {
					accumulator.accumulate(2);
				}
			}
		};
		
		Thread thread1 = new Thread(runn1);
		Thread thread2 = new Thread(runn2);
		thread1.start();
		thread2.start();
		thread1.join();
		thread2.join();
		System.out.println("并發結果為:" + accumulator.get());
	}

}
           

執行結果如下:

6、DoubleAccumulator分析

DoubleAccumulator原理與用法與LongAccumulator完全一緻,隻是DoubleAccumulator用來處理double類型的數值,LongAccumulator用來處理long類型的數值。其實DoubleAccumulator内部與DoubleAdder一樣都是先轉化為Long類型處理,結果再轉化為Double類型。

繼續閱讀