天天看點

Redis高并發問題這麼解決,泰褲啦

作者:java小悠

前言

當今網際網路世界中,高并發一直是各大網站、應用面臨的一大挑戰。為了應對高并發的流量壓力,各種技術手段被不斷研究和應用。其中,Redis 作為一款高性能的記憶體資料庫,被廣泛應用于解決高并發問題。

與傳統的關系型資料庫不同,Redis 采用了記憶體存儲的方式,可以實作快速的讀寫操作。同時,Redis 還具有豐富的資料結構和強大的緩存功能,可以大大提升系統的性能和可靠性。在處理高并發問題方面,Redis 也提供了諸如分布式鎖、限流、隊列等常用的解決方案,可以幫助開發者輕松應對高并發場景。

本篇文章将介紹 Redis 在處理高并發問題方面的應用,包括緩存穿透、緩存擊穿和緩存雪崩問題等,并針對這些問題給出實際的解決方案(附代碼),持續更新。希望本文能夠對大家在解決高并發問題時提供幫助和啟示。

緩存穿透

緩存穿透是指使用者查詢資料時,資料庫和緩存中都沒有資料。導緻了查詢請求直接繞過緩存,直接穿透到資料庫。

解決方法:

緩存空值

查詢Redis為null,查詢資料庫也為null,此時設定該key在緩存中,且值為null,過期時間為随機時間。random(10)。這樣子能保證資料在這段時間暴力請求,也隻會在這短暫的時間内擷取null,而有另外的線程在讀取資料庫表,并緩存在Redis中

scss複制代碼/**
 * 解決緩存穿透
 * @return
 */
public User getUser(String userId) {
    //從緩存中擷取user資訊
    User user = (User) redisTemplate.opsForValue().get(userId);

    if(user == null) {
        //如果緩存資料為空,從資料庫中擷取user資訊
        user = userService.getUserByUserId(userId);

        if(user == null) {
            //如果資料庫中資料為空,則存入一個空值,設定短時間内過期,防止緩存穿透
            redisTemplate.opsForValue().set(userId,null,5, TimeUnit.MINUTES);
        }else {
            //将資料寫入緩存
            redisTemplate.opsForValue().set(userId,user);
        }
    }
    return user;
}
           

布隆過濾器

可參考:www.cnblogs.com/throwable/p…

布隆過濾器(英語:Bloom Filter)是1970年由布隆提出的。它實際上是一個很長的二進制向量和一系列随機映射函數。布隆過濾器可以用于檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的算法,缺點是有一定的誤識别率和删除困難。

優點:

  1. 空間效率高,不用像Set集合一樣儲存元素的值,極大地節省了記憶體空間。隻需将插入的key通過Hash計算放到bitMap中的一個位置,在判斷是否存在該key的時候,隻需判斷bitMap中的位置是0還是1即可,達到了Set集合判斷是否存在某值的效果。
  2. 查詢效率高:布隆過濾器可以在非常快的時間内判斷一個元素是否存在于集合中,而不需要像傳統資料結構那樣進行線性掃描。這對于大規模資料集和高并發查詢場景尤其有用。

缺點:

  1. 布隆過濾器中的存儲的key越多,誤判率越高。将不存在的元素誤判為存在。
  2. 不能删除布隆過濾器中已存在的key

具體使用:使用Guava中的API

導入依賴

xml複制代碼    <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
    </dependency>
           

構造BloomFilter的最多參數的靜态工廠方法是BloomFilter create(Funnel funnel, long expectedInsertions, double fpp, BloomFilter.Strategy strategy),參數如下:

  • funnel:主要是把任意類型的資料轉化成HashCode,是一個頂層接口,有大量内置實作,見Funnels
  • expectedInsertions:期望插入的元素個數
  • fpp:猜測是False Positive Percent,誤判率,小數而非百分數,預設值0.03
  • strategy:映射政策,目前隻有MURMUR128_MITZ_32和MURMUR128_MITZ_64(預設政策)
java複制代碼@RestController
@RequestMapping("user")
public class UserController{

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    private static final int expectedInsertions = 10000;

