天天看點

【JDK專題】——JDK資料結構——ConcurrentHashMap源碼剖析

ConcurrentHashMap——設計本意

HashMap的線程安全問題:

(1)初始化的時候容易造成兩個線程同時初始化 (2)擴容的時候,容易數字的實時性和判斷代碼的線程問題

(3)對于連結清單和紅黑樹的插入這種全局性影響結果的操作會産生線程安全,比如循環連結清單;

HashTable的性能低效問題:

hashTable直接所有線程共用一個把鎖,put\get\resize串行執行。不同key之間也會造成競争關系,性能太低
【JDK專題】——JDK資料結構——ConcurrentHashMap源碼剖析
【JDK專題】——JDK資料結構——ConcurrentHashMap源碼剖析

ConcurrentHashMap——版本對比

【JDK專題】——JDK資料結構——ConcurrentHashMap源碼剖析
此圖原文:https://blog.csdn.net/programmer_at/article/details/79715177

ConcurrentHashMap——适用場景

HashTable:強一緻性;get\put串行執行;不适合大量的插入,以後可能不同key之間進行隔離也可以優化一下;

ConcurrentHashMap:弱一緻性,需要減少哈希沖突;不然也會産生重量鎖,連結清單和紅黑樹其實算局部性影響結果,是以可能以後版本更新會在這裡優化一下;

ConcurrentHashMap——資料結構

nextTable屬性

/**
     * @遷移中轉地
     * 擴容時,将table中的元素遷移至nextTable;先擴容為兩倍
     * 擴容後,将nextTable清空
     */
    private transient volatile Node<K,V>[] nextTable;

           

sizeCtl屬性

/**
     * @擴容狀态标記/擴容閥值
     * 這是一個聯合意義的資料,可能作為狀态标記;也可能作為閥值判斷;
     * 多線程之間,以volatile的方式讀取sizeCtl屬性,來判斷ConcurrentHashMap目前所處的狀态。通過cas設定sizeCtl屬性,告知其他線程ConcurrentHashMap的狀态變更
     * 未初始化:sizeCtl=0:表示沒有指定初始容量。sizeCtl>0:表示初始容量。
     * 初始化中:sizeCtl=-1,标記作用,告知其他線程,正在初始化
     * 正常狀态:sizeCtl=0.75n ,擴容門檻值
     * 擴容中: sizeCtl < 0 : 表示有其他線程正在執行擴容;sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此時隻有一個線程在執行擴容
     */
    private transient volatile int sizeCtl;
           

transferIndex屬性

/**
     * 擴容線程每次最少要遷移16個hash桶
     */
    private static final int MIN_TRANSFER_STRIDE = 16;
    /**
     * @分割遷移任務的數組指針
     * 高并發清空下,多線程擴容會産生沖突;與其阻塞其他線程擴容,倒不如讓其他線程協助擴容
     * 是以首先将哈希按MIN_TRANSFER_STRIDE,16個哈希桶為機關分割為多個遷移任務
     * transferIndex代表目前執行任務的位置,當然去擷取擴容任務也會産生并發沖突是以用cas解決
     * 擴容之前:transferIndex 在數組的最右邊
     * 擴容中:transferIndex=transferIndex-stride(stride是是以首先将哈希按MIN_TRANSFER_STRIDE的整數倍,代表可能不隻一次一個機關)
     * 由此可以看出遷移是從後往前遷移
     */
    private transient volatile int transferIndex;
           

ForwardingNode内部類

标記作用,表示其他線程正在擴容,并且此節點已經擴容完畢
關聯了nextTable,擴容期間可以通過find方法,通路已經遷移到了nextTable中的資料
 static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            //hash值為MOVED(-1)的節點就是ForwardingNode
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        //通過此方法,通路被遷移到nextTable中的資料
        Node<K,V> find(int h, Object k) {
           ...
        }
 }
 主要展現在get方法
  public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            /**
             *@eh.hash<0,這樣就可以在擴容期間通路已經遷移号的資料了
             */
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
           

