天天看點

longAdder 學習筆記

現在notion寫的筆記,然後遷移過來,注釋不對等湊合看,好不好嘛

相對于 atmoic,并發性能更牆,追求最終一緻性,但是單一操作沒有atmoic性能好

它是将所有并發所要儲存的值放在一個base中, ,然後進行增減工作

将并發的線程,設計為一個數組,并将所要累加的值放入base中,base是cas操作,根據數組中元素是否為空,來進行最終的增改,多線程較多情況下,還會進行擴容,依然是二的次方幂,主要采用多重判斷,和手動加鎖機制,來進行

主方法add()

public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
    				// 數組已經初始化,初始化直接走第二個條件寫入資料
    				// 寫入資料失敗,發生了競争或者需要擴容
            if ((as = cells) != null || !casBase(b = base, b + x)) {
    						// 預設無競争
                boolean uncontended = true;
    
    						// 1 : 數組未初始化,需要初始化
    						// 2 : 目前要放入下标值為空,需要建立
    					  // 3 : 寫入值是發生了競争關系
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    longAccumulate(x, null, uncontended);
            }
        }
           

核心方法longAccumulate()

// wasUncontended目前線程競争失敗才會是true
    final void longAccumulate(long x, LongBinaryOperator fn,
                                  boolean wasUncontended) {
            int h;
    				// 目前線程hash值還未配置設定
            if ((h = getProbe()) == 0) {
                ThreadLocalRandom.current(); // force initialization
                h = getProbe();
    						// 預設情況下放在0位置,未初始化,不當做一次競争
                wasUncontended = true;
            }
    				// false 表示會擴容, true 可能會擴容
            boolean collide = false;                // True if last slot nonempty
            for (;;) {
    						// as 目前cell數組,cell目前需要使用的cell,n cell數組的長度,v 期望值
                Cell[] as; Cell a; int n; long v;
    						// 數組已經初始化,而且目前數組下标資料為null,需要将元素防到相應位置
                if ((as = cells) != null && (n = as.length) > 0) {
    								// 1.1 : 下标位置為空,可以進行指派
                    if ((a = as[(n - 1) & h]) == null) {
    										// 目前處于未加鎖狀态
                        if (cellsBusy == 0) {       // Try to attach new Cell
                            Cell r = new Cell(x);   // Optimistically create
    												// 多線程多重判斷,未加鎖狀态,并且擷取鎖
                            if (cellsBusy == 0 && casCellsBusy()) {
    														// 建立新元素是否成功
                                boolean created = false;
                                try {               // Recheck under lock
                                    Cell[] rs; int m, j;
    																//重複判斷多線程情況下,該位置是否被其他線程所指派,放置資料覆寫
                                    if ((rs = cells) != null &&
                                        (m = rs.length) > 0 &&
                                        rs[j = (m - 1) & h] == null) {
                                        rs[j] = r;
                                        created = true;
                                    }
                                } finally {
    																// 不論是否放置元素成功,都要釋放鎖
                                    cellsBusy = 0;
                                }
                                if (created)
                                    break;
    														// 未成功,自旋
                                continue;           // Slot is now non-empty
                            }
                        }
                        collide = false;
                    }
    								// 1.2 多線程時寫入資料失敗才 為false
                    else if (!wasUncontended)       // CAS already known to fail
                        wasUncontended = true;      // Continue after rehash
    								// 1.3 目前線程rehash過 線程hash值, 并且新命中的cell數組下标已經存在值
    								// true 則表示設定成功 false 則表示依然存在競争
                    else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                 fn.applyAsLong(v, x))))
                        break;
    								// 1.4 n大于cpu總值,不再擴容了, cells 已經被其線程擴容過了,rehash重試
                    else if (n >= NCPU || cells != as)
                        collide = false;            // At max size or stale
    								// 1.5 -> 1.4 之後近來,将該值設定true,有擴容意向
                    else if (!collide)
                        collide = true;
    								// 1.6 擴容
                    else if (cellsBusy == 0 && casCellsBusy()) {
                        try {
    												// 沒有被其他線程擴容
                            if (cells == as) {      // Expand table unless stale
    														// 擴容
                                Cell[] rs = new Cell[n << 1];
                                for (int i = 0; i < n; ++i)
                                    rs[i] = as[i];
                                cells = rs;
                            }
                        } finally {
    												// 擴容是否成功都需要釋放鎖
                            cellsBusy = 0;
                        }
    										// 關閉擴容意向
                        collide = false;
    										// 自旋重試
                        continue;                   // Retry with expanded table
                    }
    
    								// 重置線程hash值
                    h = advanceProbe(h);
                }
    						// 初始化cell數組
                else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                    boolean init = false;
                    try {                           // Initialize table
                        if (cells == as) {
                            Cell[] rs = new Cell[2];
                            rs[h & 1] = new Cell(x);
                            cells = rs;
                            init = true;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    if (init)
                        break;
                }
    						// 其他線程正在初始化cell數組,需要将值累加到base
    						// 線程被上鎖,搶占失敗,需要将值累加到base
                else if (casBase(v = base, ((fn == null) ? v + x :
                                            fn.applyAsLong(v, x))))
                    break;                          // Fall back on using base
            }
        }