天天看點

redis 分布式鎖的 5個坑,真是又大又深

引言

最近項目上線的頻率頗高,連着幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前隻能硬着頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫代碼,可以直接叫做

Bug

。我就熬夜寫了一個

bug

被罵慘了。

由于是做商城業務,要頻繁的對商品庫存進行扣減,應用是叢集部署,為避免并發造成庫存

超買超賣

等問題,采用

redis

分布式鎖加以控制。本以為給扣庫存的代碼加上鎖

lock.tryLock

就萬事大吉了

/**
     * @author xiaofu
     * @description 扣減庫存
     * @date 2020/4/21 12:10
     */
   public String stockLock() {
        RLock lock = redissonClient.getLock("stockLock");
        try {
            /**
             * 擷取鎖
             */
            if (lock.tryLock(10, TimeUnit.SECONDS)) {
                /**
                 * 查詢庫存數
                 */
                Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
                /**
                 * 扣減庫存
                 */
                if (stock > 0) {
                    stock = stock - 1;
                    stringRedisTemplate.opsForValue().set("stockCount", stock.toString());
                    LOGGER.info("庫存扣減成功,剩餘庫存數量:{}", stock);
                } else {
                    LOGGER.info("庫存不足~");
                }
            } else {
                LOGGER.info("未擷取到鎖業務結束..");
            }
        } catch (Exception e) {
            LOGGER.info("處理異常", e);
        } finally {
            lock.unlock();
        }
        return "ok";
  }           

結果業務代碼執行完以後我忘了釋放鎖

lock.unlock()

,導緻

redis

線程池被打滿,

redis

服務大面積故障,造成庫存資料扣減混亂,被上司一頓臭罵,這個月績效~ 哎·~。

随着 使用

redis

鎖的時間越長,我發現

redis

鎖的坑遠比想象中要多。就算在面試題當中

redis

分布式鎖的出鏡率也比較高,比如:“用鎖遇到過哪些問題?” ,“又是如何解決的?” 基本都是一套連招問出來的。

今天就分享一下我用

redis

分布式鎖的踩坑日記,以及一些解決方案,和大家一起共勉。

一、鎖未被釋放

這種情況是一種低級錯誤,就是我上邊犯的錯,由于目前線程 擷取到

redis

鎖,處理完業務後未及時釋放鎖,導緻其它線程會一直嘗試擷取鎖阻塞,例如:用

Jedis

用戶端會報如下的錯誤資訊

redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool           

redis線程池

已經沒有空閑線程來處理用戶端指令。

解決的方法也很簡單,隻要我們細心一點,拿到鎖的線程處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,線程可以釋放目前連接配接并且

sleep

一段時間。

public void lock() {
      while (true) {
          boolean flag = this.getLock(key);
          if (flag) {
                TODO .........
          } else {
                // 釋放目前redis連接配接
                redis.close();
                // 休眠1000毫秒
                sleep(1000);
          }
        }
    }           

二、B的鎖被A給釋放了

我們知道

Redis

實作鎖的原理在于

SETNX

指令。當

key

不存在時将

key

的值設為

value

,傳回值為

1

;若給定的

key

已經存在,則

SETNX

不做任何動作,傳回值為

SETNX key value           

我們來設想一下這個場景:

A

B

兩個線程來嘗試給

key

myLock

加鎖,

A線程

先拿到鎖(假如鎖

3秒

後過期),

B線程

就在等待嘗試擷取鎖,到這一點毛病沒有。

那如果此時業務邏輯比較耗時,執行時間已經超過

redis

鎖過期時間,這時

A線程

的鎖自動釋放(删除

key

),

B線程

檢測到

myLock

這個

key

不存在,執行

SETNX

指令也拿到了鎖。

但是,此時

A線程

執行完業務邏輯之後,還是會去釋放鎖(删除

key

),這就導緻

B線程

的鎖被

A線程

給釋放了。

為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的

value

值來辨別,隻釋放指定

value

key

,否則就會出現釋放鎖混亂的場景。

三、資料庫事務逾時

emm~ 聊

redis

鎖咋還扯到資料庫事務上來了?别着急往下看,看下邊這段代碼:

@Transaction
   public void lock() {
   
        while (true) {
            boolean flag = this.getLock(key);
            if (flag) {
                insert();
            }
        }
    }           

給這個方法添加一個

@Transaction

注解開啟事務,如代碼中抛出異常進行復原,要知道資料庫事務可是有逾時時間限制的,并不會無條件的一直等一個耗時的資料庫操作。

比如:我們解析一個大檔案,再将資料存入到資料庫,如果執行時間太長,就會導緻事務逾時自動復原。

一旦你的

key

長時間擷取不到鎖,擷取鎖

等待的時間

遠超過資料庫事務

逾時時間

,程式就會報異常。

一般為解決這種問題,我們就需要将資料庫事務改為手動送出、復原事務。