協助擴容源代碼:

final V putVal(K key, V value, boolean onlyIfAbsent) {
      ...
       //f.hash == MOVED 表示為:ForwardingNode,說明其他線程正在擴容
       else if ((fh = f.hash) == MOVED)
           tab = helpTransfer(tab, f);
      ...
   }

           

ConcurrentHashMap——插入元素

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        /**
         * @哈希均勻處理
         */
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            /*
             * tab 數組的引用
             * n 數組的長度引用
             * i 計算出來的哈希值
             * f 每次找到的目前節點的周遊
             * fh 目前節點的原哈希值
             */
            Node<K,V> f; int n, i, fh;
            /**
             * @(1)數組為空就是懶加載,初始化一波
             */
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            /**
             * @(2)表達目前這個位置是空的
             * 直接利用cas更新節點,避免已經被插入了
             */
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                        new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            /**
             * @(3)表示目前這個位置正在擴容
             * 擴容操作在外部進行為了避免線程安全,會在此處先占位等待擴容完成後自己再加入
             * 擴容好的節點連結清單或紅黑樹
             */
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            /**
             * @(4)表示目前這個位置是紅黑樹和連結清單
             * 由于并發問題連結清單和紅黑樹是全局性會影響插入結果
             * 是以插入過程中不能有最好不要有節點發生指針改變synchronized鎖住
             * 是以經常哈希沖突會影響性能
             * binCount
             */
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                                (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                            value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                    value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        /*
         * 添加數量和擴容判斷
         */
        addCount(1L, binCount);
        return null;
    }
           

ConcurrentHashMap——擷取數量

ConcurrentHashMap的元素個數等于baseCounter和數組裡每個CounterCell的值之和,這樣做的原因是,當多個線程同時執行CAS修改baseCount值,失敗的線程會将值放到CounterCell中。是以統計元素個數時,要把baseCount和counterCells數組都考慮
final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
           

ConcurrentHashMap——擴容機制

/**
     * x,擴容一個
     * check,檢查原節點的基數(比如連結清單原來是地盤上個,單節點是1個,紅黑樹是節點個數)
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        /**
         * @(1)并發基礎size的處理
         * 用于傳回size以及各種擴容的判斷基礎
         * BASECOUNT:偏移位址
         * baseCount:期望值
         * s:最終值等于原值+1
         * counterCells as 更新的失敗線程
         */
        if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                            U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

                /**
                 * @多線程修改baseCount時,用LongAdder方法實作數字并發,修改完後直接return整個方法
                 * 拆分成多個cell然後合并數字增加并發效率
                 * 最後也是直接return
                 */
                fullAddCount(x, uncontended);//
                return;
            }
            /**
             * @并發失敗一次,如果隻是單節點增加
             * 也是直接return,否則就将并發修改的數量累加起來進行擴容判斷
             */
            if (check <= 1)
                return;
            s = sumCount();
        }
        /**
         * @并發擴容的處理
         * 沒有baseCount的并發問題就進入擴容判斷流程
         */
        if (check >= 0) {
            /**
             * tab 數組指針
             * nt 指向分段數組的指針
             * n 數組的長度指針
             * sc 擴容閥值
             */
            Node<K,V>[] tab, nt; int n, sc;
            /**
             * 滿足擴容閥值進行擴容操作
             */
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                    (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                /**
                 * @sc<0表示需要進行并發擴容了
                 * 并發擴容時,nextTable可能還未必初始化或已經被初始化是以帶個nt過來
                 */
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                /**
                 *  @利用時間戳判斷此時隻有一個線程正在進行擴容
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                        (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

           
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        /**
         * @n: 原table數組長度
         * @nextTab: 是用來辨別是不是第一個進來擴容的線程,是的話就需要初始化一下nextTale,由于volative是實時共享,但是我們隻想知道移動前是不是null是以直接将指針複制進來
         * @stride: 利用cpu空閑情況,算出此處線程應該應該拿到多個個任務
         */
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;
        /**
         * @(1)為空就建構一個nextTable為table的兩倍
         */
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;//将transferIndex等于數組長度
        }
        /**
         * @建構一個死循環完成複制功能
         */
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false;
        /**
         * @nextn: 擴容的數組長度;
         * @fwd: 将剛複制好的talbe包裝一個ForwardingNode節點
         * @advance:
         * @finishing: 所有的節點都已經完成複制工作的标記
         * @i: 每個線程的内置的計數器,用于判斷自己的任務執行到哪裡
         * @bound: 每個線程的内置的哈希數量,用于判斷自己的任務是否執行完
         * f 擴容線程會根據transferIndex任務标記号,周遊拿到屬于自己多個節點
         * fh 每次周遊的節點的哈希
         */
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            /**
             * @(1)主要将transferIndex同步給bound,然後不斷i說明任務執行的進度
             * 每個擴容線程先拿到任務号transferIndex
             * 讓并發的擴容線程擷取到transferIndex拿到遷移任務
             */
            while (advance) {
                //更新遷移索引i。
                int nextIndex, nextBound;
                /**
                 * 執行完了标記需要直接退出循環
                 */
                if (--i >= bound || finishing)
                    advance = false;
                /**
                 * transferIndex<=0表示已經沒有需要遷移的hash桶,将i置為-1,線程準備退出
                 */
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                /*
                 * 更新自己的bound嘗試更新transferIndex
                 */
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            /**
             * @利用算法判斷是否所有哈希捅被遷移完了
             */
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                /**
                 * @如果已經完成複制
                 * 就将nextTable指派為null,table指針指向我們的nextTab
                 * 然後擴容閥值增大
                 */
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                 /**
                     第一個擴容的線程,執行transfer方法之前,會設定 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
                     後續幫其擴容的線程,執行transfer方法之前,會設定 sizeCtl = sizeCtl+1
                     每一個退出transfer的方法的線程,退出之前,會設定 sizeCtl = sizeCtl-1
                     那麼最後一個線程退出時:
                     必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
                    */
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            /**
             * 空節點直接插入
             */
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            /**
             * 如果周遊到ForwardingNode節點 ,說明這個點已經被處理過,直接跳過
             */
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            /**
             * 非空節點擴容需要加鎖,但很明細對象鎖自己的線程有效(每個線程的任務分割不一樣),實作了分段鎖的
             */
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }
           

ConcurrentHashMap——關鍵梳理

分段鎖的實作:jdk7是分割成二重數組後,用内置的lock鎖實作;而jdk8是利用tranferIndex進行任務分割,每個關鍵鎖隻鎖自己的任務哈希捅實作分段鎖;

插入的時候:如果标記為Forward節點就幫其擴容,增加擴容效率;除此以外普通節點直接cas,紅黑樹和連結清單就是用關鍵字鎖住自己的節點,鎖的粒度更小

擷取的時候:如果正在擴容可以利用Forward節點的find方法快速擷取,增強一緻性;

更新數量:如果并發更新因為争搶一個數組就會生成cell線程,類似于longadder的并發實作原理來合并最後的更新結果;

size的傳回:是以線程的真實數量就是baseCount加上還在進行fullAddCount的線程的數量之和;

擴容機制:首先用nextTab判斷null來決定初始化兩倍的nextTab數組;然後用兩個計數器嗎,一個代表目前哈希遷移執行進度,一個代表擷取到index邊界;strcip任務總體大小依據cpu閑置情況判斷,然後首先每個線程會不斷進行cas擷取到自己的分割任務的大小tranferIndex,然後同步給bound當然做了一些處理,最後就不斷for循環+i,直到處理完就目前線程就完成擴容任務,可能會繼續擷取下一個任務,finshing用于判斷所有節點是否遷移完;

GodSchool

緻力于簡潔的知識工程,輸出高品質的知識産出,我們一起努力

部落客私人微信:supperlzf

【JDK專題】——JDK資料結構——ConcurrentHashMap源碼剖析

繼續閱讀