天天看點

Guava Cache 原理分析與最佳實踐

作者:閃念基因

在大部分網際網路架構中 Cache 已經成為了必可不少的一環。常用的方案有大家熟知的 NoSQL 資料庫(Redis、Memcached),也有大量的程序内緩存比如 EhCache 、Guava Cache、Caffeine 等。

本系列文章會選取本地緩存和分布式緩存(NoSQL)的優秀架構比較他們各自的優缺點、應用場景、項目中的最佳實踐以及原理分析。本文主要針對本地 Cache 的老大哥 Guava Cache 進行介紹和分析。

基本用法

Guava Cache 通過簡單好用的 Client 可以快速構造出符合需求的 Cache 對象,不需要過多複雜的配置,大多數情況就像構造一個 POJO 一樣的簡單。這裡介紹兩種構造 Cache 對象的方式:CacheLoader 和 Callable

▐CacheLoader

構造 LoadingCache 的關鍵在于實作 load 方法,也就是在需要通路的緩存項不存在的時候 Cache 會自動調用 load 方法将資料加載到 Cache 中。這裡你肯定會想假如有多個線程過來通路這個不存在的緩存項怎麼辦,也就是緩存的并發問題如何怎麼處理是否需要人工介入,這些在下文中也會介紹到。

除了實作 load 方法之外還可以配置緩存相關的一些性質,比如過期加載政策、重新整理政策 。

private static final LoadingCache<String, String> CACHE = CacheBuilder
    .newBuilder()
    // 最大容量為 100 超過容量有對應的淘汰機制,下文詳述
    .maximumSize(100)
    // 緩存項寫入後多久過期,下文詳述
    .expireAfterWrite(60 * 5, TimeUnit.SECONDS)
    // 緩存寫入後多久自動重新整理一次,下文詳述
    .refreshAfterWrite(60, TimeUnit.SECONDS)
    // 建立一個 CacheLoader,load 表示緩存不存在的時候加載到緩存并傳回
    .build(new CacheLoader<String, String>() {
        // 加載緩存資料的方法
        @Override
        public String load(String key) {
            return "cache [" + key + "]";
        }
    });


public void getTest() throws Exception {
    CACHE.get("KEY_25487");
}           

▐Callable

除了在構造 Cache 對象的時候指定 load 方法來加載緩存外,我們亦可以在擷取緩存項時指定載入緩存的方法,并且可以根據使用場景在不同的位置采用不同的加載方式。

比如在某些位置可以通過二級緩存加載不存在的緩存項,而有些位置則可以直接從 DB 加載緩存項。

// 注意傳回值是 Cache
private static final Cache<String, String> SIMPLE_CACHE = CacheBuilder
    .newBuilder()
    .build();


public void getTest1() throws Exception {
    String key = "KEY_25487";
    // get 緩存項的時候指定 callable 加載緩存項
    SIMPLE_CACHE.get(key, () -> "cache [" + key + "]");
}           

緩存項加載機制

如果某個緩存過期了或者緩存項不存在于緩存中,而恰巧此此時有大量請求過來請求這個緩存項,如果沒有保護機制就會導緻大量的線程同時請求資料源加載資料并生成緩存項,這就是所謂的 “緩存擊穿” 。

舉個簡單的例子,某個時刻有 100 個請求同時請求 KEY_25487 這個緩存項,而不巧這個緩存項剛好失效了,那麼這 100 個線程(如果有這麼多機器和流量的話)就會同時從 DB 加載這個資料,很可怕的點在于就算某一個線程率先擷取到資料生成了緩存項,其他的線程還是繼續請求 DB 而不會走到緩存。

Guava Cache 原理分析與最佳實踐

【緩存擊穿圖例】

看到上面這個圖或許你已經有方法解這個問題了,如果多個線程過來如果我們隻讓一個線程去加載資料生成緩存項,其他線程等待然後讀取生成好的緩存項豈不是就完美解決。那麼恭喜你在這個問題上,和 Google 工程師的思路是一緻的。不過采用這個方案,問題是解了但沒有完全解,後面會說到它的缺陷。

其實 Guava Cache 在 load 的時候做了并發控制,在多個線程請求一個不存在或者過期的緩存項時保證隻有一個線程進入 load 方法,其他線程等待直到緩存項被生成,這樣就避免了大量的線程擊穿緩存直達 DB 。不過試想下如果有上萬 QPS 同時過來會有大量的線程阻塞導緻線程無法釋放,甚至會出現線程池滿的尴尬場景,這也是說為什麼這個方案解了 “緩存擊穿” 問題但又沒完全解。

