天天看点

jdk1.8中对ConcurrentHashMap的理解(三)

上一篇博客中,了解了put方法,结尾的时候,有addCount()方法,今天继续学习。

在map中,容易出现线程安全的,除了put()方法的操作,另一个就是map的扩容机制。 接下来就深入理解concurrentHashMap中的的扩容机制。

在调用addCount()方法的时候,会涉及到transfer()方法的调用,而该方法就是我们今天要深入学习的扩容方法。

transfer()

jdk1.8中对ConcurrentHashMap的理解(三)
  1. 表示 扩容之前table的数组的长度; stride 表示分配给线程任务的步长;
  1. 当nextTab == null 成立:表示当前线程为触发本次扩容,需要做一些扩容准备工作;不成立:表示当前线程需要扩容
if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
           
  1. nextn 表示 新数组的长度;
  1. FWD节点:当某个桶位数据处理完毕后,将此桶位设置为fwd节点,其他写线程或读线程看到后,会有不同的逻辑

5.推进标记 / 表示完成标记

boolean advance = true;
 boolean finishing = false; // to ensure sweep before committing nextTab
           

6.接下来在for循环中,i 表示分配给当前线程的任务,执行到的桶位; bound: 表示分配给当前线程任务的下界限制

在循环里面,有几处节点,第一是while循环;第二个是if (i < 0 || i >= n || i + n >= nextn) 及其他;

先说while循环这个方法,该方法有三个作用

1)给当前线程发配任务区间

2)维护当前线程任务进度(i表示当前线程处理的桶位)

3)维护当前map对象全局范围内的进度

7. nextIndex 表示分配任务开始的下标; nextBound 表示分配任务结束的下标

8.该while 里面的第一个if 成立的话,表示当前线程任务的尚未完成,还有相应的区间的桶位需要处理 --i 就让当前线程处理下一个桶位

if (--i >= bound || finishing)
     advance = false;
           
  1. 第二个if 前置条件: 当前线程任务已经完成,或者 未分配;条件成立表示对象全局范围内的桶位都已经分配完毕了,没有区间分配了,则设置当前线程的i变量为 -1,跳出循环后,执行退出偏移任务相关的程序.
else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
           

10.第三个if 前置条件 1,当前线程需要分配任务区间,2.全局范围内,还有桶位尚未迁移; 如果条件为true 说明给当前线程分配任务成功

else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
           
  1. 接下来主要是描述里面的另一个方法,if(i < 0 || i >= n || i + n >= nextn ); 在该方法中 , sizeCtl 的值为 扩容数组大小的0.75的阈值
if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
           

12.接下来的方法,与上面两个方法不同的是,接下来的步骤主要是进行桶位数据的偏移工作处理,处理完毕后,advance 为true,表示继续推荐,然后就会回到for里面; 能进入到下面的步骤中的前置条件为:当前线程任务尚未完成,正在进行中

  1. CASE 2: 为true时, 说明当前桶位未存放数据,只需要将此设置为fwd节点即可
else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
           

14.CASE 3: 为true时,说明当前桶位已经迁移过了,当前线程不用处理,直接再次更新当前线程任务索引 再次处理下一个桶位或者其他操作

else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
           
  1. CASE 4 当前桶位有数据,且node节点不是fwd节点,说明这些数据需要迁移;

    该方法中会分别处理链表桶位;和红黑树的代理结点

    进入到该方法中,先进行synchtonized进行锁住头结点,再进行处理业务时,又进行了一次判断;判断当前头对象,是否是加锁对象,这样做的原因是,防止在加锁头对象之前,当前桶位头对象被其他线程修改过,导致目前加锁对象错误。

16.链表桶位进行偏移处理

if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
           
  1. 红黑树 代理结点 TreeBin
else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
           

整个扩容的机制流程如图所示

jdk1.8中对ConcurrentHashMap的理解(三)