@Autowired
    DataSourceTransactionManager dataSourceTransactionManager;

    @Transaction
    public void lock() {
        //手動開啟事務
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            while (true) {
                boolean flag = this.getLock(key);
                if (flag) {
                    insert();
                    //手動送出事務
                    dataSourceTransactionManager.commit(transactionStatus);
                }
            }
        } catch (Exception e) {
            //手動復原事務
            dataSourceTransactionManager.rollback(transactionStatus);
        }
    }           

四、鎖過期了,業務還沒執行完

這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。

同樣是

redis

分布式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把

redis

鎖的過期時間再弄長點不就解決了嗎?

那還是有問題,我們可以在加鎖的時候,手動調長

redis

鎖的過期時間,可這個時間多長合适?業務邏輯的執行時間是不可控的,調的過長又會影響操作性能。

要是

redis

鎖的過期時間能夠自動續期就好了。

為了解決這個問題我們使用

redis

用戶端

redisson

redisson

很好的解決了

redis

在分布式環境下的一些棘手問題,它的宗旨就是讓使用者減少對

Redis

的關注,将更多精力用在處理業務邏輯上。

redisson

對分布式鎖做了很好封裝,隻需調用

API

即可。

RLock lock = redissonClient.getLock("stockLock");           

redisson

在加鎖成功後,會注冊一個定時任務監聽這個鎖,每隔10秒就去檢視這個鎖,如果還持有鎖,就對

過期時間

進行續期。預設過期時間30秒。這個機制也被叫做:“

看門狗

”,這名字。。。

舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。

通過分析下邊

redisson

的源碼實作可以發現,不管是

加鎖

解鎖

續約

都是用戶端把一些複雜的業務邏輯,通過封裝在

Lua

腳本中發送給

redis

,保證這段複雜業務邏輯執行的

原子性

@Slf4j
@Service
public class RedisDistributionLockPlus {
 
    /**
     * 加鎖逾時時間,機關毫秒, 即:加鎖時間内執行完操作,如果未完成會有并發現象
     */
    private static final long DEFAULT_LOCK_TIMEOUT = 30;
 
    private static final long TIME_SECONDS_FIVE = 5 ;
 
    /**
     * 每個key的過期時間 {@link LockContent}
     */
    private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
 
    /**
     * redis執行成功的傳回
     */
    private static final Long EXEC_SUCCESS = 1L;
 
    /**
     * 擷取鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:逾時時間
     */
    private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
            "if redis.call('exists', KEYS[1]) == 0 then " +
               "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
               "for k, v in pairs(t) do " +
                 "if v == 'OK' then return tonumber(ARGV[2]) end " +
               "end " +
            "return 0 end";
 
    /**
     * 釋放鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:業務耗時 arg3: 業務開始設定的timeout
     */
    private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "local ctime = tonumber(ARGV[2]) " +
            "local biz_timeout = tonumber(ARGV[3]) " +
            "if ctime > 0 then  " +
               "if redis.call('exists', KEYS[2]) == 1 then " +
                   "local avg_time = redis.call('get', KEYS[2]) " +
                   "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
                   "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
                   "else redis.call('del', KEYS[2]) end " +
               "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
            "end " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
    /**
     * 續約lua腳本
     */
    private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
 
 
    private final StringRedisTemplate redisTemplate;
 
