目录
1、addCount
2、tryPresize / helpTransfer
3、transfer
4、remove / replace / replaceAll / clear
5、get / getOrDefault
6、compute
7、merge
8、Traverser / keys / elements
9、总结
本篇博客继续上一篇《Java8 ConcurrentHashMap(一) 源码解析》讲解ConcurrentHashMap的扩容,元素替换和查找等方法的实现。
1、addCount
addCount用于增加键值对个数,如果check大于1,则检查当前是否需要扩容,如果需要则扩容。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null || 如果counterCells不为null,该属性默认为null
//如果counterCells为null,则将baseCount原子加x,baseCount默认是0,如果修改失败进入if分支
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 || //如果as未初始化
(a = as[ThreadLocalRandom.getProbe() & m]) == null || //当前线程对应的CounterCell未初始化
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //设置对应CounterCell的value失败
//初始化counterCells或者当前线程对应的CounterCell,修改对应CounterCell的value
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return; //不需要检查负载率,则直接返回
s = sumCount(); //计算当前的键值对个数
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//如果当前键值对个数达到容量上限了且数组长度小于最大值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//计算扩容后的大小
int rs = resizeStamp(n);
if (sc < 0) { //当前正在执行扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0) //上述条件成立表示转移的任务已分配完了或者已扩容完成
break;
//sc+1表示有一个新的线程进来了,退出时会减1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//帮助转移扩容前数组元素中的节点到扩容后的数组元素中
transfer(tab, nt);
}
//rs << RESIZE_STAMP_SHIFT) + 2算出来的是一个负值,表示准备执行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) //cas修改成功,由当前线程负责扩容
//执行扩容
transfer(tab, null);
s = sumCount();
}
}
}
//修改当前线程对应的CounterCell的value,加上x
//如果当前线程的probe未初始化,则初始化,
//如果counterCells未初始化或者当前线程对应的CounterCell未初始化则初始化
//如果counterCells数组的长度小于CPU个数则扩容
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//当前线程的probe未初始化,则强制初始化probe
ThreadLocalRandom.localInit();
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
//counterCells已初始化
if ((a = as[(n - 1) & h]) == null) {
//当前线程对应的CounterCell未初始化
if (cellsBusy == 0) { // 如果未加锁
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //获取锁,cas加锁,只有一个线程操作成功
boolean created = false;
try { //成功加锁后,再次检查当前线程对应的CounterCell是否为null
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//如果为nul,则赋值
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; //释放锁,该变量是volatile,可保证对其他CPU立即可见
}
if (created)
break; //创建了一个CounterCell则终止循环
continue; //再次检查时,对应的CounterCell不为null,则继续下一次循环
}//如果抢占锁失败,继续下一次for循环
}//如果锁被占用了
collide = false;
}
//当前线程对应的CounterCell已初始化
else if (!wasUncontended) //如果是wasUncontended,将其置为true,然后通过advanceProbe重新hash
wasUncontended = true;
//wasUncontended为true
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) //修改value成功,终止循环
break;
else if (counterCells != as || n >= NCPU) //counterCells扩容了,发生变更
collide = false; // At max size or stale
else if (!collide) //collide为false后,置为true,通过advanceProbe重新hash
collide = true;
//如果counterCells没变,n小于NCPU,则进入此分支扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //加锁成功
try {
if (counterCells == as) {//再次校验counterCells没有变更
CounterCell[] rs = new CounterCell[n << 1]; //扩容一倍
for (int i = 0; i < n; ++i) //将原来的CounterCell复制到新的数组中
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0; //释放锁
}
collide = false;
continue; // Retry with expanded table
}
h = ThreadLocalRandom.advanceProbe(h);
} //外层if结束
//counterCells未初始化
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //未加锁,尝试加锁成功
boolean init = false;
try { // Initialize table
if (counterCells == as) { //再次检查counterCells未初始化
//初始化
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x); //赋值
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break; //已完成初始化,则终止循环
}
//上述else分支尝试加锁失败,则原子修改baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
} //for循环结束
}
//获取当前总的键值对个数
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
//将baseCount同各CounterCell的value累加
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
static final int resizeStamp(int n) {
//RESIZE_STAMP_BITS的值是16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
2、tryPresize / helpTransfer
tryPresize会判断table是否初始化,如果未初始化则完成初始化,接着判断容量是否充足,如果不足则执行扩容,扩容的逻辑跟上面的addCount一致。helpTransfer方法会判断当前Map是否正在扩容,如果在扩容且有未分配的转移任务,则会调用transfer方法帮忙转移老数组中的节点到扩容后的新数组中。
private final void tryPresize(int size) {
//根据size计算对应的容量,如果找过MAXIMUM_CAPACITY的一半,则为MAXIMUM_CAPACITY
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
//如果未扩容
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
//如果数组未初始化,此时sc记录了初始容量,取sc和c的最大值
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //原子的修改为-1,表示正在初始化中
try {
if (table == tab) { //再次检查是否未初始化
@SuppressWarnings("unchecked")
//初始化数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc就是n的四分之三左右,即默认负载率0.75
sc = n - (n >>> 2);
}
} finally {
//更新sizeCtl,键值对个数超过此值则扩容
sizeCtl = sc;
}
}
}
//如果table已初始化,如果当前Map容量足够或者达到最大容量了,终止循环
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) { //容量不够,可以继续扩容,校验table是否变更
//resizeStamp和下面的RESIZE_STAMP_SHIFT配合使用,计算出一个负值
int rs = resizeStamp(n);
if (sc < 0) {
//如果正在扩容
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
//上述条件成立表示转移的任务已分配完了或者已扩容完成
break;
//sc+1表示有一个新的线程进来了,退出时会减1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//帮助转移老数组中的元素到扩容后的新数组中
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2)) //sc成功cas成负值
//由当前线程负责扩容
transfer(tab, null);
}
}
}
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//如果f是有效的ForwardingNode
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { //如果扩容还未完成
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break; //所有转移任务已分配
//sc+1,表示当前线程会帮助转移老数组元素中的节点
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
//如果扩容已完成
return nextTab;
}
return table;
}
3、transfer
transfer方法就是执行扩容的核心了,扩容时会将原来的数组切分成多个段,每个调用transfer方法的线程会分配其中的一段,然后遍历其中的数组元素,如果数组元素为null,则将其初始化成ForwardingNode表示该数组元素正在迁移的过程中,不能往里面插入新节点;如果数组元素的hash值是MOVED表示已经处理过了;如果是hash值大于0,表示一个链表;否则就是一颗红黑树。无论是链表还是红黑树,都需要将节点的hash值与原来的容量重新计算,因为原来的容量都是2的整数次幂,所以计算结果是0或者1,就将计算结果是0和是1的节点分开,构成一个新的链表或者一颗新的红黑树,计算结果是0的则放在原来的位置不变,计算结果是1的则放在当前数组元素的索引加上原来容量对应的索引处。当某个线程处理完分配该线程的所有数组元素时就会重新申请一个新的段,如果都分配完了,则退出;最后一个退出的线程会负责将扩容后的新数组赋值给table并重新计算下一次扩容的阈值。注意当某个数组元素中的节点都被转移后会将其置为ForwardingNode。注意在转移某个数组元素中的节点时,老数组中的节点和链表关系都未发生变更,都是利用原来节点的属性重新创建了一个新节点,新节点构成了一个新链表或者新红黑树,等所有节点都转移完成才会将老数组元素置为ForwardingNode,该元素原来的链表或者红黑树还是没变,即扩容期间是允许其他线程并行的查找的。如果查找时发现数组数组是ForwardingNode,则会在扩容的新数组中查找。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//如果是多核的,stride等于n/(8*NCPU),如果结果低于MIN_TRANSFER_STRIDE,则为MIN_TRANSFER_STRIDE
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//新数组的容量直接扩大一倍
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
//因为内存不足导致数组分配失败,此时并没有抛出异常终止进程,而是捕获了
//将触发扩容的阈值设置为int的最大值,避免再次触发扩容
sizeCtl = Integer.MAX_VALUE;
return;
}
//新数组分配成功,保存新数组和原数组的长度
nextTable = nextTab;
transferIndex = n;
}
//如果nextTab不为null则直接进入下面的逻辑
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//通过while循环计算i和bound
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing) //i减一后大于等于bound则继续处理i对应的数组元素,不用重新分配一个stride范围的数组元素
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1; //所有的转移任务都分配完了
advance = false;
}
else if (U.compareAndSwapInt //cas修改transferIndex
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//计算bound和i,将advance置为false,退出while循环
//下一次for循环时就进入上面的if分支,直到i减1小于bound后又会重新进入此else if分支
//这里实际是将i到bound之间的数组元素分配给当前线程,由当前线程负责转移这些数组元素中包含的节点
//如果有其他线程进来,因为transferIndex已经cas修改过了就会分配另外一个stride范围的数组元素
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
} //while结束
//i小于0
if (i < 0 || i >= n || i + n >= nextn) {
//所有任务已分配或者当前线程分配的任务已执行完成
int sc;
if (finishing) {
//如果已经完成扩容
nextTable = null;
table = nextTab;
//n*2-n/2,结果就是2n的四分之三
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//有一个新的线程来并行执行transfer时会将sc加1,此时减1表示当前线程的任务执行完成
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//还有其他线程未完成任务,直接退出,最后一个退出的线程负责将finishing置为true
return;
//如果上面的if条件不成立,即sc - 2与后面的值相等,说明其他并行执行transfer的线程已经退出
//整个扩容已完成,则将finishing = advance置为true
finishing = advance = true;
i = n; //重新for循环,下一次for循环时就进入是上面的if分支,修改table和sizeCtl
}
}
//i大于0且i小于n的,即i是扩容前的数组的某个索引
else if ((f = tabAt(tab, i)) == null) //如果对应的数组元素为null,将初始化成fwd
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED) //如果对应的数组元素不为null且状态是MOVED
advance = true; // already processed
else {
//如果对应的数组元素状态正常
synchronized (f) {
//加锁成功
if (tabAt(tab, i) == f) { //再次检查数组元素是否变更
Node<K,V> ln, hn;
if (fh >= 0) { //如果是正常的链表
//n是2的整数倍,计算出来的结果要么是0,要么是1
//扩容一倍后计算hash,计算结果是0的依然还是在原来的位置,计算结果是1的在原来的索引的基础上加上原来的容量
int runBit = fh & n;
Node<K,V> lastRun = f;
//遍历f后面的节点,找到最后一个重新hash后不等于runBit的节点
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) { //如果b不等于runBit,更新runBit
runBit = b;
lastRun = p;
}
}
//ln就是 fh & n为0的节点构成的链表的头元素,hn就是fh & n为1的节点构成的链表的头元素
if (runBit == 0) {
ln = lastRun; //此时lastRun及其后面的多个节点fh & n都为0
hn = null;
}
else {
hn = lastRun; //此时lastRun及其后面的多个节点,fh & n都为1
ln = null;
}
//从f开始往后遍历直到lastRun,lastRun之后的多个节点其计算结果是一致的,要么是0,要么是1,不用再次遍历
//注意此处数组元素中链表的节点和链表关系都未发生变更,即扩容时老数组还是可以支持查找的
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln); //新节点插入到ln的前面
else
hn = new Node<K,V>(ph, pk, pv, hn); //新节点插入到hn的前面
}
//将ln对应的链表放到i
setTabAt(nextTab, i, ln);
//将hn对应的链表放到i+n
setTabAt(nextTab, i + n, hn);
//原数组索引为i的位置放上fwd
setTabAt(tab, i, fwd);
//重新计算i,处理原数组中下一个元素
advance = true;
}
else if (f instanceof TreeBin) { //如果是红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo是loTail链表的第一个节点
TreeNode<K,V> lo = null, loTail = null;
//ho是hiTail链表中的第一个节点
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//从first开始往后遍历,first是最近才插入红黑树的节点
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//创建一个新的TreeNode,此时next节点和父节点都是null
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
//插入到loTail的后面
if ((p.prev = loTail) == null)//loTail为空
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
//h& n等于1
//插入到hiTail的后面
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果节点数量低于UNTREEIFY_THRESHOLD,则转换成链表,否则转换成红黑树
//untreeify通过lo或者hi的next属性遍历
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t; //如果hc等于0,说明所有节点都在lo中,即不需要移动节点了,原来的红黑树可以不变
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//将ln保存到索引为i处
setTabAt(nextTab, i, ln);
//将hn保存到索引为i+n处
setTabAt(nextTab, i + n, hn);
//原来的数组i处设置成fwd
setTabAt(tab, i, fwd);
//重新计算i,处理原数组中下一个元素
advance = true;
}
}
}//释放锁
}//else结束
} //for循环结束
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
4、remove / replace / replaceAll / clear
//移除指定的key对应的键值对,返回原来的value
public V remove(Object key) {
return replaceNode(key, null, null);
}
//移除key和value都相等的键值对,返回true表示移除成功
public boolean remove(Object key, Object value) {
if (key == null)
throw new NullPointerException();
return value != null && replaceNode(key, null, value) != null;
}
//将指定的key对应的键值对的value改成指定值,相当于put,返回原来的值
public V replace(K key, V value) {
if (key == null || value == null)
throw new NullPointerException();
return replaceNode(key, value, null);
}
//将key和value都匹配的键值对的value替换成newValue,返回true表示替换成功
public boolean replace(K key, V oldValue, V newValue) {
if (key == null || oldValue == null || newValue == null)
throw new NullPointerException();
return replaceNode(key, newValue, oldValue) != null;
}
//用于替换所有的节点,替换过程中如果value发生变更会重试
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
if (function == null) throw new NullPointerException();
Node<K,V>[] t;
if ((t = table) != null) {
//创建一个遍历器
Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
//advance方法返回下一个节点
for (Node<K,V> p; (p = it.advance()) != null; ) {
V oldValue = p.val;
for (K key = p.key;;) {
//计算新值
V newValue = function.apply(key, oldValue);
if (newValue == null)
throw new NullPointerException();
if (replaceNode(key, newValue, oldValue) != null ||
(oldValue = get(key)) == null)
//如果替换成功或者该key对应的值为null则终止内层for循环
//注意执行replaceNode时,该key对应的value可能发生变更,此replaceNode返回null,get(key)不为null
//然后借助get(key)读取了最新的value,通过内层for循环重试
break;
}
}
}
}
public void clear() {
long delta = 0L; // negative number of deletions
int i = 0;
Node<K,V>[] tab = table;
while (tab != null && i < tab.length) {
int fh;
Node<K,V> f = tabAt(tab, i);
if (f == null) //数组元素为空,处理下一个
++i;
else if ((fh = f.hash) == MOVED) { //正在扩容的过程中
tab = helpTransfer(tab, f);
i = 0; //扩容完成,从0开始重新清理
}
else {
synchronized (f) { //加锁
if (tabAt(tab, i) == f) { //f未发生变更
//如果f是链表则p等于f,如果f是红黑树,则p等于first节点,红黑树内部维护的链表
Node<K,V> p = (fh >= 0 ? f :
(f instanceof TreeBin) ?
((TreeBin<K,V>)f).first : null);
//遍历链表,统计总的节点数
while (p != null) {
--delta;
p = p.next;
}
//数组元素置为null,链表中的节点没有了数组元素的引用,会自动被GC回收掉
setTabAt(tab, i++, null);
}
}
}
}
if (delta != 0L)
//元素个数减少
addCount(delta, -1);
}
final V replaceNode(Object key, V value, Object cv) {
//计算key的hash值
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 || //table未初始化,Map为空
(f = tabAt(tab, i = (n - 1) & hash)) == null) //对应的数组为null
break; //上述两种情形下肯定没有匹配的元素,终止循环,返回null
else if ((fh = f.hash) == MOVED)
//如果正在扩容中,帮助完成扩容,扩容未完成期间会一直进入此分支,不断for循环,直到扩容完成
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) { //加锁
if (tabAt(tab, i) == f) { //检查数组元素是否变更
if (fh >= 0) { //如果是链表
validated = true;
//从f开始往后遍历
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { //hash值和key值相等,找到匹配节点
V ev = e.val;
if (cv == null || cv == ev || //如果没有指定cv或者ev等于cv
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value; //替换成新值,不用移除
//value为null,移除该节点
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break; //终止循环
}
pred = e;
//遍历下一个节点,如果为null则终止循环
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) { //如果是红黑树
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) { //通过根节点查找匹配的节点
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) { //如果cv为null或者cv等于pv
oldVal = pv;
if (value != null)
p.val = value; //重置val
//value为null,将该节点移除,该方法返回true时是说明树中节点数较少,需要将其转换成链表
else if (t.removeTreeNode(p))
//将其转成链表
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) { //如果查找过
if (oldVal != null) { //如果找到了目标节点
if (value == null)
addCount(-1L, -1); //如果是移除,则将计数减1
return oldVal; //返回原来的值
}
break;
}
}
}//循环终止
return null;
}
5、get / getOrDefault
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//计算hash值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//table已初始化且对应的数组元素非空,否则返回null
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val; //hash值和key值相等,返回目标节点的value
}
else if (eh < 0)
//如果是ForwardingNode,会在扩容后的新数组中查找
//如果是TreeBin,会在红黑树中查找
//如果是ReservationNode,则返回null
return (p = e.find(h, key)) != null ? p.val : null;
//eh大于0的情形,遍历链表查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
//如果查找失败返回默认值,否则返回匹配key的value
public V getOrDefault(Object key, V defaultValue) {
V v;
return (v = get(key)) == null ? defaultValue : v;
}
6、compute
compute类有三个方法,compute,computeIfAbsent和computeIfPresent,compute包含后面两者的逻辑,computeIfAbsent是跟指定key匹配的键值对不存在时才调用remappingFunction,传入的参数为null,如果存在则返回原来的value;computeIfPresent是跟指定key匹配的键值对存在时才调用remappingFunction,传入的参数是原来的value,如果不存在则返回null;compute是先查找匹配的键值对,如果匹配会使用原来的value调用remappingFunction,如果没有匹配的,会有null调用remappingFunction。调用remappingFunction后,返回值不是null时,如果该节点已存在则需要更新value,如果不存在则插入新节点;如果返回值是null,如果该节点已存在则删除该节点,最后都返回remappingFunction的计算结果。
//如果找到了跟key匹配的节点,则使用原来的value调用remappingFunction,否则使用null调用remappingFunction
//remappingFunction返回null表示这个键值对需要被删除,返回非null,表示这个键值对的value需要被更新,如果没有该节点则put一个新节点
//最终返回remappingFunction计算的结果
public V compute(K key,
BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
if (key == null || remappingFunction == null)
throw new NullPointerException();
//计算hash值
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //table未初始化,则初始化talbe
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) { //对应的数组元素未初始化
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
if (casTabAt(tab, i, null, r)) { //数组元素置为ReservationNode
binCount = 1;
Node<K,V> node = null;
try {
if ((val = remappingFunction.apply(key, null)) != null) { //计算value
delta = 1;
node = new Node<K,V>(h, key, val, null); //创建一个新节点
}
} finally {
//更新数组元素
setTabAt(tab, i, node);
}
}
}
if (binCount != 0) //如果上面casTabAt成功,则终止循环,返回val
break;
}
else if ((fh = f.hash) == MOVED) //正在扩容,帮忙完成扩容
tab = helpTransfer(tab, f);
else {
synchronized (f) { //加锁
if (tabAt(tab, i) == f) { //校验数组元素没有变更,如果变更了,下一次for循环重新读取
if (fh >= 0) { //如果是链表
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//找到匹配的节点
val = remappingFunction.apply(key, e.val);
if (val != null)
e.val = val; //更新val
else {
//计算的val为空,则移除该节点
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
//遍历下一个节点,如果遍历到最后一个节点
if ((e = e.next) == null) {
val = remappingFunction.apply(key, null);
if (val != null) {
delta = 1;
//插入一个新节点
pred.next =
new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) { //如果是红黑树
binCount = 1;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到目标节点
if ((r = t.root) != null)
p = r.findTreeNode(h, key, null);
else
p = null;
V pv = (p == null) ? null : p.val;
val = remappingFunction.apply(key, pv);
if (val != null) {
if (p != null)
p.val = val; //更新val
else {
//插入新节点
delta = 1;
t.putTreeVal(h, key, val);
}
}
//val为空且该节点存在
else if (p != null) {
delta = -1;
if (t.removeTreeNode(p)) //移除该节点,如果元素个数较少,则转换成链表
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
if (binCount != 0) {
//如果是链表,binCount记录了遍历的节点数,如果是红黑树binCount为1
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //超过阈值,链表转换成红黑树
break;
}
}
}//for循环结束
if (delta != 0) //要么插入一个新节点,要么删除原来的节点,更新总的键值对个数
addCount((long)delta, binCount);
return val;
}
//逻辑大体相同,不过只有跟key匹配的键值不存在才会调用mappingFunction,使用null来计算,如果返回值不是null则插入一个新节点,最终返回mappingFunction的计算结果
//如果找到了匹配的键值对,则直接返回原来的value
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
if (key == null || mappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) { //初始化数组元素
Node<K,V> r = new ReservationNode<K,V>();
synchronized (r) {
if (casTabAt(tab, i, null, r)) {
binCount = 1;
Node<K,V> node = null;
try {
if ((val = mappingFunction.apply(key)) != null)
node = new Node<K,V>(h, key, val, null);
} finally {
setTabAt(tab, i, node);
}
}
}
if (binCount != 0)
break;
}
else if ((fh = f.hash) == MOVED) //帮助扩容
tab = helpTransfer(tab, f);
else {
boolean added = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //链表
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek; V ev;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = e.val;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
//遍历完了,没有找到匹配节点才会调用mappingFunction
if ((val = mappingFunction.apply(key)) != null) {
added = true;
pred.next = new Node<K,V>(h, key, val, null);
}
break;
}
}
}
else if (f instanceof TreeBin) { //红黑树
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null)
val = p.val;
else if ((val = mappingFunction.apply(key)) != null) {
added = true;
t.putTreeVal(h, key, val);
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (!added) //找到了匹配的节点,不需要增加总的键值对个数
return val;
break;
}
}
}
if (val != null) //没有找到匹配的节点,新增了一个节点,键值对个数加1
addCount(1L, binCount);
return val;
}
//只有存在匹配key的键值对才调用remappingFunction计算,使用原来的value计算,结果为非null则修改value,为null则移除该节点,最终返回计算结果
//如果没有匹配的键值对,则返回null
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
if (key == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) //对应数组元素为null,直接返回null
break;
else if ((fh = f.hash) == MOVED) //帮助扩容
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //链表
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//只有找到匹配节点才计算
val = remappingFunction.apply(key, e.val);
if (val != null)
e.val = val;
else {
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) { //红黑树
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(h, key, null)) != null) {
//匹配到目标节点
val = remappingFunction.apply(key, p.val);
if (val != null)
p.val = val;
else {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (binCount != 0)
break;
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
7、merge
merge的逻辑跟compute基本一致,区别在于如果没有跟key的键值对,会直接使用value插入一个新的键值对,而compute会使用null算一次,如果结果不为null才插入一个新的键值对。另外如果匹配时,会把匹配的键值对的val和参数中的value都传给remappingFunction计算,计算结果为null则删除节点,不为null则更新val。
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
if (key == null || value == null || remappingFunction == null)
throw new NullPointerException();
int h = spread(key.hashCode());
V val = null;
int delta = 0;
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) //初始化数组
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & h)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(h, key, value, null))) { //初始化数组元素,如果cas失败则通过下一次for循环修改
delta = 1;
val = value;
break;
}
}
else if ((fh = f.hash) == MOVED) //帮助扩容
tab = helpTransfer(tab, f);
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //链表
binCount = 1;
for (Node<K,V> e = f, pred = null;; ++binCount) {
K ek;
if (e.hash == h &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
val = remappingFunction.apply(e.val, value);
if (val != null)
e.val = val; //更新val
else {
//val为空,删除键值对
delta = -1;
Node<K,V> en = e.next;
if (pred != null)
pred.next = en;
else
setTabAt(tab, i, en);
}
break;
}
pred = e;
if ((e = e.next) == null) {
//没有找到匹配键值对,插入一个新节点
delta = 1;
val = value;
pred.next =
new Node<K,V>(h, key, val, null);
break;
}
}
}
else if (f instanceof TreeBin) { //红黑树
binCount = 2;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r = t.root;
TreeNode<K,V> p = (r == null) ? null :
r.findTreeNode(h, key, null);
//没有匹配的节点,val等于value
val = (p == null) ? value :
remappingFunction.apply(p.val, value);
if (val != null) {
if (p != null)
p.val = val; //更新value
else {
//插入新节点
delta = 1;
t.putTreeVal(h, key, val);
}
}
else if (p != null) {
delta = -1;
if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
break;
}
}
}
if (delta != 0)
addCount((long)delta, binCount);
return val;
}
8、Traverser / keys / elements
Traverser是遍历器的基类,其类继承关系如下:
其中KeyIterator和ValueIterator就是上述两个遍历方法的底层实现了,如下:
public Enumeration<K> keys() {
Node<K,V>[] t;
int f = (t = table) == null ? 0 : t.length;
return new KeyIterator<K,V>(t, f, 0, f, this);
}
public Enumeration<V> elements() {
Node<K,V>[] t;
int f = (t = table) == null ? 0 : t.length;
return new ValueIterator<K,V>(t, f, 0, f, this);
}
EntryIterator比较特殊是内部类EntrySetView返回的遍历器,我们for循环遍历entrySet方法返回的视图时,底层调用的实际就是EntryIterator的方法,如下:
这几个类的实现如下:
static class Traverser<K,V> {
Node<K,V>[] tab; //保存键值对的数组
Node<K,V> next; //下一次next方法返回的节点
TableStack<K,V> stack, spare; //spare表示空闲的TableStack链表,stack表示正在使用中的
int index; // 下一次遍历的数组索引
int baseIndex; // 初始值跟index一样,会跟index一起改变
int baseLimit; // 保存初始化时传入的limit
final int baseSize; //保存构造方法传入的size
//通常index传0,size和limit传入数组的长度
Traverser(Node<K,V>[] tab, int size, int index, int limit) {
this.tab = tab;
this.baseSize = size;
this.baseIndex = this.index = index;
this.baseLimit = limit;
this.next = null;
}
final Node<K,V> advance() {
Node<K,V> e;
if ((e = next) != null)
e = e.next; //获取下一个节点
for (;;) {
Node<K,V>[] t; int i, n; // must use locals in checks
if (e != null)
return next = e; //返回next
//e的下一个节点为null了,需要遍历其他数组元素中的节点
if (baseIndex >= baseLimit || (t = tab) == null ||
(n = t.length) <= (i = index) || i < 0) //数组元素都遍历完了,返回null
return next = null;
//i等于index,n是数组长度
if ((e = tabAt(t, i)) != null && e.hash < 0) {
//如果数组元素不为null,且hash值小于0
if (e instanceof ForwardingNode) { //如果正在扩容
tab = ((ForwardingNode<K,V>)e).nextTable; //更新tab
e = null;
//t是原来tab,i是当前遍历的数组索引,n是原tab的长度
pushState(t, i, n);
continue; //下一次for循环就遍历扩容后新数组中的节点了
}
else if (e instanceof TreeBin) //如果是红黑树则使用first节点遍历
e = ((TreeBin<K,V>)e).first;
else
e = null;
}
//如果正在扩容
if (stack != null)
//此时n是扩容后的数组的长度,第一次调用时会将index加上原数组的长度,下一次获取遍历的数组元素时就会获取该索引处的数组元素
//如果该处的数组元素遍历完成则再将index加上原数组的长度,就会进入while循环中,将index和table恢复成原来的,继续遍历原数组中的下一个元素
//即遇到发生扩容的节点,就遍历扩容的后的新数组的index处,再遍历新数组的index+原数组长度处,然后接着遍历老数组中下一个元素
recoverState(n);
else if ((index = i + baseSize) >= n) //没有扩容的情形下会进入此分支,因为baseSize初始时等于n,所以i+baseSize必然大于n
index = ++baseIndex; //baseIndex的初始值和index是一样的,将baseIndex加1,遍历下一个元素
}
}
//如果spare始终为null,则创建新的节点并插入到stack链表的前面
//若干spare不为null,则取出spare链表头的节点初始化,然后插入到stack链表的前面
private void pushState(Node<K,V>[] t, int i, int n) {
TableStack<K,V> s = spare; // reuse if possible
if (s != null)
spare = s.next;
else
//spare为null,则创建一个新的TableStack
s = new TableStack<K,V>();
s.tab = t;
s.length = n;
s.index = i;
s.next = stack;
stack = s;
}
private void recoverState(int n) {
TableStack<K,V> s; int len;
//index表示已遍历的元素个数,stack的length表示扩容前的数组长度,n表示扩容后的数组长度
//第一次调用时index加原数组的长度,然后返回,第二次调用时还是加上原数组的长度就满足了while条件
while ((s = stack) != null && (index += (len = s.length)) >= n) {
n = len;
//恢复index和tab
index = s.index;
tab = s.tab;
//将s的tab置为null,将其插入到spare链表的前面
s.tab = null;
TableStack<K,V> next = s.next;
s.next = spare; // save for reuse
stack = next;//继续遍历stack链表的下一个节点,正常stack只有一个节点,因为出现ForwardingNode后就会遍历新数组了
spare = s;
}//while循环结束
//stack为null,退出循环后n等于原数组的长度,此时因为baseSize就等于n,所以if条件成立
if (s == null && (index += baseSize) >= n)
index = ++baseIndex; //index加1,遍历下一个数组元素
}
}
static class BaseIterator<K,V> extends Traverser<K,V> {
final ConcurrentHashMap<K,V> map; //关联的Map
Node<K,V> lastReturned; //上一次返回的节点
BaseIterator(Node<K,V>[] tab, int size, int index, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, size, index, limit);
this.map = map;
advance(); //获取下一个节点
}
public final boolean hasNext() { return next != null; }
public final boolean hasMoreElements() { return next != null; }
public final void remove() {
Node<K,V> p;
if ((p = lastReturned) == null)
throw new IllegalStateException();
lastReturned = null;
//通过replaceNode移除节点
map.replaceNode(p.key, null, null);
}
}
static final class KeyIterator<K,V> extends BaseIterator<K,V>
implements Iterator<K>, Enumeration<K> {
//注意子类的构造方法的入参的顺序和父类不一致,index和size的顺序反了
KeyIterator(Node<K,V>[] tab, int index, int size, int limit,
ConcurrentHashMap<K,V> map) {
//此处的index实际对应BaseIterator的size,size对应BaseIterator的index
super(tab, index, size, limit, map);
}
public final K next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
K k = p.key; //返回下一个节点的key
lastReturned = p;
advance(); //重新获取下一个节点
return k;
}
public final K nextElement() { return next(); }
}
static final class ValueIterator<K,V> extends BaseIterator<K,V>
implements Iterator<V>, Enumeration<V> {
ValueIterator(Node<K,V>[] tab, int index, int size, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, index, size, limit, map);
}
public final V next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
V v = p.val; //返回下一个节点的val
lastReturned = p;
advance(); //重新获取下一个节点
return v;
}
public final V nextElement() { return next(); }
}
static final class EntryIterator<K,V> extends BaseIterator<K,V>
implements Iterator<Map.Entry<K,V>> {
EntryIterator(Node<K,V>[] tab, int index, int size, int limit,
ConcurrentHashMap<K,V> map) {
super(tab, index, size, limit, map);
}
public final Map.Entry<K,V> next() {
Node<K,V> p;
if ((p = next) == null)
throw new NoSuchElementException();
K k = p.key;
V v = p.val;
lastReturned = p;
advance(); //获取下一个节点
return new MapEntry<K,V>(k, v, map); //利用next节点的key和val返回一个新的MapEntry
}
}
//MapEntry是一个简单的数据结构
static final class MapEntry<K,V> implements Map.Entry<K,V> {
final K key; // non-null
V val; // non-null
final ConcurrentHashMap<K,V> map;
MapEntry(K key, V val, ConcurrentHashMap<K,V> map) {
this.key = key;
this.val = val;
this.map = map;
}
public K getKey() { return key; }
public V getValue() { return val; }
public int hashCode() { return key.hashCode() ^ val.hashCode(); }
public String toString() { return key + "=" + val; }
public boolean equals(Object o) {
Object k, v; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == val || v.equals(val)));
}
public V setValue(V value) {
if (value == null) throw new NullPointerException();
V v = val;
val = value;
map.put(key, value);
return v;
}
}
static final class TableStack<K,V> {
int length;
int index;
Node<K,V>[] tab;
TableStack<K,V> next;
}
9、总结
除上述常用的方法外,ConcurrentHashMap还提供了多个并行执行某个操作的方法,这些方法不是ConcurrentMap的接口方法,如下图:
这些方法底层是通过ConcurrentHashMap定义的内部类来实现的,如下:
这些内部类都继承CountedCompleter,该类继承自ForkJoinTask,通过ForkJoinPool来并行执行,后面讲解ForkJoin相关类的使用说明时会再来探讨这些方法的实现细节。现总结下 ConcurrentHashMap实现线程安全的要点:
- 无论是插入一个新的节点还是移除旧的节点或者修改一个已有节点的val,都会对该节点所属的数组元素,即对应的链表头或者TreeBin节点加锁,直到修改结束才释放锁,一旦加锁其他想要修改属于同一个数组元素的节点的线程就必须等待前面的修改完成。如果修改的是红黑树,为了避免查找的结果不准确,还需要对红黑树加写锁,此时如果正在查找红黑树中的某个节点,会等待所有执行查找的线程释放红黑树的读锁,会通过park将当前线程休眠或者自旋等待。
- 查找某个key值时不需要对数组元素加锁,但是如果数组元素对应的是一颗红黑树,需要对红黑树加读锁,避免其他线程对红黑树的修改,这是因为修改后,红黑树为了保持平衡会移动节点的位置,有可能导致某个已经存在的key查找不出来,读锁可以被多个线程同时获取,必须等所有读锁都释放了才能加写锁,才能修改红黑树中的节点。
- 在Map扩容时,ConcurrentHashMap会将原来的数组按照长度切割成多个段,每个线程占用其中的一个段,负责将其中的数组元素包含的节点转移到扩容后的新数组中,当修改Map时如果发现对应的数组元素的hash值是MOVED,就认为当前正在扩容,会尝试去申请一个段,如果所有的段都分配了,则自旋等待扩容完成,数组元素的hash值变成非MOVED,最后一个完成转移任务的线程会负责将扩容后已经完成节点转移的临时数组设置成Map的有效数组。注意原数组中的数组元素及其节点关系都未发生变更,即此时其他线程可以继续在原数组中遍历,直到感知到了临时数组被设置成新数组了。如果没有线程访问原数组,原数组因为没有根节点引用,就会整体被垃圾回收掉。
- 为了避免并行修改Map的键值对个数时出现误差,同时保证性能,ConcurrentHashMap采用了类似于LongAdder的实现机制,将对键值对个数的修改压力分散到多个CounterCell中,并行修改时通常一个线程命中一个CounterCell,最后将所有CounterCell的值累加起来即是实际的键值对总数。