    private static final double fpp = 0.0444D;
    private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);

    @GetMapping("/user/{id}")
    public User getUserById(@PathVariable Long id){
        // 先從布隆過濾器中判斷此id是否存在,初始化的時候需要将資料庫的所有id都存到布隆過濾器,缺點不能删除已經不存在的id
        if(!bloomFilter.mightContain(String.valueof(id))){
            return null;
        }
        // 查詢緩存資料
        String userKey = "user_"+id;
        User user = (User) redisTemplate.opsForValue().get(userKey);
        if(user == null){
            // 查詢資料庫
            user = userRepository.findById(id).orElse(null);
            if(user != null){
                // 将查詢到的資料加入緩存
                redisTemplate.opsForValue().set(userKey, user, 300, TimeUnit.SECONDS);
            }
        }
        return user;
    }
}
           

緩存擊穿

緩存擊穿是指一個非常熱門的、但是不存在的資料被大量請求,導緻請求直接落到資料庫上,進而使得資料庫瞬間承受巨大的壓力,進而導緻資料庫響應變慢,甚至當機的現象。和緩存雪崩的差別在于熱點資料的量多不多。

解決方法:

緩存資料永不過期

将熱門的、但是不經常更新的資料設定為永不過期,可以避免緩存擊穿的風險。但是這種方法可能會導緻緩存資料的時效性降低,需要根據實際情況進行權衡。

分布式鎖

在加載緩存資料時,添加互斥鎖可以保證隻有一個請求去加載資料并更新緩存,其他請求等待緩存更新完成後再擷取資料,進而避免了大量請求直接落到資料庫上的情況。

scss複制代碼@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private RedissonClient redissonClient;

public Object getData(String key) {
    // 嘗試從緩存中擷取資料
    Object value = redisTemplate.opsForValue().get(key);
    if (value != null) {
        return value;
    }
    // 如果緩存中不存在資料,擷取分布式鎖
    RLock lock = redissonClient.getLock(key);
    try {
        lock.lock();
        // 再次嘗試從緩存中擷取資料,避免其他線程在擷取鎖之前已經寫入了緩存
        value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }
        // 如果緩存中不存在資料,從資料庫中查詢
        value = getDataFromDatabase(key);
        if (value == null) {
            // 如果資料庫中也不存在資料,将空對象寫入緩存,并設定較短的過期時間
            redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);
        } else {
            // 如果資料庫中存在資料,将資料寫入緩存,并設定較長的過期時間
            redisTemplate.opsForValue().set(key, value, 5, TimeUnit.MINUTES);
        }
        return value;
    } finally {
        // 釋放分布式鎖
        lock.unlock();
    }
}
           

緩存雪崩

緩存雪崩是指在緩存中大量的緩存資料同時過期或者失效,導緻大量的請求直接落到了資料庫上,使得資料庫瞬間承受巨大的壓力,進而導緻資料庫響應變慢,甚至當機的現象。

解決方法:

過期時間随機化

将緩存資料的過期時間設定為随機值,可以避免大量緩存資料同時過期的情況發生,進而減少緩存雪崩的風險

scss複制代碼/**
 * 解決緩存雪崩
 * @return
 */
public User getUser2(String userId) {
    //從緩存中擷取user資訊
    User user = (User) redisTemplate.opsForValue().get(userId);

    if(user == null) {
        //如果緩存資料為空,從資料庫中擷取user資訊
        user = lUserMapper.getUserByUserId(userId);

        if(user == null) {
            redisTemplate.opsForValue().set(userId,null,3, TimeUnit.MINUTES);
        }else {
            //設定随機過期時間,将資料寫入緩存,防止緩存雪崩
            long mins = random.nextInt(60) + 60;
            redisTemplate.opsForValue().set(userId, user, mins, TimeUnit.MINUTES);
        }
    }
    return user;
}
           

分布式鎖

使用分布式鎖可以保證在緩存失效時,隻有一個請求去加載資料并更新緩存,其他請求等待緩存更新完成後再擷取資料,進而避免了大量請求同時落到資料庫上的情況(同緩存穿透)

引出問題

通過上面的例子我們已經了解到了Redis在高并發狀态下可能出現的問題以及解決方法,但是如果應用到實際場景中,針對每個接口都需要考慮這麼處理,那代碼中會充斥着大量的重複代碼,那肯定是不能接受的。那有沒有一種好的通用的解決方案呢?