    public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
        ScheduleTask task = new ScheduleTask(this, lockContentMap);
        // 啟動定時任務
        ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
    }
 
    /**
     * 加鎖
     * 取到鎖加鎖,取不到鎖一直等待知道獲得鎖
     *
     * @param lockKey
     * @param requestId 全局唯一
     * @param expire   鎖過期時間, 機關秒
     * @return
     */
    public boolean lock(String lockKey, String requestId, long expire) {
        log.info("開始執行加鎖, lockKey ={}, requestId={}", lockKey, requestId);
        for (; ; ) {
            // 判斷是否已經有線程持有鎖,減少redis的壓力
            LockContent lockContentOld = lockContentMap.get(lockKey);
            boolean unLocked = null == lockContentOld;
            // 如果沒有被鎖,就擷取鎖
            if (unLocked) {
                long startTime = System.currentTimeMillis();
                // 計算逾時時間
                long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
                String lockKeyRenew = lockKey + "_renew";
 
                RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
                List<String> keys = new ArrayList<>();
                keys.add(lockKey);
                keys.add(lockKeyRenew);
                Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
                if (null != lockExpire && lockExpire > 0) {
                    // 将鎖放入map
                    LockContent lockContent = new LockContent();
                    lockContent.setStartTime(startTime);
                    lockContent.setLockExpire(lockExpire);
                    lockContent.setExpireTime(startTime + lockExpire * 1000);
                    lockContent.setRequestId(requestId);
                    lockContent.setThread(Thread.currentThread());
                    lockContent.setBizExpire(bizExpire);
                    lockContent.setLockCount(1);
                    lockContentMap.put(lockKey, lockContent);
                    log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId);
                    return true;
                }
            }
            // 重複擷取鎖,線上程池中由于線程複用,線程相等并不能确定是該線程的鎖
            if (Thread.currentThread() == lockContentOld.getThread()
                      && requestId.equals(lockContentOld.getRequestId())){
                // 計數 +1
                lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
                return true;
            }
 
            // 如果被鎖或擷取鎖失敗,則等待100毫秒
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                // 這裡用lombok 有問題
                log.error("擷取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e);
                return false;
            }
        }
    }
 
 
    /**
     * 解鎖
     *
     * @param lockKey
     * @param lockValue
     */
    public boolean unlock(String lockKey, String lockValue) {
        String lockKeyRenew = lockKey + "_renew";
        LockContent lockContent = lockContentMap.get(lockKey);
 
        long consumeTime;
        if (null == lockContent) {
            consumeTime = 0L;
        } else if (lockValue.equals(lockContent.getRequestId())) {
            int lockCount = lockContent.getLockCount();
            // 每次釋放鎖, 計數 -1,減到0時删除redis上的key
            if (--lockCount > 0) {
                lockContent.setLockCount(lockCount);
                return false;
            }
            consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
        } else {
            log.info("釋放鎖失敗,不是自己的鎖。");
            return false;
        }
 
        // 删除已完成key,先删除本地緩存,減少redis壓力, 分布式鎖,隻有一個,是以這裡不加鎖
        lockContentMap.remove(lockKey);
 
        RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
        keys.add(lockKeyRenew);
 
        Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
                Long.toString(lockContent.getBizExpire()));
        return EXEC_SUCCESS.equals(result);
 
    }
 
    /**
     * 續約
     *
     * @param lockKey
     * @param lockContent
     * @return true:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放 2、不是自己的鎖,3、續約期間鎖過期了(未解決))
     */
    public boolean renew(String lockKey, LockContent lockContent) {
 
        // 檢測執行業務線程的狀态
        Thread.State state = lockContent.getThread().getState();
        if (Thread.State.TERMINATED == state) {
            log.info("執行業務的線程已終止,不再續約 lockKey ={}, lockContent={}", lockKey, lockContent);
            return false;
        }
 
        String requestId = lockContent.getRequestId();
        long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
 
        RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
        List<String> keys = new ArrayList<>();
        keys.add(lockKey);
 
        Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
        log.info("續約結果,True成功,False失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
        return EXEC_SUCCESS.equals(result);
    }
 
 
    static class ScheduleExecutor {
 
        public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
            long delay = unit.toMillis(initialDelay);
            long period_ = unit.toMillis(period);
            // 定時執行
            new Timer("Lock-Renew-Task").schedule(task, delay, period_);
        }
    }
 
    static class ScheduleTask extends TimerTask {
 
        private final RedisDistributionLockPlus redisDistributionLock;
        private final Map<String, LockContent> lockContentMap;
 
        public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
            this.redisDistributionLock = redisDistributionLock;
            this.lockContentMap = lockContentMap;
        }
 
        @Override
        public void run() {
            if (lockContentMap.isEmpty()) {
                return;
            }
            Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
            for (Map.Entry<String, LockContent> entry : entries) {
                String lockKey = entry.getKey();
                LockContent lockContent = entry.getValue();
                long expireTime = lockContent.getExpireTime();
                // 減少線程池中任務數量
                if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
                    //線程池異步續約
                    ThreadPool.submit(() -> {
                        boolean renew = redisDistributionLock.renew(lockKey, lockContent);
                        if (renew) {
                            long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
                            lockContent.setExpireTime(expireTimeNew);
                        } else {
                            // 續約失敗,說明已經執行完 OR redis 出現問題
                            lockContentMap.remove(lockKey);
                        }
                    });
                }
            }
        }
    }
}
           

五、redis主從複制的坑

redis

高可用最常見的方案就是

主從複制

(master-slave),這種模式也給

redis分布式鎖

挖了一坑。

redis cluster

叢集環境下,假如現在

A用戶端

想要加鎖,它會根據路由規則選擇一台

master

節點寫入

key

mylock

,在加鎖成功後,

master

節點會把

key

異步複制給對應的

slave

節點。

如果此時

redis master

節點當機,為保證叢集可用性,會進行

主備切換

slave

變為了

redis master

B用戶端

在新的

master

節點上加鎖成功,而

A用戶端

也以為自己還是成功加了鎖的。

此時就會導緻同一時間内多個用戶端對一個分布式鎖完成了加鎖,導緻各種髒資料的産生。

至于解決辦法嘛,目前看還沒有什麼根治的方法,隻能盡量保證機器的穩定性,減少發生此事件的機率。

總結

上面就是我在使用

Redis

分布式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,隻不過是在權衡利弊後,選一個在接受範圍内的折中方案而已。

小福利:

最近身邊很多小夥伴都在面試,我整了一些Java方面的架構、面試資料,還有一些付費課程 ,噓~,免費 送給小夥伴們。需要的小夥伴可以關注我的公号,無套路自行領取