上述機制其實就是 expireAfterWrite/expireAfterAccess 來控制的,如果你配置了過期政策對應的緩存項在過期後被通路就會走上述流程來加載緩存項。

緩存項重新整理機制

緩存項的重新整理和加載看起來是相似的,都是讓緩存資料處于最新的狀态。差別在于:

  1. 緩存項加載是一個被動的過程,而緩存重新整理是一個主動觸發動作。如果緩存項不存在或者過期隻有下次 get 的時候才會觸發新值加載。而緩存重新整理則更加主動替換緩存中的老值。
  2. 另外一個很重要點的在于,緩存重新整理的項目一定是存在緩存中的,他是對老值的替換而非是對 NULL 值的替換。

由于緩存項重新整理的前提是該緩存項存在于緩存中,那麼緩存的重新整理就不用像緩存加載的流程一樣讓其他線程等待而是允許一個線程去資料源擷取資料,其他線程都先傳回老值直到異步線程生成了新緩存項。

這個方案完美解決了上述遇到的 “緩存擊穿” 問題,不過他的前提是已經生成緩存項了。在實際生産情況下我們可以做 緩存預熱 ,提前生成緩存項,避免流量洪峰造成的線程堆積。

這套機制在 Guava Cache 中是通過 refreshAfterWrite 實作的,在配置重新整理政策後,對應的緩存項會按照設定的時間定時重新整理,避免線程阻塞的同時保證緩存項處于最新狀态。

但他也不是完美的,比如他的限制是緩存項已經生成,并且如果恰巧你運氣不好,大量的緩存項同時需要重新整理或者過期, 就會有大量的線程請求 DB,這就是常說的 “緩存血崩”。

緩存項異步重新整理機制

上面說到緩存項大面積失效或者重新整理會導緻雪崩,那麼就隻能限制通路 DB 的數量了,位置有三個地方:

  1. 源頭:因為加載緩存的線程就是前台請求線程,是以如果控制請求線程數量的确是減少大面積失效對 DB 的請求,那這樣一來就不存在高并發請求,就算不用緩存都可以。
  2. 中間層緩沖:因為請求線程和通路 DB 的線程是同一個,假如在中間加一層緩沖,通過一個背景線程池去異步重新整理緩存所有請求線程直接傳回老值,這樣對于 DB 的通路的流量就可以被背景線程池的池大小控住。
  3. 底層:直接控 DB 連接配接池的池大小,這樣通路 DB 的連接配接數自然就少了,但是如果大量請求到連接配接池發現擷取不到連接配接程式一樣會出現連接配接池滿的問題,會有大量連接配接被拒絕的異常。

是以比較合适的方式是通過添加一個異步線程池異步重新整理資料,在 Guava Cache 中實作方案是重寫 CacheLoader 的 reload 方法。

private static final LoadingCache<String, String> ASYNC_CACHE = CacheBuilder.newBuilder()
    .build(
    CacheLoader.asyncReloading(new CacheLoader<String, String>() {
        @Override
        public String load(String key) {
            return key;
        }


        @Override
        public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
            return super.reload(key, oldValue);
        }
    }, new ThreadPoolExecutor(5, Integer.MAX_VALUE,
                              60L, TimeUnit.SECONDS,
                              new SynchronousQueue<>()))
);           

LocalCache 源碼分析

先整體看下 Cache 的類結構,下面的這些子類表示了不同的建立方式本質還都是 LocalCache

Guava Cache 原理分析與最佳實踐

【Cache 類圖】

核心代碼都在 LocalCache 這個檔案中,并且通過這個繼承關系可以看出 Guava Cache 的本質就是 ConcurrentMap。

Guava Cache 原理分析與最佳實踐

【LocalCache 繼承與實作】

在看源碼之前先理一下流程,先理清思路。如果想直接看源碼了解流程可以先跳過這張圖 ~

Guava Cache 原理分析與最佳實踐

【 get 緩存資料流程圖】

這裡核心理一下 Get 的流程,put 階段比較簡單就不做分析了。

▐LocalCache#get

V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    int hash = hash(checkNotNull(key));
    // 根據 hash 擷取對應的 segment 然後從 segment 擷取具體值
    return segmentFor(hash).get(key, hash, loader);
}           

▐Segment#get

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
    checkNotNull(key);
    checkNotNull(loader);
    try {
        // count 表示在這個 segment 中存活的項目個數
        if (count != 0) {
            // 擷取 segment 中的元素 (ReferenceEntry) 包含正在 load 的資料
            ReferenceEntry<K, V> e = getEntry(key, hash);
            if (e != null) {
                long now = map.ticker.read();
                // 擷取緩存值,如果是 load,invalid,expired 傳回 null,同時檢查是否過期了,過期移除并傳回 null
                V value = getLiveValue(e, now);
                if (value != null) {
                    // 記錄通路時間
                    recordRead(e, now);
                    // 記錄緩存命中一次
                    statsCounter.recordHits(1);
                    // 重新整理緩存并傳回緩存值 ,後面展開
                    return scheduleRefresh(e, key, hash, value, now, loader);
                }
                ValueReference<K, V> valueReference = e.getValueReference();
                // 如果在 loading 等着 ,後面展開
                if (valueReference.isLoading()) {
                    return waitForLoadingValue(e, key, valueReference);
                }
            }
        }


        // 走到這說明從來沒寫入過值 或者 值為 null 或者 過期(資料還沒做清理),後面展開
        return lockedGetOrLoad(key, hash, loader);
    } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
            throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
            throw new UncheckedExecutionException(cause);
        }
        throw ee;
    } finally {
        postReadCleanup();
    }
}           

▐Segment#scheduleRefresh

// com.google.common.cache.LocalCache.Segment#scheduleRefresh


V scheduleRefresh(
    ReferenceEntry<K, V> entry,
    K key,
    int hash,
    V oldValue,
    long now,
    CacheLoader<? super K, V> loader) {
    
    if (
        // 配置了重新整理政策 refreshAfterWrite
        map.refreshes()
        // 到重新整理時間了
        && (now - entry.getWriteTime() > map.refreshNanos)
        // 沒在 loading
        && !entry.getValueReference().isLoading()) {
        // 開始重新整理,下面展開
        V newValue = refresh(key, hash, loader, true);
        if (newValue != null) {
            return newValue;
        }
    }
    return oldValue;
}




// com.google.common.cache.LocalCache.Segment#refresh


V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
    // 插入 loading 節點
    final LoadingValueReference<K, V> loadingValueReference =
        insertLoadingValueReference(key, hash, checkTime);
    
    if (loadingValueReference == null) {
        return null;
    }


    // 異步重新整理,下面展開
    ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
    if (result.isDone()) {
        try {
            return Uninterruptibles.getUninterruptibly(result);
        } catch (Throwable t) {
            // don't let refresh exceptions propagate; error was already logged
        }
    }
    return null;
}


// com.google.common.cache.LocalCache.Segment#loadAsync


ListenableFuture<V> loadAsync(
    final K key,
    final int hash,
    final LoadingValueReference<K, V> loadingValueReference,
    CacheLoader<? super K, V> loader) {
    // 通過 loader 異步加載資料,下面展開
    final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
    loadingFuture.addListener(
        new Runnable() {
            @Override
            public void run() {
                try {
                    getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
                } catch (Throwable t) {
                    logger.log(Level.WARNING, "Exception thrown during refresh", t);
                    loadingValueReference.setException(t);
                }
            }
        },
        directExecutor());
    return loadingFuture;
}


// com.google.common.cache.LocalCache.LoadingValueReference#loadFuture


public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
    try {
        stopwatch.start();
        // oldValue 指在寫入 loading 節點前這個位置的值,如果這個位置之前沒有值 oldValue 會被指派為 UNSET
        // UNSET.get() 值為 null ,是以這個緩存項從來沒有進入緩存需要同步 load 具體原因前面提到了,如果通過
        // 異步 reload ,由于沒有老值會導緻其他線程傳回的都是 null
        V previousValue = oldValue.get();
        if (previousValue == null) {
            V newValue = loader.load(key);
            return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        // 異步 load
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
            return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
                @Override
                public V apply(V newValue) {
                    LoadingValueReference.this.set(newValue);
                    return newValue;
                }
            },
            directExecutor());
    } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}           

▐Segment#waitForLoadingValue

