天天看點

java容器中的幾種計數方法淺談

  本文讨論java集合容器中的幾種元素數量擷取的方式,命題很小,但是也足以讓我們思考一些東西。

  所謂計數:即是給出所在容器的元素總數的方式。一般能想到的就是兩種方式:一是使用某個字段直接存儲該計數值,二是在請求計數值時臨時去計算所有元素數量。貌似本文的答案已經出來了。好吧,那我們還是從源碼的角度來驗證下想法吧:

  一般在java的集合容器中,可以分為普通容器和并發容器,我們就姑且以這種方式來劃分下,驗證下其實作計數的方式吧!

1. 普通容器 --HashMap

  一般非并發容器在進行增删改時,都會同時維護一個count值,如hashmap中的實作:

// HashMap 增加和修改都在此實作
    /**
     * Implements Map.put and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @param value the value to put
     * @param onlyIfAbsent if true, don't change existing value
     * @param evict if false, the table is in creation mode.
     * @return previous value, or null if none
     */
    final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
        Node<K,V>[] tab; Node<K,V> p; int n, i;
        if ((tab = table) == null || (n = tab.length) == 0)
            n = (tab = resize()).length;
        if ((p = tab[i = (n - 1) & hash]) == null)
            tab[i] = newNode(hash, key, value, null);
        else {
            Node<K,V> e; K k;
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                e = p;
            else if (p instanceof TreeNode)
                e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
            else {
                for (int binCount = 0; ; ++binCount) {
                    if ((e = p.next) == null) {
                        p.next = newNode(hash, key, value, null);
                        if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                            treeifyBin(tab, hash);
                        break;
                    }
                    if (e.hash == hash &&
                        ((k = e.key) == key || (key != null && key.equals(k))))
                        break;
                    p = e;
                }
            }
            if (e != null) { // existing mapping for key
                V oldValue = e.value;
                if (!onlyIfAbsent || oldValue == null)
                    e.value = value;
                afterNodeAccess(e);
                return oldValue;
            }
        }
        ++modCount;
        // 直接對size進行增加1即可, 如果是更新key的值,則不會運作到此處,即不會進行相加
        if (++size > threshold)
            resize();
        afterNodeInsertion(evict);
        return null;
    }
    // 删除元素的實作,同時維護 size 大小
    /**
     * Implements Map.remove and related methods
     *
     * @param hash hash for key
     * @param key the key
     * @param value the value to match if matchValue, else ignored
     * @param matchValue if true only remove if value is equal
     * @param movable if false do not move other nodes while removing
     * @return the node, or null if none
     */
    final Node<K,V> removeNode(int hash, Object key, Object value,
                               boolean matchValue, boolean movable) {
        Node<K,V>[] tab; Node<K,V> p; int n, index;
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (p = tab[index = (n - 1) & hash]) != null) {
            Node<K,V> node = null, e; K k; V v;
            // 先查找node所在的位置
            if (p.hash == hash &&
                ((k = p.key) == key || (key != null && key.equals(k))))
                node = p;
            else if ((e = p.next) != null) {
                if (p instanceof TreeNode)
                    node = ((TreeNode<K,V>)p).getTreeNode(hash, key);
                else {
                    do {
                        if (e.hash == hash &&
                            ((k = e.key) == key ||
                             (key != null && key.equals(k)))) {
                            node = e;
                            break;
                        }
                        p = e;
                    } while ((e = e.next) != null);
                }
            }
            if (node != null && (!matchValue || (v = node.value) == value ||
                                 (value != null && value.equals(v)))) {
                if (node instanceof TreeNode)
                    ((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
                else if (node == p)
                    tab[index] = node.next;
                else
                    p.next = node.next;
                ++modCount;
                // 直接減小size即可
                --size;
                afterNodeRemoval(node);
                return node;
            }
        }
        return null;
    }      

  因為有了增删改時對計數器的維護,是以在想要擷取總數時,就容易許多了。隻需把size字段傳回即可。

// HashMap.size()
    /**
     * Returns the number of key-value mappings in this map.
     *
     * @return the number of key-value mappings in this map
     */
    public int size() {
        return size;
    }      

  是以,在這種情況下,擷取計數值的方式非常簡單。但是不管怎麼樣,size字段對外部是不可見的,因為它是容器内部的一個實作邏輯,它完全在将來的某個時刻改變掉。即 size() != size .

2. 普通容器 --LinkedList

  看完hash類的計數實作,咱們再來看另外一個類型的容器LinkedList:

// LinkedList.add(E)   添加一個元素
    public boolean add(E e) {
        linkLast(e);
        return true;
    }
    /**
     * Links e as last element.
     */
    void linkLast(E e) {
        final Node<E> l = last;
        final Node<E> newNode = new Node<>(l, e, null);
        last = newNode;
        if (l == null)
            first = newNode;
        else
            l.next = newNode;
        // 同樣,直接使用一個 size 計數器統計即可
        size++;
        modCount++;
    }
    
    // 删除一個元素, 同時維護 size 字段
    public E removeFirst() {
        final Node<E> f = first;
        if (f == null)
            throw new NoSuchElementException();
        return unlinkFirst(f);
    }    
    /**
     * Unlinks non-null first node f.
     */
    private E unlinkFirst(Node<E> f) {
        // assert f == first && f != null;
        final E element = f.item;
        final Node<E> next = f.next;
        f.item = null;
        f.next = null; // help GC
        first = next;
        if (next == null)
            last = null;
        else
            next.prev = null;
        // 元素計數減1
        size--;
        modCount++;
        return element;
    }

    // 同樣,統計元素數量時,直接傳回size即可
    public int size() {
        return size;
    }      

  可見,LinkedList 也同樣是簡單地維護一個計數器字段,進而實作了高效地計數方法。而這簡單地實作,則是基于單線程的通路的,它同時維護一個計數字段,基本沒有多少開銷,卻給取值時帶來了便利。

  總結: 普通容器直接維護一個計數器字段,可以很友善地進行大小統計操作。

3. 并發容器 --ConcurrentHashMap

  而對于并發容器,則可能會不一樣些,但也有一些情況是一樣的。比較,HashTable, 直接使用 synchronized 來保證線程安全,則它也同樣可以直接使用一個size即可完成元素大小的統計。事實上,有些版本的HashTable僅僅是在HashMap的上面加上了synchronizd鎖而已(有些版本則是 不一樣的哦),細節咱們無需再看。

  而稍微有點不一樣的如: ConcurrentHashMap.size(), 早期的 ConcurrentHashMap 使用分段鎖,則需要統計各segement的元素,相加起來然後得到整體元素大小. 而jdk1.8中,已經放棄使用分段鎖來實作高性能安全的hash容器了,而是直接使用 synchronized + CAS + 紅黑樹 實作. 那麼,我們來看看其實作元素統計這一功能的實作有何不同吧!

// ConcurrentHashMap.putVal()  新增或修改一個元素
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 主要是在進行新增成功時,再進行計數器的操作, 看起來不是 ++size 這麼簡單了
        addCount(1L, binCount);
        return null;
    }

    // 這個計數的相加看起來相當複雜
    /**
     * Adds to count, and if table is too small and not already
     * resizing, initiates transfer. If already resizing, helps
     * perform transfer if work is available.  Rechecks occupancy
     * after a transfer to see if another resize is already needed
     * because resizings are lagging additions.
     *
     * @param x the count to add
     * @param check if <0, don't check resize, if <= 1 only check if uncontended
     */
    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 使用 CounterCell 來實作計數操作
        // 使用 CAS 保證更新計數時隻會有一個線程成功
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                // 使用一個類似随機負載均衡的方式,将計數值随機添加到 CounterCell 的某個值下面,減少多線程競争的可能性
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                // 通過cas将計數值x添加到 CounterCell 的 value 字段中
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 如果上面添加失敗,則使用 fullAddCount 進行重新添加該計數
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            // 基于 CounterCell 做一此彙總操作
            s = sumCount();
        }
        // 在進行put值時, check的值都是大于等于0的
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            // rehash 處理
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    // 輔助進行hash擴容
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    // fullAddCount 比較複雜, 它的目的是為了保證多線程可以快速進行添加完成, 目标很簡單, 即向數組 CounterCell 中添加一個值 x
    // See LongAdder version for explanation
    private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            if ((as = counterCells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        CounterCell r = new CounterCell(x); // Optimistic create
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = ThreadLocalRandom.advanceProbe(h);
            }
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }
    // ConcurrentHashMap.remove 删除元素
    /**
     * Implementation for the four public remove/replace methods:
     * Replaces node value with v, conditional upon match of cv if
     * non-null.  If resulting value is null, delete.
     */
    final V replaceNode(Object key, V value, Object cv) {
        int hash = spread(key.hashCode());
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0 ||
                (f = tabAt(tab, i = (n - 1) & hash)) == null)
                break;
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                boolean validated = false;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            validated = true;
                            for (Node<K,V> e = f, pred = null;;) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    V ev = e.val;
                                    if (cv == null || cv == ev ||
                                        (ev != null && cv.equals(ev))) {
                                        oldVal = ev;
                                        if (value != null)
                                            e.val = value;
                                        // 删除元素
                                        else if (pred != null)
                                            pred.next = e.next;
                                        else
                                            setTabAt(tab, i, e.next);
                                    }
                                    break;
                                }
                                pred = e;
                                if ((e = e.next) == null)
                                    break;
                            }
                        }
                        else if (f instanceof TreeBin) {
                            validated = true;
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> r, p;
                            if ((r = t.root) != null &&
                                (p = r.findTreeNode(hash, key, null)) != null) {
                                V pv = p.val;
                                if (cv == null || cv == pv ||
                                    (pv != null && cv.equals(pv))) {
                                    oldVal = pv;
                                    if (value != null)
                                        p.val = value;
                                    // 删除元素
                                    else if (t.removeTreeNode(p))
                                        setTabAt(tab, i, untreeify(t.first));
                                }
                            }
                        }
                    }
                }
                if (validated) {
                    if (oldVal != null) {
                        // value = null, 代表需要将元素删除,是以需要對計數器做減1操作
                        if (value == null)
                            addCount(-1L, -1);
                        return oldVal;
                    }
                    break;
                }
            }
        }
        return null;
    }      

  同樣是由于在增删時,維護一個計數器(CounterCell數組), 是以對于傳回計數值操作則會比較簡單化:

// ConcurrentHashMap.size()
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    // 直接将 CounterCell 中的值相加起來即可
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }      

  雖然ConcurrentHash的元素本身沒有使用分段式存儲了,但是其計數值還是存在了多個 CounterCell 中,目的自然是為了減少多線程競争對計數器的更新成性能瓶頸。在進行 size() 計數時,并未有上鎖操作,整個 CounterCell 使用 volatile 修飾,保證其可見性,但是整個size 卻是不保證絕對準确的哦。

4. 并發容器 --ArrayBlockingQueue

  下面我們再來看看另一各類型的并發容器: ArrayBlockingQueue

// ArrayBlockingQueue.offer()
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.  This method is generally preferable to method {@link #add},
     * which can fail to insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 直接上鎖操作
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                // 進行入隊操作
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        // 同樣,它還是通過一個  count 的計數器完成統計工作
        count++;
        notEmpty.signal();
    }
    // 移除動作時,也需要維護 count 的值
    /**
     * Deletes item at array index removeIndex.
     * Utility for remove(Object) and iterator.remove.
     * Call only when holding lock.
     */
    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            // 移除成功, 将計數器減1
            count--;
            if (itrs != null)
                itrs.elementDequeued();
        } else {
            // an "interior" remove

            // slide over all others up through putIndex.
            // 通過輪詢的方式, 必然有一個元素被删除
            final int putIndex = this.putIndex;
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            // 計數器相減
            count--;
            if (itrs != null)
                itrs.removedAt(removeIndex);
        }
        notFull.signal();
    }      

  同樣是維護了一個計數器,但是因為有上鎖機制的保證,整個過程看起來就簡單了許多。在擷取元素大小時,自然也就簡單了.

