上一篇博客中,了解了put方法,结尾的时候,有addCount()方法,今天继续学习。
在map中,容易出现线程安全的,除了put()方法的操作,另一个就是map的扩容机制。 接下来就深入理解concurrentHashMap中的的扩容机制。
在调用addCount()方法的时候,会涉及到transfer()方法的调用,而该方法就是我们今天要深入学习的扩容方法。
transfer()
- 表示 扩容之前table的数组的长度; stride 表示分配给线程任务的步长;
- 当nextTab == null 成立:表示当前线程为触发本次扩容,需要做一些扩容准备工作;不成立:表示当前线程需要扩容
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
- nextn 表示 新数组的长度;
- FWD节点:当某个桶位数据处理完毕后,将此桶位设置为fwd节点,其他写线程或读线程看到后,会有不同的逻辑
5.推进标记 / 表示完成标记
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
6.接下来在for循环中,i 表示分配给当前线程的任务,执行到的桶位; bound: 表示分配给当前线程任务的下界限制
在循环里面,有几处节点,第一是while循环;第二个是if (i < 0 || i >= n || i + n >= nextn) 及其他;
先说while循环这个方法,该方法有三个作用
1)给当前线程发配任务区间
2)维护当前线程任务进度(i表示当前线程处理的桶位)
3)维护当前map对象全局范围内的进度
7. nextIndex 表示分配任务开始的下标; nextBound 表示分配任务结束的下标
8.该while 里面的第一个if 成立的话,表示当前线程任务的尚未完成,还有相应的区间的桶位需要处理 --i 就让当前线程处理下一个桶位
if (--i >= bound || finishing)
advance = false;
- 第二个if 前置条件: 当前线程任务已经完成,或者 未分配;条件成立表示对象全局范围内的桶位都已经分配完毕了,没有区间分配了,则设置当前线程的i变量为 -1,跳出循环后,执行退出偏移任务相关的程序.
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
10.第三个if 前置条件 1,当前线程需要分配任务区间,2.全局范围内,还有桶位尚未迁移; 如果条件为true 说明给当前线程分配任务成功
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
- 接下来主要是描述里面的另一个方法,if(i < 0 || i >= n || i + n >= nextn ); 在该方法中 , sizeCtl 的值为 扩容数组大小的0.75的阈值
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
12.接下来的方法,与上面两个方法不同的是,接下来的步骤主要是进行桶位数据的偏移工作处理,处理完毕后,advance 为true,表示继续推荐,然后就会回到for里面; 能进入到下面的步骤中的前置条件为:当前线程任务尚未完成,正在进行中
- CASE 2: 为true时, 说明当前桶位未存放数据,只需要将此设置为fwd节点即可
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
14.CASE 3: 为true时,说明当前桶位已经迁移过了,当前线程不用处理,直接再次更新当前线程任务索引 再次处理下一个桶位或者其他操作
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
-
CASE 4 当前桶位有数据,且node节点不是fwd节点,说明这些数据需要迁移;
该方法中会分别处理链表桶位;和红黑树的代理结点
进入到该方法中,先进行synchtonized进行锁住头结点,再进行处理业务时,又进行了一次判断;判断当前头对象,是否是加锁对象,这样做的原因是,防止在加锁头对象之前,当前桶位头对象被其他线程修改过,导致目前加锁对象错误。
16.链表桶位进行偏移处理
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
- 红黑树 代理结点 TreeBin
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
整个扩容的机制流程如图所示
具体的描述如下:
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// n 表示 扩容之前table的数组的长度
// stride 表示分配给线程任务的步长
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 条件成立:表示当前线程为触发本次扩容,需要做一些扩容准备工作
// 不成立: 表示当前线程是写出扩容的线程
if (nextTab == null) { // initiating
try {
// 春暖国家一个比扩容之前大一倍的table
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
// 赋值给对象属性 nextTable,翻倍协助扩容线程,拿到新表
nextTable = nextTab;
// 记录迁移数据整体位置的一个标记,index 技术是从1开始计算的
transferIndex = n;
}
// 表示新表数组的长度
int nextn = nextTab.length;
// FWD节点:当某个桶位数据处理完毕后,将此桶位设置为fwd节点,其他写线程或读线程看到后,会有不同的逻辑
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 推进标记
boolean advance = true;
// 表示完成标记
boolean finishing = false; // to ensure sweep before committing nextTab
// i 表示分配给当前线程的任务,执行到的桶位
// bound: 表示分配给当前线程任务的下界限制
for (int i = 0, bound = 0;;) {
// f 表示桶位头结点
// fh 表示头结点的hash
Node<K,V> f; int fh;
// 1.给当前线程发配任务区间
// 2.维护当前线程任务进度(i表示当前线程处理的桶位)
// 3. 维护当前map对象全局范围内的进度
while (advance) {
// 分配任务区间的变量
// nextIndex 分配任务开始的下标
// nextBound 分配任务结束的下标
int nextIndex, nextBound;
//CASE 1:
// 条件一: --i >= bound 成立:表示当前线程任务的尚未完成,还有相应的区间的桶位需要处理 --i 就让当前线程处理下一个桶位
// 不成立: 当前线程任务已经完成,或者 未分配
if (--i >= bound || finishing)
advance = false;
// CASE 2: 前置条件: 当前线程任务已经完成,或者 未分配
// 条件成立,表示对象全局范围内的桶位都已经分配完毕了,没有区间分配了
// 则设置当前线程的i变量为 -1,跳出循环后,执行退出偏移任务相关的程序
// 条件不成立: 说明对象全局范围内还有未分配的桶位,
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// CASE 3: 前置条件 1,当前线程需要分配任务区间,2.全局范围内,还有桶位尚未迁移
// true: 说明给当前线程分配任务成功
// false: 说明分配给当前线程失败,应该是其他线程发生了竞争
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 处理线程任务后,线程退出transfer方法的逻辑
// 条件一成立: 表示当前线程未分配到任务
if (i < 0 || i >= n || i + n >= nextn) {
// 保存sizeCtl 的变量
int sc;
//
if (finishing) {
nextTable = null;
table = nextTab;
// sizeCtrl = 0.75 * 2n 扩容后的数组的大小的 0.75的阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// true:说明设置sizeCtl 低16位 -1 成功,当前线程可以正常退出
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// (sc - 2) != resizeStamp(n) 为true时: 说明当前线程不是最后一个结束的线程
// false时: 是退出transfer任务的线程
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 正常退出
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 下方表示: 线程处理一个桶位数据的偏移工作,处理完毕以后,
// advance 为true,表示继续推荐,然后就会回到for里面
// 前置条件: 当前线程任务尚未完成,正在进行中
//CASE 2:
// true: 说明当前桶位未存放数据,只需要将此设置为fwd节点即可
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// CASE 3:
// true: 说明当前桶位已经迁移过了,当前线程不用处理,直接再次更新当前线程任务索引 再次处理下一个桶位或者其他操作
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 前置条件: 当前桶位有数据,且node节点不是fwd节点,说明这些数据需要迁移
// CASE 4
else {
// 加锁当前桶位的头结点
synchronized (f) {
// 判断当前头对象,是否是加锁的头对象,
// 防止在加锁头对象之前,当前桶位头对象被其他线程修改过,导致目前加锁对象错误
if (tabAt(tab, i) == f) {
// ln 表示低位链表引用
// hn 表示高位链表引用
Node<K,V> ln, hn;
// 条件成立: 表示当前桶位是链表桶位
if (fh >= 0) {
// lastRun 机制:
// 可以获取出当前链表末尾连续高位不变的node
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 条件成立: 说明lastRun的链表为 低位链表,那么就是让 ln 指向低位链表
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 否则,说明lastRun引用链表为 高位链表, 就让hn指向高位链表
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
// 条件成立: 表示当前桶位是红黑树 代理结点 TreeBin
else if (f instanceof TreeBin) {
// 转换头结点为 TreeBin 引用 t
TreeBin<K,V> t = (TreeBin<K,V>)f;
// lo : 低位双向链表,指向低位链表的头
// loTail : 指向低位链表的尾巴
// hi: 高位双向链表,指向高位链表的头
// hiTail: 指向高位链表的尾巴
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
// lc 表示低位链表元素数量
// hc 表示高位链表元素数量
int lc = 0, hc = 0;
// 迭代TreeBin中的双向链表,从头结点 至 尾结点
for (Node<K,V> e = t.first; e != null; e = e.next) {
// h 表示当前循环节点的 hash值
int h = e.hash;
// p 使用当前节点构建出来的新的 TreeNode
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 条件成立, 表示当前循环节点,属于低位链表节点
if ((h & n) == 0) {
// 条件成立:说明低位链表 没有数据
if ((p.prev = loTail) == null)
lo = p;
// 低位链表 已经有数据了,此时当前元素 追加到低位链表的末尾
else
loTail.next = p;
// 将低位链表 尾指针 指向 p 节点
loTail = p;
++lc;
}
// 否则,当前节点属于高位链表节点
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}