具体的描述如下:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        // n 表示 扩容之前table的数组的长度
        // stride 表示分配给线程任务的步长
        int n = tab.length, stride;

        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range

        // 条件成立:表示当前线程为触发本次扩容,需要做一些扩容准备工作
        // 不成立: 表示当前线程是写出扩容的线程
        if (nextTab == null) {            // initiating
            try {
                // 春暖国家一个比扩容之前大一倍的table
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            // 赋值给对象属性 nextTable,翻倍协助扩容线程,拿到新表
            nextTable = nextTab;
            // 记录迁移数据整体位置的一个标记,index 技术是从1开始计算的
            transferIndex = n;
        }
        // 表示新表数组的长度
        int nextn = nextTab.length;
        // FWD节点:当某个桶位数据处理完毕后,将此桶位设置为fwd节点,其他写线程或读线程看到后,会有不同的逻辑
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        // 推进标记
        boolean advance = true;
        // 表示完成标记
        boolean finishing = false; // to ensure sweep before committing nextTab

        // i 表示分配给当前线程的任务,执行到的桶位
        // bound: 表示分配给当前线程任务的下界限制
        for (int i = 0, bound = 0;;) {
            // f 表示桶位头结点
            // fh 表示头结点的hash
            Node<K,V> f; int fh;


            // 1.给当前线程发配任务区间
            // 2.维护当前线程任务进度(i表示当前线程处理的桶位)
            // 3. 维护当前map对象全局范围内的进度
            while (advance) {
                // 分配任务区间的变量
                // nextIndex 分配任务开始的下标
                // nextBound 分配任务结束的下标
                int nextIndex, nextBound;

                //CASE 1:
                // 条件一:  --i >= bound 成立:表示当前线程任务的尚未完成,还有相应的区间的桶位需要处理 --i 就让当前线程处理下一个桶位
                // 不成立: 当前线程任务已经完成,或者 未分配
                if (--i >= bound || finishing)
                    advance = false;
                // CASE 2: 前置条件:  当前线程任务已经完成,或者 未分配
                // 条件成立,表示对象全局范围内的桶位都已经分配完毕了,没有区间分配了
                // 则设置当前线程的i变量为 -1,跳出循环后,执行退出偏移任务相关的程序
                // 条件不成立: 说明对象全局范围内还有未分配的桶位,
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }

                // CASE 3: 前置条件 1,当前线程需要分配任务区间,2.全局范围内,还有桶位尚未迁移
                // true: 说明给当前线程分配任务成功
                // false: 说明分配给当前线程失败,应该是其他线程发生了竞争
                else if (U.compareAndSwapInt
                        (this, TRANSFERINDEX, nextIndex,
                                nextBound = (nextIndex > stride ?
                                        nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }

            // 处理线程任务后,线程退出transfer方法的逻辑
            // 条件一成立: 表示当前线程未分配到任务
            if (i < 0 || i >= n || i + n >= nextn) {
                // 保存sizeCtl 的变量
                int sc;
                //
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    // sizeCtrl = 0.75 * 2n 扩容后的数组的大小的 0.75的阈值
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

                //  true:说明设置sizeCtl 低16位 -1 成功,当前线程可以正常退出
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {

                    // (sc - 2) != resizeStamp(n) 为true时: 说明当前线程不是最后一个结束的线程
                    // false时: 是退出transfer任务的线程
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        // 正常退出
                        return;

                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }

            // 下方表示: 线程处理一个桶位数据的偏移工作,处理完毕以后,
            // advance 为true,表示继续推荐,然后就会回到for里面

            // 前置条件: 当前线程任务尚未完成,正在进行中

            //CASE 2:
            // true: 说明当前桶位未存放数据,只需要将此设置为fwd节点即可
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);

            // CASE 3:
            // true: 说明当前桶位已经迁移过了,当前线程不用处理,直接再次更新当前线程任务索引  再次处理下一个桶位或者其他操作
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed

             // 前置条件: 当前桶位有数据,且node节点不是fwd节点,说明这些数据需要迁移
            // CASE 4
            else {
                // 加锁当前桶位的头结点
                synchronized (f) {
                    // 判断当前头对象,是否是加锁的头对象,
                    // 防止在加锁头对象之前,当前桶位头对象被其他线程修改过,导致目前加锁对象错误
                    if (tabAt(tab, i) == f) {
                        // ln 表示低位链表引用
                        // hn 表示高位链表引用
                        Node<K,V> ln, hn;

                        // 条件成立: 表示当前桶位是链表桶位
                        if (fh >= 0) {
                            // lastRun 机制:
                            // 可以获取出当前链表末尾连续高位不变的node
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            // 条件成立: 说明lastRun的链表为 低位链表,那么就是让 ln 指向低位链表
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }

                            // 否则,说明lastRun引用链表为 高位链表, 就让hn指向高位链表
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }

                            //
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }



                        // 条件成立: 表示当前桶位是红黑树 代理结点 TreeBin
                        else if (f instanceof TreeBin) {
                            // 转换头结点为 TreeBin 引用 t
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            // lo : 低位双向链表,指向低位链表的头
                            // loTail : 指向低位链表的尾巴
                            // hi: 高位双向链表,指向高位链表的头
                            // hiTail: 指向高位链表的尾巴
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;

                            // lc 表示低位链表元素数量
                            // hc 表示高位链表元素数量
                            int lc = 0, hc = 0;

                            // 迭代TreeBin中的双向链表,从头结点 至 尾结点
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                // h 表示当前循环节点的 hash值
                                int h = e.hash;
                                // p 使用当前节点构建出来的新的 TreeNode
                                TreeNode<K,V> p = new TreeNode<K,V>
                                        (h, e.key, e.val, null, null);

                                // 条件成立, 表示当前循环节点,属于低位链表节点
                                if ((h & n) == 0) {
                                    // 条件成立:说明低位链表  没有数据
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    // 低位链表 已经有数据了,此时当前元素 追加到低位链表的末尾
                                    else
                                        loTail.next = p;

                                    // 将低位链表 尾指针 指向 p 节点
                                    loTail = p;

                                    ++lc;
                                }

                                // 否则,当前节点属于高位链表节点
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }


                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                    (hc != 0) ? new TreeBin<K,V>(lo) : t;

                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                    (lc != 0) ? new TreeBin<K,V>(hi) : t;


                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }