Guava Cache Principles and Best Practices

Author: Flash Gene

Caching has become a key part of most Internet architectures. Common solutions include the well-known distributed NoSQL stores (Redis, Memcached) as well as many in-process caches such as EhCache, Guava Cache, and Caffeine.

In this series of articles, we compare local (in-process) caching with distributed caching (NoSQL): their advantages and disadvantages, use cases, best practices in real projects, and how they work internally. This article introduces and analyzes Guava Cache, the veteran among local caches.

Basic Usage

Guava Cache lets you build a cache object that fits your needs through a simple builder API. It requires little configuration, and in most cases creating a cache is about as simple as constructing a POJO. There are two ways to supply the loading logic: CacheLoader and Callable.

▐CacheLoader

The key to constructing a LoadingCache is implementing the load method: when a requested cache item does not exist, the cache automatically calls load to fetch the data and store it in the cache. You may wonder what happens when multiple threads request the same missing item at once, that is, how the cache handles concurrency and whether you need to intervene. This is covered below.

In addition to implementing load, you can configure cache properties such as the maximum size, the expiration policy, and the refresh policy.

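import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.TimeUnit;

private static final LoadingCache<String, String> CACHE = CacheBuilder
    .newBuilder()
    // Maximum size is 100; entries are evicted once the limit is exceeded
    .maximumSize(100)
    // How long after a write an item expires (see below)
    .expireAfterWrite(60 * 5, TimeUnit.SECONDS)
    // How long after a write an item becomes eligible for refresh (see below)
    .refreshAfterWrite(60, TimeUnit.SECONDS)
    // Create a CacheLoader; load is called when an item is missing,
    // and its result is stored in the cache and returned
    .build(new CacheLoader<String, String>() {
        // Loads the data for a cache item
        @Override
        public String load(String key) {
            return "cache [" + key + "]";
        }
    });


public void getTest() throws Exception {
    CACHE.get("KEY_25487");
}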

▐Callable

Instead of fixing the loading logic when the cache is constructed, we can also pass it in when getting a cache item, so different call sites can use different loading logic depending on the use case.

For example, one call site can load a missing item from a second-level (L2) cache, while another loads it directly from the DB.

// Note that the declared type is Cache, not LoadingCache
private static final Cache<String, String> SIMPLE_CACHE = CacheBuilder
    .newBuilder()
    .build();


public void getTest1() throws Exception {
    String key = "KEY_25487";
    // Pass a Callable to get() to load the item if it is missing
    SIMPLE_CACHE.get(key, () -> "cache [" + key + "]");
}

Cached item loading mechanism

If a cache item expires or is missing while a large number of requests for it arrive, then without a protection mechanism all of those threads will hit the data source at the same time to load the data and rebuild the cache item. This is called "cache breakdown".

As a simple example, if 100 requests ask for the KEY_25487 cache item at the same moment and the item has just expired, all 100 threads (given enough machines and traffic) will load the data from the DB simultaneously.

【Cache Breakdown Legend】

When multiple threads arrive, what if only one thread loads the data and builds the cache item, while the others wait and then read the item it produced? If that is what you had in mind, you are thinking along the same lines as the Google engineers. This approach solves the problem, but not completely, as explained next.

Guava Cache does exactly this kind of concurrency control during load: when multiple threads request a missing or expired cache item, only one thread enters the load method and the others wait until the item has been produced, which keeps a flood of threads from bypassing the cache and hitting the DB. However, if tens of thousands of requests per second arrive at the same time, a large number of threads will block while waiting and cannot be released, and the thread pool may even fill up. That is why this approach mitigates "cache breakdown" without completely solving it.

This loading path is what runs after expiration. If you configure an expiration policy with expireAfterWrite or expireAfterAccess, the first access to an item after it expires loads it again through the process described above.
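The "one thread loads, the others wait" idea can be sketched with the JDK alone. This is not Guava's implementation: the class name, the loadFromDb method, and the 100 ms delay are hypothetical stand-ins, and a ConcurrentHashMap of FutureTask objects plays the role of the cache.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class SingleFlightSketch {
    // key -> task holding the value (finished or still loading)
    private final ConcurrentHashMap<String, FutureTask<String>> entries = new ConcurrentHashMap<>();
    private final AtomicInteger loads = new AtomicInteger();

    // Hypothetical slow data-source call; counts how often it runs.
    private String loadFromDb(String key) throws InterruptedException {
        loads.incrementAndGet();
        Thread.sleep(100);
        return "cache [" + key + "]";
    }

    String get(String key) throws Exception {
        FutureTask<String> task = entries.get(key);
        if (task == null) {
            FutureTask<String> created = new FutureTask<>(() -> loadFromDb(key));
            task = entries.putIfAbsent(key, created);
            if (task == null) {
                task = created;
                task.run(); // only the thread that won putIfAbsent runs the loader
            }
        }
        return task.get(); // every other thread blocks here until the value is ready
    }

    // Fires `threads` concurrent gets for one key and returns how often the loader ran.
    static int countLoads(int threads) {
        SingleFlightSketch cache = new SingleFlightSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Callable<String> call = () -> cache.get("KEY_25487");
            List<Future<String>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(call));
            }
            for (Future<String> result : results) {
                result.get();
            }
            return cache.loads.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("loader invocations: " + countLoads(100)); // prints 1
    }
}
```

The sketch also shows the drawback described above: the 99 threads that did not win the race are parked in task.get() for the whole duration of the load.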

Cache item refresh mechanism

Refreshing and loading a cache item look similar, since both keep the cached data up to date. The differences are:

  1. Loading is passive, while refreshing is an active action. If an item is missing or expired, a new value is loaded only on the next get. A refresh, by contrast, proactively replaces the old value that is still in the cache.
  2. A refreshed item must already exist in the cache: refresh replaces an old value, never an absent (null) one.

Because a refresh presupposes that the item already exists, it does not make other threads wait the way loading does. One thread goes to the data source to fetch the new data, and the other threads keep returning the old value until the new cache item has been produced.

This solves the "cache breakdown" problem described above, but only for items that have already been generated. In production, you can warm up the cache to generate items in advance and avoid threads piling up during traffic peaks.

Guava Cache implements this mechanism through refreshAfterWrite. Once a refresh policy is configured, an item becomes eligible for refresh after the configured interval has elapsed since it was written, and the refresh itself is triggered by the next read of that item. This avoids blocking threads while keeping cached items reasonably fresh.

It is not perfect, though. It only works for items that have already been generated, and if you are unlucky and a large number of items need to be refreshed or expire at the same time, a large number of threads will still hit the DB at once. This is often called a "cache avalanche".
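The difference from loading, namely that readers keep getting the old value while one thread refreshes, can be modelled in a few lines of plain Java. The sketch below is an illustration under assumptions rather than Guava code: the class, the single-entry design, and the injected `now` parameter are all hypothetical, and latches are used only to hold the refresh "in flight" deterministically.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class StaleWhileRefreshSketch {
    private final Supplier<String> loader;   // hypothetical data-source call
    private final long refreshNanos;
    private final AtomicBoolean refreshing = new AtomicBoolean(false);
    private volatile String value;
    private volatile long writeTime;

    StaleWhileRefreshSketch(String initial, long now, long refreshNanos, Supplier<String> loader) {
        this.value = initial;
        this.writeTime = now;
        this.refreshNanos = refreshNanos;
        this.loader = loader;
    }

    // `now` is passed in so the example does not depend on the wall clock.
    String get(long now) {
        String current = value;
        boolean due = now - writeTime > refreshNanos;
        // Only the thread that flips the flag refreshes; everyone else keeps the old value.
        if (due && refreshing.compareAndSet(false, true)) {
            try {
                String fresh = loader.get();
                value = fresh;
                writeTime = now;
                return fresh;
            } finally {
                refreshing.set(false);
            }
        }
        return current;
    }

    // Returns what the refreshing thread, a concurrent reader, and a later reader saw.
    static String[] demo() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        StaleWhileRefreshSketch entry = new StaleWhileRefreshSketch("v1", 0L, 10L, () -> {
            started.countDown();
            await(release);            // keep the refresh in flight
            return "v2";
        });
        String[] seen = new String[3];
        Thread refresher = new Thread(() -> seen[0] = entry.get(100L));
        refresher.start();
        await(started);
        seen[1] = entry.get(100L);     // refresh in flight: old value, no blocking
        release.countDown();
        try {
            refresher.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        seen[2] = entry.get(100L);     // refresh finished: new value
        return seen;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", demo())); // prints v2, v1, v2
    }
}
```

Note that the refreshing thread itself still pays the full load latency here, which is what motivates moving the refresh to a background pool.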

Asynchronous refresh mechanism for cached items

As mentioned above, when a large number of cache items expire or refresh at the same time, an avalanche can result. There are only three places where the number of DB accesses can be limited:

  1. Source: the thread that loads the cache is the foreground request thread, so limiting the number of request threads would indeed reduce the DB requests caused by a mass expiration. But then there would be no high-concurrency traffic to begin with, and the cache would hardly be needed, so this is not a practical option.
  2. Middle-layer buffer: by default the request thread and the thread accessing the DB are the same. If a buffer layer is added in between, a background thread pool can refresh the cache asynchronously while all request threads immediately return the old value, and the traffic reaching the DB is then bounded by the size of that background pool.
  3. Bottom layer: limit the size of the DB connection pool directly. The DB will see fewer connections, but when a large number of requests cannot obtain a connection, the pool is exhausted and many requests fail with connection-rejected exceptions.

So the most appropriate approach is to refresh data asynchronously through a dedicated thread pool. In Guava Cache, this is done by overriding the reload method of CacheLoader, here combined with the CacheLoader.asyncReloading wrapper.

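private static final LoadingCache<String, String> ASYNC_CACHE = CacheBuilder.newBuilder()
    .build(
        // asyncReloading wraps the loader so that reload runs on the given executor
        CacheLoader.asyncReloading(new CacheLoader<String, String>() {
            @Override
            public String load(String key) {
                return key;
            }

            // The default reload calls load synchronously; the asyncReloading
            // wrapper submits this call to the executor below
            @Override
            public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
                return super.reload(key, oldValue);
            }
        }, new ThreadPoolExecutor(5, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<>()))
        // Note: the maximum pool size is what caps concurrent reloads; with
        // Integer.MAX_VALUE it is effectively unbounded, so choose a finite limit
        // if the pool is meant to protect the DB
    );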
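The claim that the background pool size bounds the traffic reaching the DB can be checked with a small JDK-only experiment. It does not use Guava: the reload task, the 20 ms latency, and the key names are hypothetical, and a fixed-size executor stands in for the reload pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedReloadSketch {
    // Submits `refreshes` reload tasks to a pool of `poolSize` threads and
    // returns the highest number of simulated DB calls that ran at once.
    static int maxConcurrentLoads(int poolSize, int refreshes) {
        ExecutorService reloadPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < refreshes; i++) {
                String key = "KEY_" + i;
                Callable<String> reload = () -> {
                    int current = inFlight.incrementAndGet();
                    peak.accumulateAndGet(current, Math::max);
                    try {
                        Thread.sleep(20); // hypothetical DB latency
                        return "cache [" + key + "]";
                    } finally {
                        inFlight.decrementAndGet();
                    }
                };
                futures.add(reloadPool.submit(reload));
            }
            for (Future<String> future : futures) {
                future.get();
            }
            return peak.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            reloadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 50 items become due for refresh at once, but at most 5 DB calls overlap.
        System.out.println("peak concurrent DB calls: " + maxConcurrentLoads(5, 50));
    }
}
```

The bound only holds when the pool has a finite maximum size. A pool that can grow without limit queues nothing and bounds nothing.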

LocalCache source code analysis

Let's first look at the overall class structure of Cache. The subclasses below represent the different ways of creating a LocalCache.

【Cache Class Diagram】

The core code lives in the LocalCache file. At its heart, Guava Cache is a ConcurrentMap implementation.

【LocalCache Inheritance and Implementation】

Before reading the source code, it helps to walk through the process and get the overall picture. If you would rather go straight to the source code, you can skip this diagram.

【 Get Caching Data Flow Diagram 】

The following covers the core of the get process. The put path is relatively simple, so it is not analyzed here.

▐LocalCache#get

V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
    int hash = hash(checkNotNull(key));
    // Find the segment for this hash, then look up the value in that segment
    return segmentFor(hash).get(key, hash, loader);
}
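How a hash selects a segment can be illustrated with a short sketch. It assumes the classic segmented-map scheme, in which the upper bits of a re-spread hash pick the segment. The segment count of 4 and the spread function are assumptions made for this example and are not taken from the Guava source.

```java
class SegmentIndexSketch {
    static final int SEGMENT_COUNT = 4;                  // assumption: a power of two
    static final int SEGMENT_MASK = SEGMENT_COUNT - 1;   // 0b11
    // Shift that leaves only the top log2(SEGMENT_COUNT) bits of a 32-bit hash.
    static final int SEGMENT_SHIFT = 32 - Integer.numberOfTrailingZeros(SEGMENT_COUNT);

    // Stand-in bit mixer (multiplicative hashing) so that the upper bits vary
    // even for similar keys; not Guava's actual rehash function.
    static int spread(int h) {
        return h * 0x9E3779B9;
    }

    // The upper bits of the hash choose the segment.
    static int segmentIndex(int hash) {
        return (hash >>> SEGMENT_SHIFT) & SEGMENT_MASK;
    }

    static int segmentFor(Object key) {
        return segmentIndex(spread(key.hashCode()));
    }

    public static void main(String[] args) {
        for (String key : new String[] {"KEY_25487", "KEY_1", "KEY_2"}) {
            System.out.println(key + " -> segment " + segmentFor(key));
        }
    }
}
```

Because each segment has its own lock and table, two keys that land in different segments never contend with each other on writes.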

▐Segment#get

V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
    checkNotNull(key);
    checkNotNull(loader);
    try {
        // count is the number of live entries in this segment
        if (count != 0) {
            // Look up the entry (ReferenceEntry), which may be one that is still loading
            ReferenceEntry<K, V> e = getEntry(key, hash);
            if (e != null) {
                long now = map.ticker.read();
                // Get the live value; returns null if the entry is loading, invalid, or expired (expired entries are cleaned up along the way)
                V value = getLiveValue(e, now);
                if (value != null) {
                    // Record the access time
                    recordRead(e, now);
                    // Record one cache hit
                    statsCounter.recordHits(1);
                    // Refresh if due and return the cached value; expanded below
                    return scheduleRefresh(e, key, hash, value, now, loader);
                }
                ValueReference<K, V> valueReference = e.getValueReference();
                // If the entry is loading, wait for it; expanded below
                if (valueReference.isLoading()) {
                    return waitForLoadingValue(e, key, valueReference);
                }
            }
        }


        // Reaching here means the value was never written, is null, or has expired but has not been cleaned up yet; expanded below
        return lockedGetOrLoad(key, hash, loader);
    } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
            throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
            throw new UncheckedExecutionException(cause);
        }
        throw ee;
    } finally {
        postReadCleanup();
    }
}           

▐Segment#scheduleRefresh

// com.google.common.cache.LocalCache.Segment#scheduleRefresh


V scheduleRefresh(
    ReferenceEntry<K, V> entry,
    K key,
    int hash,
    V oldValue,
    long now,
    CacheLoader<? super K, V> loader) {
    
    if (
        // a refresh policy (refreshAfterWrite) is configured
        map.refreshes()
        // the refresh interval has elapsed
        && (now - entry.getWriteTime() > map.refreshNanos)
        // the entry is not already loading
        && !entry.getValueReference().isLoading()) {
        // start the refresh, expanded below
        V newValue = refresh(key, hash, loader, true);
        if (newValue != null) {
            return newValue;
        }
    }
    return oldValue;
}




// com.google.common.cache.LocalCache.Segment#refresh


V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
    // Insert a loading node
    final LoadingValueReference<K, V> loadingValueReference =
        insertLoadingValueReference(key, hash, checkTime);
    
    if (loadingValueReference == null) {
        return null;
    }


    // Refresh asynchronously, expanded below
    ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
    if (result.isDone()) {
        try {
            return Uninterruptibles.getUninterruptibly(result);
        } catch (Throwable t) {
            // don't let refresh exceptions propagate; error was already logged
        }
    }
    return null;
}


// com.google.common.cache.LocalCache.Segment#loadAsync


ListenableFuture<V> loadAsync(
    final K key,
    final int hash,
    final LoadingValueReference<K, V> loadingValueReference,
    CacheLoader<? super K, V> loader) {
    // Load the data through the loader, possibly asynchronously; expanded below
    final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
    loadingFuture.addListener(
        new Runnable() {
            @Override
            public void run() {
                try {
                    getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
                } catch (Throwable t) {
                    logger.log(Level.WARNING, "Exception thrown during refresh", t);
                    loadingValueReference.setException(t);
                }
            }
        },
        directExecutor());
    return loadingFuture;
}


// com.google.common.cache.LocalCache.LoadingValueReference#loadFuture


public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
    try {
        stopwatch.start();
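        // oldValue is the value at this position before the loading node was written; if there was
        // none, oldValue is UNSET and UNSET.get() returns null. An item that has never been in the
        // cache must therefore be loaded synchronously: with an asynchronous reload there would be
        // no old value for the other threads to return, so they would all get null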
        V previousValue = oldValue.get();
        if (previousValue == null) {
            V newValue = loader.load(key);
            return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        // Reload; this is asynchronous only if loader.reload is implemented asynchronously
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
            return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
                @Override
                public V apply(V newValue) {
                    LoadingValueReference.this.set(newValue);
                    return newValue;
                }
            },
            directExecutor());
    } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}           
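One consequence of the refresh code above is easy to miss: the caller gets the new value only if the future returned by reload is already done. With a reload that loads in the calling thread and returns a completed future, the triggering thread waits for the load and sees the new value. With a reload that runs on another thread, the triggering thread also returns the old value. The sketch below models just that branch with CompletableFuture standing in for ListenableFuture; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RefreshResultSketch {
    // Same shape as refresh() plus scheduleRefresh(): take the new value only
    // if the reload future is already complete, otherwise fall back to the old one.
    static String valueForCaller(String oldValue, CompletableFuture<String> reloadResult) {
        if (reloadResult.isDone()) {
            try {
                return reloadResult.join();
            } catch (RuntimeException e) {
                // a failed refresh is swallowed; the caller keeps the old value
            }
        }
        return oldValue;
    }

    // Reload that ran in the calling thread: the future is complete on return.
    static String withSynchronousReload() {
        CompletableFuture<String> alreadyDone = CompletableFuture.completedFuture("new");
        return valueForCaller("old", alreadyDone);
    }

    // Reload that runs on a background thread: the future is still pending.
    static String withAsynchronousReload() {
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService reloadPool = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> pending = CompletableFuture.supplyAsync(() -> {
                try {
                    release.await(); // simulated slow data source
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "new";
            }, reloadPool);
            String seenByCaller = valueForCaller("old", pending);
            release.countDown();
            pending.join();
            return seenByCaller;
        } finally {
            reloadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync reload  -> caller sees " + withSynchronousReload());  // new
        System.out.println("async reload -> caller sees " + withAsynchronousReload()); // old
    }
}
```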

▐Segment#waitForLoadingValue

V waitForLoadingValue(ReferenceEntry<K, V> e, K key, ValueReference<K, V> valueReference)
    throws ExecutionException {
    // The value reference must be a loading node
    if (!valueReference.isLoading()) {
        throw new AssertionError();
    }


    checkState(!Thread.holdsLock(e), "Recursive load of: %s", key);
    // don't consider expiration as we're concurrent with loading
    try {
        V value = valueReference.waitForValue();
        if (value == null) {
            throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        // re-read ticker now that loading has completed
        long now = map.ticker.read();
        recordRead(e, now);
        return value;
    } finally {
        statsCounter.recordMisses(1);
    }
}


// com.google.common.cache.LocalCache.LoadingValueReference#waitForValue


public V waitForValue() throws ExecutionException {
    return getUninterruptibly(futureValue);
}


// com.google.common.util.concurrent.Uninterruptibles#getUninterruptibly


public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
    boolean interrupted = false;
    try {
        while (true) {
            try {
                // Block; if the thread is interrupted, go back to waiting until a result is available
                return future.get();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    } finally {
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}           
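The behaviour of this loop, which keeps waiting through interrupts and restores the interrupt flag at the end, can be observed with a plain java.util.concurrent.Future. The sketch copies the loop from the excerpt above into a standalone class. The class name, the 50 ms delay, and the demo method are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class UninterruptibleWaitSketch {
    // Same loop as in the excerpt: swallow interrupts while waiting, remember
    // that one happened, and re-set the flag once the result is available.
    static <V> V getUninterruptibly(Future<V> future) throws ExecutionException {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return future.get();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static String demo() {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread completer = new Thread(() -> {
            try {
                Thread.sleep(50); // simulated load time
            } catch (InterruptedException ignored) {
                // nobody interrupts this helper thread
            }
            future.complete("loaded");
        });
        completer.start();
        Thread.currentThread().interrupt(); // an interrupt is pending before the wait starts
        try {
            String value = getUninterruptibly(future);
            boolean flagRestored = Thread.interrupted(); // reads and clears the flag
            return value + ", interrupt flag restored: " + flagRestored;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints: loaded, interrupt flag restored: true
    }
}
```

For a cache this means a waiting thread cannot be cancelled by interrupting it. It is released only when the load finishes or fails.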

▐Segment#lockedGetOrLoad

V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
    ReferenceEntry<K, V> e;
    ValueReference<K, V> valueReference = null;
    LoadingValueReference<K, V> loadingValueReference = null;
    boolean createNewEntry = true;


    // The segment is about to be modified, so take its lock first
    lock();
    try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        preWriteCleanup(now);


        // This is essentially HashMap logic: entries whose hashes map to the same table index are chained in a linked list
        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);


        for (e = first; e != null; e = e.getNext()) {
            K entryKey = e.getKey();
            if (e.getHash() == hash
                && entryKey != null
                && map.keyEquivalence.equivalent(key, entryKey)) {
                valueReference = e.getValueReference();


                // If the entry is already loading, do nothing here
                if (valueReference.isLoading()) {
                    createNewEntry = false;
                } else {
                    V value = valueReference.get();
                    // If the value is null, the data has already been removed (collected); notify the removal queue
                    if (value == null) {
                        enqueueNotification(
                            entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
                    // Double check: the entry has expired but has not been removed yet; notify the removal queue
                    } else if (map.isExpired(e, now)) {
                        // This is a duplicate check, as preWriteCleanup already purged expired
                        // entries, but let's accommodate an incorrect expiration queue.
                        enqueueNotification(
                            entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
                    // A value is present on this second look, so return it directly
                    } else {
                        recordLockedRead(e, now);
                        statsCounter.recordHits(1);
                        return value;
                    }


                    // immediately reuse invalid entries
                    writeQueue.remove(e);
                    accessQueue.remove(e);
                    this.count = newCount; // write-volatile
                }
                break;
            }
        }


        // No load is in progress, so create a loading node
        if (createNewEntry) {
            loadingValueReference = new LoadingValueReference<>();


            if (e == null) {
                e = newEntry(key, hash, first);
                e.setValueReference(loadingValueReference);
                table.set(index, e);
            } else {
                e.setValueReference(loadingValueReference);
            }
        }
    } finally {
        unlock();
        postWriteCleanup();
    }


    if (createNewEntry) {
        try {
            // Synchronizes on the entry to allow failing fast when a recursive load is
            // detected. This may be circumvented when an entry is copied, but will fail fast most
            // of the time.
            synchronized (e) {
                return loadSync(key, hash, loadingValueReference, loader);
            }
        } finally {
            statsCounter.recordMisses(1);
        }
    } else {
        // The entry already exists. Wait for loading.
        return waitForLoadingValue(e, key, valueReference);
    }
}           
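The structure of lockedGetOrLoad (re-check under the lock, install a loading placeholder, release the lock, then load) can be reduced to a small JDK-only model. It is a sketch under assumptions, not the Guava code: one ReentrantLock and a HashMap stand in for a segment, a CompletableFuture stands in for the loading node, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

class LockedGetOrLoadSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, CompletableFuture<String>> table = new HashMap<>();

    String get(String key, Function<String, String> loader) {
        CompletableFuture<String> future;
        boolean owner = false;
        lock.lock();
        try {
            future = table.get(key);                 // re-check under the lock
            if (future == null) {
                future = new CompletableFuture<>();  // placeholder, like a loading node
                table.put(key, future);
                owner = true;
            }
        } finally {
            lock.unlock();
        }
        if (owner) {
            try {
                future.complete(loader.apply(key));  // load outside the lock
            } catch (RuntimeException e) {
                lock.lock();
                try {
                    table.remove(key, future);       // do not cache the failure
                } finally {
                    lock.unlock();
                }
                future.completeExceptionally(e);
            }
        }
        return future.join();                        // non-owners wait for the owner
    }

    // While "slow" is still loading, another key in the same table loads at once.
    static String demo() {
        LockedGetOrLoadSketch cache = new LockedGetOrLoadSketch();
        CountDownLatch slowStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread slow = new Thread(() -> cache.get("slow", k -> {
            slowStarted.countDown();
            await(release);
            return "slow-value";
        }));
        slow.start();
        await(slowStarted);
        String fast = cache.get("fast", k -> "fast-value");
        release.countDown();
        try {
            slow.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return fast;
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints fast-value
    }
}
```

Holding the lock only long enough to install the placeholder is the design choice that matters: a slow load delays the threads waiting for that one key, not every key that shares the segment.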

Summary

Combining the diagram above with the source code, we can see that Guava Cache uses no additional threads for cleanup and refresh anywhere in the process; these actions are basically triggered by calls to get. This reduces design complexity and system overhead.

Here is a brief review of what get does at each stage and what it returns. It first checks whether the cache item has expired and then whether it needs to be refreshed. If the item has expired, load is called to load the data synchronously (other threads block). If it only needs a refresh, reload is called (other threads return the old value), and the reload runs asynchronously when reload is implemented that way.

Consequently, if refreshTime > expireTime, the refresh logic is never reached, because every item expires before it becomes eligible for refresh. The purpose of refresh is to keep cached data as consistent as possible while the item is still valid, so when configuring the two policies, make sure that refreshTime < expireTime.
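The ordering of the two checks, and why a refresh interval longer than the expiration time is never used, can be written down as a tiny decision function. This is a simplified model of the get path rather than Guava code; the units are arbitrary, and the exact boundary comparison for expiry (>=) is an assumption of the sketch.

```java
class GetDecisionSketch {
    enum Action { HIT, REFRESH, LOAD }

    // age = now - writeTime. Expiry is checked first, then refresh eligibility.
    static Action decide(long age, long expireAfterWrite, long refreshAfterWrite) {
        if (age >= expireAfterWrite) {
            return Action.LOAD;      // expired: synchronous load, other threads wait
        }
        if (age > refreshAfterWrite) {
            return Action.REFRESH;   // still valid but stale: reload, old value is served
        }
        return Action.HIT;           // fresh: return the cached value
    }

    // Scans a range of ages and reports whether REFRESH is ever chosen.
    static boolean refreshEverHappens(long expireAfterWrite, long refreshAfterWrite) {
        long limit = 2 * Math.max(expireAfterWrite, refreshAfterWrite);
        for (long age = 0; age <= limit; age++) {
            if (decide(age, expireAfterWrite, refreshAfterWrite) == Action.REFRESH) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // expire = 300, refresh = 60 (the values from the first example, in seconds)
        System.out.println(decide(30, 300, 60));   // HIT
        System.out.println(decide(90, 300, 60));   // REFRESH
        System.out.println(decide(400, 300, 60));  // LOAD
        // Swapped configuration: refresh is unreachable
        System.out.println(refreshEverHappens(60, 300)); // false
    }
}
```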

Finally, some tips on how to use Guava Cache (best practices):

  1. If the refresh interval is short, be sure to override the reload method to load data asynchronously, and pass in a custom thread pool to protect the DB.
  2. The expiration time must be greater than the refresh time.
  3. For a small amount of data that stays resident in memory, the refresh interval can be somewhat shorter (depending on how much staleness the business can tolerate).

Author: Azusa Chuan

Source-WeChat public account: big Taobao technology

Source: https://mp.weixin.qq.com/s/teGvFv-X3BTfJOD5OFr7Yg