一、ConcurrentHashMap的引入
HashMap是非線程安全的,Hashtable是線程安全的,但是由于Hashtable是采用synchronized進行同步,相當于所有線程進行讀寫時都去競争一把鎖,導緻效率非常低下。有沒有一種既能保證效率,又能保證的線程安全的機制呢?Java提供了ConcurrentHashMap類,ConcurrentHashMap代替同步的Map(Collections.synchronized(new HashMap())),衆所周知,HashMap是根據散列值分段存儲的,同步Map在同步的時候鎖住了所有的段,而ConcurrentHashMap加鎖的時候根據散列值鎖住了散列值鎖對應的那段,是以提高了并發性能。
二、ConcurrentHashMap的結構
ConcurrentHashMap為了提高本身的并發能力,在内部采用了一個叫做Segment的結構,一個Segment其實就是一個類Hash Table的結構,Segment内部維護了一個連結清單數組,我們用下面這一幅圖來看下ConcurrentHashMap的内部結構:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsISMygjMyEjMxIzNwQDM4EDMy8CX0Vmbu4GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
三、ConcurrentHashMap的初始化
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}
CurrentHashMap的初始化一共有三個參數,一個initialCapacity,表示初始的容量,一個loadFactor,表示負載參數,最後一個是concurrentLevel,代表ConcurrentHashMap内部的Segment的數量,ConcurrentLevel一經指定,不可改變,後續如果ConcurrentHashMap的元素數量增加導緻ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而隻會增加Segment中連結清單數組的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而隻需要對Segment裡面的元素做一次rehash就可以了。
整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrentLevel來new出Segment,這裡Segment的數量是不大于concurrentLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是友善采用移位操作來進行hash,加快hash的過程。接下來就是根據intialCapacity确定Segment的容量的大小,每一個Segment的容量大小也是2的指數,同樣使為了加快hash的過程。
四、ConcurrentHashMap的get操作
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}
spread這個函數用了位操作來确定Segment,根據傳入的hash值向右無符号右移segmentShift位,然後和segmentMask進行與操作,結合我們之前說的segmentShift和segmentMask的值,就可以得出以下結論:假設Segment的數量是2的n次方,根據元素的hash值的高n位就可以确定元素到底在哪一個Segment中。
五、ConcurrentHashMap的put操作
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
首先對Segment的put操作是加鎖完成的,如果Segment中元素的數量超過了門檻值(由構造函數中的loadFactor算出)這需要進行對Segment擴容,并且要進行rehash
六、ConcurrentHashMap的remove操作
public V remove(Object key) {
return replaceNode(key, null, null);
}
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
remove操作也是确定需要删除的元素的位置,不過這裡删除元素的方法不是簡單地把待删除元素的前面的一個元素的next指向後面一個就完事了,我們之前已經說過HashEntry中的next是final的,一經指派以後就不可修改,在定位到待删除元素的位置以後,程式就将待删除元素前面的那一些元素全部複制一遍,然後再一個一個重新接到連結清單上去。