這裡就不得不提起Spring Cache,Spring Cache 是Spring 提供的一整套的緩存解決方案。 雖然它本身并沒有提供緩存的實作,但是它提供了一整套的接口和代碼規範、配置、注解等,這樣它就可以整合各種緩存方案了,比如Redis、Ehcache,我們也就不用關心操作緩存的細節。Spring Cache怎麼整合Redis,本篇文章不做介紹,大家自行上網搜尋。

簡單介紹一下Spring Cache的幾個注解使用:

@Cacheable(key="#id") 根據id查詢或者查詢會啟動緩存

@CachePut(key="#post.postId") 插入或者更新會啟動緩存

@CacheEvict(key="#id") 删除時啟動緩存

Spring Cache解決方案

Spring Cache解決緩存穿透

有一個很簡單的解決方案,就是緩存null值,從緩存取不到的資料,在資料庫中也沒有取到,直接傳回空值。本身是不支援緩存null值的,需要在配置檔案開啟支援

ini複制代碼spring.cache.redis.cache-null-values=true
           

Spring Cache解決緩存擊穿

ini複制代碼@Cacheable(cacheNames="user", sync="true")
           

解釋:如果設定 sync 屬性為 true,表示該方法的緩存操作會使用同步鎖來保證線程安全,防止多個線程同時通路該方法導緻緩存出現問題。如果 sync 屬性為 false,則不會使用同步鎖,緩存操作可能存在并發問題。通過設定 sync 屬性為 true,可以保證多個線程同時通路同一個緩存方法時,隻有一個線程能夠執行方法,并将傳回值緩存到緩存中。其他線程會等待第一個線程執行完方法後,從緩存中擷取傳回值。這樣可以避免多個線程同時執行緩存方法,導緻緩存出現問題的情況。sync = true 可以有效的避免緩存擊穿的問題。

Spring Cache解決緩存雪崩

最簡單的方法是過期時間加上随機值,但是很麻煩的是,我們在使用@Cacheable注解的時候,原生功能沒法直接設定随機過期時間的,需要繼承RedisCacheManager,重寫裡面的getCache方法。

可參考:Spring Boot緩存實戰 Redis 設定有效時間和自動重新整理緩存,時間支援在配置檔案中配置

從上面可以看出Spring Cache解決Redis緩存問題還是比較麻煩的,特别是在解決緩存雪崩問題上。既然如此,我們為什麼不自己實作一個屬于我們自己的緩存機制,開幹!!!

設計一套緩存機制

綜上Redis出現的三個問題,給出綜合的解決方案:

  1. 緩存空值(布隆過濾器不建議) + 分布式鎖更新緩存解決Redis問題
  2. AOP + 自定義注解減少重複代碼。增加複用性

讀取緩存型注解@MyCacheable

less複制代碼@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyCacheable {

    /**
     * 緩存的Key,預設使用方法名作為Key
     */
    String value() default "";

    /**
     * 緩存的過期時間,機關為秒,預設值為60秒
     */
    int expireInSeconds() default 60;

}
           

讀取緩存型切面MyCacheableAspect

java複制代碼@Component
@Aspect
public class MyCacheableAspect {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 定義緩存的切點,攔截所有标記了@Cached注解的方法
     */
    @Pointcut("@annotation(com.plus.annotation.MyCacheable)")
    public void cachedPointcut() {
    }

    private static final int expectedInsertions = 10000;

    private static final double fpp = 0.0444D;

