天天看點

高并發系統之限流、緩存和降級設計方案高并發系統設計方案

高并發系統設計方案

高并發系統設計一般會考慮三個方面:限流、緩存、降級

限流:控制在一定時間内的通路量,比如秒殺,這種場景下通路量過于龐大,使用緩存或者降級根本無法解決通路量巨大的問題,那麼隻能選擇限流

緩存:緩存設計是我們常用的減輕伺服器壓力的方案,常用的緩存有 redis(分布式)、 memcache(分布式)、google guava cache(本地緩存)等

降級:高并發高負載情況下,選擇動态的關閉一些不重要的服務拒絕通路等,為重要的服務節省資源,比如雙11當天淘寶關閉了退款等功能

限流

常見的限流算法:令牌桶、漏桶、計數器

接入層限流:指請求流量的入口,該層的主要目的有 負載均衡、非法請求過濾、請求聚合、服務品質監控等等

Nginx接入層限流:使用Nginx自帶了兩個子產品,連接配接數限流子產品ngx_http_limit_conn_module和漏桶算法實作的請求限流子產品ngx_http_limit_req_module

應用層限流:比如TPS/QPS超過一定範圍後進行控制,比如tomcat可配置可接受的等待連接配接數、最大連接配接數、最大線程數等

令牌桶:Guava架構提供了令牌桶算法實作,可直接拿來使用,Guava RateLimiter提供了令牌桶算法實作:平滑突發限流(SmoothBursty)和平滑預熱限流(SmoothWarmingUp)實作,代碼如下:

import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import com.google.common.util.concurrent.RateLimiter;

/**
 * 固定速率請求,每200ms允許一個請求通過
 *
 * @date 2019-10-14 16:54
 **/