// ArrayBlockingQueue.size()
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }      

  但是它為了保證結果的準确性,在計數時,同樣進行了上鎖操作。可見,并發容器的實作思路也基本一緻.并無太多奇淫技巧. 咱們再來看一下并發容器的實作: CopyOnWriteArrayList

5. 并發容器 --CopyOnWriteArrayList

  顧名思義,是在寫操作的時候,使用複制方式進行實作。

// CopyOnWriteArrayList.add()
    /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // 同樣上鎖保證線程安全
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 将元素copy出來, 但其并非維護一個len字段
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    // CopyOnWriteArrayList, 删除一個字段, 同其名稱一樣, 還是使用寫時複制實作 
    public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                // 找到移除的字段位置, 依次複制其前後元素到新數組中,完成功能
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

    
    // CopyOnWriteArrayList.size(), 直接使用數組長度字段
    /**
     * Returns the number of elements in this list.
     *
     * @return the number of elements in this list
     */
    public int size() {
        // 擷取元素大小時,直接擷取所有元素,取數組的長度即可. 借用jvm提供的數組長度元資訊實作
        return getArray().length;
    }
    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    final Object[] getArray() {
        // 該array字段一定是要保證可見性的, 即至少得是  volatile 修飾的資料
        return array;
    }      

  CopyOnWriteArrayList, 因為其語義決定,其在一定程度上是線程安全的,是以,在讀操作時,就不需要上鎖,進而性能在某些場景會比較好。

  根據功能特性的不同, CopyOnWriteArrayList 采用了一個不同實作方式, 實作了元素的統計功能. 另外像 SynchronousQueue#size, 則永久傳回0, 因為它的定義是當被放一個元素後,必須等到有線程消費之後才可傳回,而其本身并不存儲元素. 是以, 雖然元素計數道理比較簡單通用, 但是還是要按照具體的場景進行相應的實作, 才能滿足具體的需求. 即不可脫離場景談技術. 

6. 更多計數

  類似資料庫類的産品,同樣的這樣的計數剛性需求,各自實作方式也有不同,但大體思路也差不多。比如 redis 的計數使用在計數時臨時周遊元素實作,mysql myisam 引擎使用一個表級的計數器等等。

不要害怕今日的苦,你要相信明天,更苦!

繼續閱讀