    private static BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charset.defaultCharset()), expectedInsertions, fpp);

    private static final String prefix = "lock-";

    /**
     * 在方法執行前嘗試從緩存中擷取資料,如果緩存中存在資料,直接傳回
     */
    @Around("cachedPointcut()")
    public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {

        // 擷取注解資訊
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        Cached cached = method.getAnnotation(MyCacheable.class);
        String key = StringUtils.isEmpty(cached.value()) ? method.getName() : cached.value();
        int expireInSeconds = cached.expireInSeconds();

        // 先從布隆過濾器中判斷此id是否存在,初始化的時候需要将資料庫的所有id都存到布隆過濾器。這麼多個資料表,id肯定有重複的。單獨存儲id是不行的。那就加上
        // 字首,例如user-id。初始化的時候就要周遊所有需要緩存資料的表,将該表的id都存到布隆過濾器,資料量很大,這也會導緻布隆過濾器誤判率增加。且後續這些
        // 表新增資料都要用将id也要存到布隆過濾器。缺點不能删除布隆過濾器已經不需要的id。布隆過濾器按我的見解是真不好用,雖然在一些特殊場景好用,但不包括此場景
        // 'mightContain(T)' is declared in unstable class 'com.google.common.hash.BloomFilter' marked with @Beta 說明這個方法是不穩定的,有可能誤判
        //if (key.contains("#id") && !bloomFilter.mightContain(key)) {
        //    return null;
        //}

        // 嘗試從緩存中擷取資料
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            if (value instanceof NullValue) {
                // 如果緩存中存在空對象,傳回null
                return null;
            }
            return value;
        }

        // 如果緩存中不存在資料,擷取分布式鎖
        RLock lock = redissonClient.getLock(prefix + key);
        try {
            lock.lock();

            // 再次嘗試從緩存中擷取資料,避免其他線程在擷取鎖之前已經寫入了緩存
            value = redisTemplate.opsForValue().get(key);
            if (value != null) {
                if (value instanceof NullValue) {
                    // 如果緩存中存在空對象,傳回null
                    return null;
                }
                return value;
            }

            // 如果緩存中不存在資料,執行方法擷取資料
            value = joinPoint.proceed();

            if (value == null) {
                // 如果資料源中也不存在資料,将空對象寫入緩存,并設定較短的過期時間。防止緩存穿透,算是布隆過濾器的兜底
                redisTemplate.opsForValue().set(key, NullValue.INSTANCE, 1, TimeUnit.MINUTES);
            } else {
                // 如果資料源中存在資料,将資料寫入緩存,随機設定過期時間,避免緩存同時失效導緻緩存雪崩
                // 随機時間算法:以正常緩存時間為基準,取十分之一的範圍内生成随機數
                int seed = expireInSeconds / 10 == 0 ? expireInSeconds : expireInSeconds / 10;
                int randomTime = new Random().nextInt(seed);
                redisTemplate.opsForValue().set(key, value, expireInSeconds + randomTime, TimeUnit.SECONDS);
            }

            return value;
        } finally {
            // 釋放分布式鎖
            lock.unlock();
        }
    }

    /**
     * 緩存空對象類,受不了代碼規範插件報的null值警告才增加的。注意RedisTemplate<String, String> redisTemplate不能緩存null值,需要RedisTemplate<String, Object> redisTemplate
     */
    private static class NullValue implements Serializable {

        private static final long serialVersionUID = 1L;

        /**
         * 單例模式
         */
        private static final NullValue INSTANCE = new NullValue();

        private Object readResolve() {
            return INSTANCE;
        }

    }

}
           

更新緩存型注解@MyCacheEvict

less複制代碼@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MyCacheEvict {
    /**
     * 緩存的Key
     */
    String value();

}
           

更新緩存型切面MyCacheEvictAspect

less複制代碼@Aspect
@Component
public class MyCacheEvictAspect {

    /**
     * 定義緩存的切點,攔截所有标記了@MyCacheEvict注解的方法
     */
    @Pointcut("@annotation(com.plus.annotation.MyCacheEvict)")
    public void cachedPointcut() {
    }

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Around("cachedPointcut()")
    public Object cachedAround(ProceedingJoinPoint joinPoint) throws Throwable {

        // 擷取注解資訊
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method method = methodSignature.getMethod();
        MyCacheEvict cacheEvict = method.getAnnotation(MyCacheEvict.class);
        String key = cacheEvict.value();
        // 執行方法
        Object result = joinPoint.proceed();
        redisTemplate.delete(key);
        return result;
    }
}
           

具體使用

less複制代碼@GetMapping("getAllUser")
@MyCacheable(value = "getAllUser")
public R<List<User>> getAllUser(){
    return R.data(userService.list(new UserDTO()));
}

@GetMapping("getUserById")
@MyCacheable(value = "id")
public R<User> getUserById(int id){
    return R.data(userService.getUserById(id));
}

@PostMapping("/save")
@MyCacheEvict("getAllUser")
public R save(@RequestBody @Validated User user) {
    return R.data(userService.save(user));
}
           

以上方法基本能解決大部分場景下的緩存問題,大家有需求可以自行拓展,例如支援多種格式的key處理。對此有疑問的,希望大家多多指導!!!

原文連結:https://juejin.cn/post/7231731488927465527

繼續閱讀