public class FixedRequestLimitDemo {
    @Test
    public void fixedRequestTest() {
        // 表示1秒内産生多少個令牌,即1秒内産生5個令牌,控制每200ms一個請求
        RateLimiter rateLimiter = RateLimiter.create(5);
        AtomicInteger counter = new AtomicInteger(0);

        for (int i = 0; i < 15; i++) {
            // 同時開啟15個線程通路
            new Thread(() -> fixedRequest(rateLimiter, counter)).start();
        }

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void fixedRequest(RateLimiter rateLimiter, AtomicInteger counter) {
        double time = rateLimiter.acquire();
        if (time >= 0) {
            System.out.println("時間:" + time + " ,第 " + counter.incrementAndGet() + " 個業務處理");
        }
    }
}
           

計數器:使用計數器方案簡單粗暴的實作限流,使用 google guava cache(本地緩存),代碼如下:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.junit.Test;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

/**
 * 限流demo
 *
 * @date 2019-10-12 17:26
 **/
public class LimitDemo {
    @Test
    public void limitTest() {
        // 本地緩存、key (Long)表示目前時間秒、value (AtomicLong)表示請求計數器
        LoadingCache<Long, AtomicLong> counter = CacheBuilder.newBuilder().
                expireAfterAccess(2, TimeUnit.SECONDS).build(
                new CacheLoader<Long, AtomicLong>() {
                    @Override
                    public AtomicLong load(Long aLong) throws Exception {
                        return new AtomicLong(0);
                    }
                }
        );

        for (int i = 0; i < 15; i++) {
            // 同時開啟15個線程通路
            new Thread(() -> requestLimit(counter)).start();
        }

        try {
            // 這裡休眠是為了多線程全部執行完輸出結果
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

   /**
     * 請求限制,本方法加鎖是為了控制緩存LoadingCache get資料時候的并發
     *
     * @param counter
     */
    private synchronized void requestLimit(LoadingCache<Long, AtomicLong> counter) {
        try {
            // 流量限制數量
            long limit = 10;
            // 目前秒數
            long currentSecond = System.currentTimeMillis() / 1000;
            if (counter.get(currentSecond).incrementAndGet() > limit) {
                // 超出每秒内允許通路10個的限制
                System.out.println("第 " + counter.get(currentSecond) + " 個請求超出上限,限流了");
                return;
            }

            System.out.println("第 " + counter.get(currentSecond) + " 個業務處理");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
           

緩存

緩存方案可以有效地減輕伺服器壓力,但是它的設計也有一些必須考慮的問題,例如緩存雪崩、緩存擊穿、緩存穿透、緩存預熱、緩存降級、資源隔離等

緩存雪崩

設定緩存時使用了相同的過期時間,導緻大量的緩存在同一時刻同時失效,請求全部通路了DB(資料層),DB瞬間壓力過大而當機,進而引起一系列的嚴重後果

解決方案

1、使用鎖或者隊列的方式控制多線程同時對DB的讀寫,即避免失效時所有請求一下子全部通路到DB

2、緩存失效時間分散開,即設定緩存時設定不同的過期時間(原有的緩存時間上增加一個随機數),避免同一時刻大量緩存失效

3、緩存資料增加緩存失效标記,如果緩存标記失效,則更新資料緩存

使用加鎖一般适用于并發量不是特别大的場景,僞代碼如下:

//僞代碼
public object getProductList() {
    int cacheTime = 30;
    String cacheKey = "product_list";
    String lockKey = cacheKey;

    String cacheValue = CacheHelper.get(cacheKey);
    if (cacheValue != null) {
        return cacheValue;
    } else {
        // 對lockKey加鎖
        synchronized(lockKey) {
            cacheValue = CacheHelper.get(cacheKey);
            if (cacheValue != null) {
                return cacheValue;
            } else {
	            //這裡一般是sql查詢資料
                cacheValue = getProductListFromDB(); 
                CacheHelper.Add(cacheKey, cacheValue, cacheTime);
            }
        }
        return cacheValue;
    }
}
           

注意:加鎖排隊僅僅是減輕了資料庫的壓力,但是并沒有提高系統吞吐量,它不僅要解決分布式鎖的問題,還會産生線程阻塞問題,是以使用者體驗比較差!是以,在真正的高并發場景下很少使用!

緩存資料增加緩存失效标記,僞代碼如下:

//僞代碼
public object getProductList() {
    int cacheTime = 30;
    String cacheKey = "product_list";
    // 緩存标記
    String cacheSign = cacheKey + "_sign";

    String sign = CacheHelper.Get(cacheSign);
    // 擷取緩存值
    String cacheValue = CacheHelper.Get(cacheKey);
    if (sign != null) {
        //未過期,直接傳回
        return cacheValue; 
    } else {
        CacheHelper.Add(cacheSign, "1", cacheTime);
        // 開啟背景線程來更新緩存
        ThreadPool.QueueUserWorkItem((arg) -> {
			// sql查詢資料
            cacheValue = getProductListFromDB(); 
	        //日期設緩存時間的2倍,用于髒讀
	        CacheHelper.Add(cacheKey, cacheValue, cacheTime * 2);                 
        });
        return cacheValue;
    }
} 
           

注意:緩存标記的失效時間設定為緩存資料失效時間的一半,這樣根據緩存标記背景線程提前更新緩存資料,在這之前還可以傳回舊的緩存資料,這種方案對記憶體要求高,每個緩存都要設定一個對應的緩存标記

緩存擊穿

正好要過期的key在某一個時刻被高并發的通路,即某時刻的熱點資料,key正好過期了需要請求DB回寫到緩存中去,此時大量的請求都通路到DB,DB瞬間壓力過大也崩掉了。這裡和緩存雪崩不同的是緩存擊穿針對的是某一個key,而緩存雪崩針對的是多個key

解決方案

1、使用互斥鎖

2、緩存不過期,使用value内部的過期時間來控制過期更新緩存值

使用互斥鎖,僞代碼如下:

// 1、redis setnx 實作
    public String getValue(key) {
        String value = redis.get(key);
        if(value != null){
            return value;
        }
        // 緩存值過期,擷取鎖,設定3min的逾時,防止del操作失敗的時候産生死鎖
        if (redis.setnx(key_mutex, 1, 3 * 60) == 1) {
            // 代表擷取鎖成功
            value = db.get(key);
            redis.set(key, value, expire_secs);
            redis.del(key_mutex);
            return value;
        }

        // 沒有擷取到鎖表示其他線程已經重新設定緩存了,此時重試擷取緩存即可
        try {
            Thread.sleep(50);
            //重試
            getValue(key);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    // 2、memcache 實作
    public String getValue(String key) {
        String value = memcache.get(key);
        if (value != null) {
            return value;
        }

        // 加鎖
        if (memcache.add(key_mutex, 3 * 60 * 1000) == true) {
            value = db.get(key);
            memcache.set(key, value);
            memcache.delete(key_mutex);
            return value;
        }

        // 沒有擷取到鎖表示其他線程已經重新設定緩存了,此時重試擷取緩存即可
        try {
            Thread.sleep(50);
            //重試
            getValue(key);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
           

緩存不過期,使用value内部過期值更新緩存,僞代碼如下:

/**
     * redis 實作
     * <p>
     * V 對象中有兩個屬性,value 是緩存值,timeout 是緩存過期更新時間
     *
     * @param key
     * @return
     */
    public String getValue(String key) {
        V v = redis.get(key);
        String value = v.getValue();
        long timeout = v.getTimeout();
        if (timeout > System.currentTimeMillis()) {
            // 緩存值沒有到緩存過期時間,直接傳回
            return value;
        }

        // 緩存值過期,異步更新背景執行
        threadPool.execute(new Runnable() {
            public void run() {
                String keyMutex = "mutex:" + key;
                if (redis.setnx(keyMutex, "1", 3 * 60) == 1) {
                    String newValue = db.get(key);
                    redis.set(key, newValue);
                    redis.delete(keyMutex);
                }
            }
        });

        // 此時直接傳回舊value值
        return value;
    }
           

此方法的優點是并發性能好,缺點是不能及時的擷取到最新的緩存值,有點延遲

緩存穿透

查詢一個資料庫中不存在的資料,此時緩存中也不會設定緩存值,那麼所有請求都會直接查詢到DB,将好像是緩存穿透了一樣,請求過大将會導緻DB當機引起嚴重後果。黑客可以利用這種不存在的key來頻繁請求我們的應用,拖垮應用伺服器

解決方案

1、布隆過濾器,将所有可能存在的資料哈希到一個足夠大的bitmap中,一個一定不存在的資料會被這個bitmap攔截掉,進而避免了對底層存儲系統的查詢壓力

2、查詢到資料為空的資料也設定到緩存系統中,緩存時間可以設定的相對短一些,這樣子的話緩存就可以起作用,擋掉了直接通路DB的壓力

查詢到資料為空的資料也設定到緩存系統,僞代碼如下:

public String getValue(key) {
        String value = redis.get(key);
        if (value != null) {
            return value;
        }
        // 緩存值過期,擷取鎖,設定3min的逾時,防止del操作失敗的時候産生死鎖
        if (redis.setnx(key_mutex, 1, 3 * 60) == 1) {
            // 代表擷取鎖成功
            value = db.get(key);
            if (value == null) {
                // value 為空時也緩存起來
                value = String.empty;
            }
            redis.set(key, value, expire_secs);
            redis.del(key_mutex);
            return value;
        }

        // 沒有擷取到鎖表示其他線程已經重新設定緩存了,此時重試擷取緩存即可
        try {
            Thread.sleep(50);
            //重試
            getValue(key);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
           

緩存預熱

緩存預熱就是系統上線後,将需要用的緩存資料直接加載到緩存系統。這樣就可以避免在使用者請求的時候再去查詢DB設定資料緩存,使用者直接查詢事先被預熱的緩存資料即可

解決方案

1、資料量不大的時候直接在項目啟動的時候加載緩存資料

2、定時重新整理緩存資料

3、頁面按鈕手動操作重新整理緩存資料

緩存降級

當通路量劇增,緩存服務響應慢時,需要對某些資料自動緩存降級,也可以配置開關人工降級,例如redis緩存通路不到的時候降級通路二級緩存、本地緩存等

在進行降級之前要對系統進行梳理,那些緩存可以降級,那些不能降級,然後設定預案:

1、一般:比如有些服務偶爾因為網絡抖動或者服務正在上線而逾時,可以自動降級;

2、警告:有些服務在一段時間内成功率有波動(如在95~100%之間),可以自動降級或人工降級,并發送告警;

3、錯誤:比如可用率低于90%,或者資料庫連接配接池被打爆了,或者通路量突然猛增到系統能承受的最大閥值,此時可以根據情況自動降級或者人工降級;

4、嚴重錯誤:比如因為特殊原因資料錯誤了,此時需要緊急人工降級

使用Hystrix對Redis進行資源隔離

對redis的通路加上保護措施,全都用hystrix的command進行封裝,做資源隔離,確定redis的通路隻能在固定的線程池内的資源來進行通路,哪怕是redis通路的很慢,有等待和逾時,也不要緊,隻有少量額線程資源用來通路,緩存服務不會被拖垮

解決方案

引入Hystrix 保護redis

1、引入Hystrix依賴

<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.18</version>
</dependency>
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-metrics-event-stream</artifactId>
    <version>1.5.18</version>
</dependency>
           

2、具體示例代碼如下:

/**
 * 儲存商品資訊到redis緩存中
 *
 * @date 2018/06/12
 */
public class SaveProductInfo2RedisCacheCommand extends HystrixCommand<Boolean> {

    private ProductInfo productInfo;

    public SaveProductInfo2RedisCacheCommand(ProductInfo productInfo) {

        super(HystrixCommandGroupKey.Factory.asKey("RedisGroup"));
        this.productInfo = productInfo;
    }


    @Override
    protected Boolean run() {
        StringRedisTemplate redisTemplate = SpringContext.getApplicationContext().getBean(StringRedisTemplate.class);

        String key = "product_info_" + productInfo.getId();
        redisTemplate.opsForValue().set(key, JSON.toJSONString(productInfo));

        return true;
    }
}


/**
 * 将商品資訊儲存到redis中
 *
 * @param productInfo
 */
public void saveProductInfo2RedisCache(ProductInfo productInfo) {
    SaveProductInfo2RedisCacheCommand command = new SaveProductInfo2RedisCacheCommand(productInfo);
    command.execute();
}



public class GetProductInfoCommand extends HystrixCommand<ProductInfo> {


    private Long productId;

    public GetProductInfoCommand(Long productId) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ProductInfoService"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("GetProductInfoPool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
                        .withCoreSize(10)
                        .withMaxQueueSize(12)
                        .withQueueSizeRejectionThreshold(8)
                        .withMaximumSize(30)
                        .withAllowMaximumSizeToDivergeFromCoreSize(true)
                        .withKeepAliveTimeMinutes(1)
                        .withMaxQueueSize(50)
                        .withQueueSizeRejectionThreshold(100))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        // 多少個請求以上才會判斷斷路器是否需要開啟。
                        .withCircuitBreakerRequestVolumeThreshold(30)
                        // 錯誤的請求達到40%的時候就開始斷路。
                        .withCircuitBreakerErrorThresholdPercentage(40)
                        // 3秒以後嘗試恢複
                        .withCircuitBreakerSleepWindowInMilliseconds(4000))
        );
        this.productId = productId;
    }

    @Override
    protected ProductInfo run() throws Exception {
        String productInfoJSON = "{\"id\": " + productId + ", \"name\": \"iphone7手機\", \"price\": 5599, \"pictureList\":\"a.jpg,b.jpg\", \"specification\": \"iphone7的規格\", \"service\": \"iphone7的售後服務\", \"color\": \"紅色,白色,黑色\", \"size\": \"5.5\", \"shopId\": 1, \"modifiedTime\": \"2017-01-01 12:01:00\"}";
        return JSONObject.parseObject(productInfoJSON, ProductInfo.class);
    }
}
           

服務降級

高并發高負載情況下,選擇動态的關閉一些不重要的服務拒絕通路,比如推薦、留言等不太重要的服務

總結

上面是常用的高并發系統設計考慮的方面,尤其是緩存中的解決方案,沒有哪一個是最優的,适合自己的業務場景才是最好的

參考文章:

https://blog.csdn.net/kevin_love_it/article/details/88095271

https://blog.csdn.net/zeb_perfect/article/details/54135506

https://blog.csdn.net/xlgen157387/article/details/79530877

http://www.saily.top/2018/06/12/cache06/

繼續閱讀