V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
    throws ExecutionException {
    // 首先你要是一個 loading 節點
    if (!valueReference.isLoading()) {
        throw new AssertionError();
    }


    checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
    // don't consider expiration as we're concurrent with loading
    try {
        V value = valueReference.waitForValue();
        if (value == null) {
            throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        // re-read ticker now that loading has completed
        long now = map.ticker.read();
        recordRead(e, now);
        return value;
    } finally {
        statsCounter.recordMisses(1);
    }
}


// com.google.common.cache.LocalCache.LoadingValueReference#waitForValue


public V waitForValue() throws ExecutionException {
    return getUninterruptibly(futureValue);
}


// com.google.common.util.concurrent.Uninterruptibles#getUninterruptibly


public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                // hang 住,如果該線程被打斷了繼續回去 hang 住等結果,直到有結果傳回
                return future.get();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}           

▐Segment#lockedGetOrLoad

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
    ReferenceEntry<K, V> e;
    ValueReference<K, V> valueReference = null;
    LoadingValueReference<K, V> loadingValueReference = null;
    boolean createNewEntry = true;


    // 要對 segment 寫操作 ,先加鎖
    lock();
    try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);


        // 這裡基本就是 HashMap 的代碼,如果沒有 segment 的數組下标沖突了就拉一個連結清單
        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);


        for (e = first; e != null; e = e.getNext()) {
            K entryKey = e.getKey();
            if (e.getHash() == hash
                && entryKey != null
                && map.keyEquivalence.equivalent(key, entryKey)) {
                valueReference = e.getValueReference();


                // 如果在加載中 不做任何處理
                if (valueReference.isLoading()) {
                    createNewEntry = false;
                } else {
                    V value = valueReference.get();
                    // 如果緩存項為 null 資料已經被删除,通知對應的 queue 
                    if (value == null) {
                        enqueueNotification(
                            entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
                    // 這個是 double check 如果緩存項過期 資料沒被删除,通知對應的 queue 
                    } else if (map.isExpired(e, now)) {
                        // This is a duplicate check, as preWriteCleanup already purged expired
                        // entries, but let's accommodate an incorrect expiration queue.
                        enqueueNotification(
                            entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
                    // 再次看到的時候這個位置有值了直接傳回 
                    } else {
                        recordLockedRead(e, now);
                        statsCounter.recordHits(1);
                        return value;
                    }


                    // immediately reuse invalid entries
                    writeQueue.remove(e);
                    accessQueue.remove(e);
                    this.count = newCount; // write-volatile
                }
                break;
            }
        }


        // 沒有 loading ,建立一個 loading 節點
        if (createNewEntry) {
            loadingValueReference = new LoadingValueReference<>();


            if (e == null) {
                e = newEntry(key, hash, first);
                e.setValueReference(loadingValueReference);
                table.set(index, e);
            } else {
                e.setValueReference(loadingValueReference);
            }
        }
    } finally {
        unlock();
        postWriteCleanup();
    }


    if (createNewEntry) {
        try {
            // Synchronizes on the entry to allow failing fast when a recursive load is
            // detected. This may be circumvented when an entry is copied, but will fail fast most
            // of the time.
            synchronized (e) {
                return loadSync(key, hash, loadingValueReference, loader);
            }
        } finally {
            statsCounter.recordMisses(1);
        }
    } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
    }
}           

總結

結合上面圖以及源碼我們發現在整個流程中 GuavaCache 是沒有額外的線程去做資料清理和重新整理的,基本都是通過 Get 方法來觸發這些動作,減少了設計的複雜性和降低了系統開銷。

簡單回顧下 Get 的流程以及在每個階段做的事情,傳回的值。首先判斷緩存是否過期然後判斷是否需要重新整理,如果過期了就調用 loading 去同步加載資料(其他線程阻塞),如果是僅僅需要重新整理調用 reloading 異步加載(其他線程傳回老值)。

是以如果 refreshTime > expireTime 意味着永遠走不到緩存重新整理邏輯,緩存重新整理是為了在緩存有效期内盡量保證緩存資料一緻性是以在配置重新整理政策和過期政策時一定保證 refreshTime < expireTime 。

最後關于 Guava Cache 的使用建議 (最佳實踐) :

  1. 如果重新整理時間配置的較短一定要重載 reload 異步加載資料的方法,傳入一個自定義線程池保護 DB
  2. 失效時間一定要大于重新整理時間
  3. 如果是常駐記憶體的一些少量資料失效時間可以配置的較長重新整理時間配置短一點 (根據業務對緩存失效容忍度)

作者:梓川

來源-微信公衆号:大淘寶技術

出處:https://mp.weixin.qq.com/s/teGvFv-X3BTfJOD5OFr7Yg