引言
最近項目上線的頻率頗高,連着幾天加班熬夜,身體有點吃不消精神也有些萎靡,無奈業務方催的緊,工期就在眼前隻能硬着頭皮上了。腦子渾渾噩噩的時候,寫的就不能叫代碼,可以直接叫做
Bug
。我就熬夜寫了一個
bug
被罵慘了。
由于是做商城業務,要頻繁的對商品庫存進行扣減,應用是叢集部署,為避免并發造成庫存
超買超賣
等問題,采用
redis
分布式鎖加以控制。本以為給扣庫存的代碼加上鎖
lock.tryLock
就萬事大吉了
/**
* @author xiaofu
* @description 扣減庫存
* @date 2020/4/21 12:10
*/
public String stockLock() {
RLock lock = redissonClient.getLock("stockLock");
try {
/**
* 擷取鎖
*/
if (lock.tryLock(10, TimeUnit.SECONDS)) {
/**
* 查詢庫存數
*/
Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
/**
* 扣減庫存
*/
if (stock > 0) {
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stockCount", stock.toString());
LOGGER.info("庫存扣減成功,剩餘庫存數量:{}", stock);
} else {
LOGGER.info("庫存不足~");
}
} else {
LOGGER.info("未擷取到鎖業務結束..");
}
} catch (Exception e) {
LOGGER.info("處理異常", e);
} finally {
lock.unlock();
}
return "ok";
}
結果業務代碼執行完以後我忘了釋放鎖
lock.unlock()
,導緻
redis
線程池被打滿,
redis
服務大面積故障,造成庫存資料扣減混亂,被上司一頓臭罵,這個月績效~ 哎·~。
随着 使用
redis
鎖的時間越長,我發現
redis
鎖的坑遠比想象中要多。就算在面試題當中
redis
分布式鎖的出鏡率也比較高,比如:“用鎖遇到過哪些問題?” ,“又是如何解決的?” 基本都是一套連招問出來的。
今天就分享一下我用
redis
分布式鎖的踩坑日記,以及一些解決方案,和大家一起共勉。
一、鎖未被釋放
這種情況是一種低級錯誤,就是我上邊犯的錯,由于目前線程 擷取到
redis
鎖,處理完業務後未及時釋放鎖,導緻其它線程會一直嘗試擷取鎖阻塞,例如:用
Jedis
用戶端會報如下的錯誤資訊
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
redis線程池
已經沒有空閑線程來處理用戶端指令。
解決的方法也很簡單,隻要我們細心一點,拿到鎖的線程處理完業務及時釋放鎖,如果是重入鎖未拿到鎖後,線程可以釋放目前連接配接并且
sleep
一段時間。
public void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
TODO .........
} else {
// 釋放目前redis連接配接
redis.close();
// 休眠1000毫秒
sleep(1000);
}
}
}
二、B的鎖被A給釋放了
我們知道
Redis
實作鎖的原理在于
SETNX
指令。當
key
不存在時将
key
的值設為
value
,傳回值為
1
;若給定的
key
已經存在,則
SETNX
不做任何動作,傳回值為
。
SETNX key value
我們來設想一下這個場景:
A
、
B
兩個線程來嘗試給
key
myLock
加鎖,
A線程
先拿到鎖(假如鎖
3秒
後過期),
B線程
就在等待嘗試擷取鎖,到這一點毛病沒有。
那如果此時業務邏輯比較耗時,執行時間已經超過
redis
鎖過期時間,這時
A線程
的鎖自動釋放(删除
key
),
B線程
檢測到
myLock
這個
key
不存在,執行
SETNX
指令也拿到了鎖。
但是,此時
A線程
執行完業務邏輯之後,還是會去釋放鎖(删除
key
),這就導緻
B線程
的鎖被
A線程
給釋放了。
為避免上邊的情況,一般我們在每個線程加鎖時要帶上自己獨有的
value
值來辨別,隻釋放指定
value
的
key
,否則就會出現釋放鎖混亂的場景。
三、資料庫事務逾時
emm~ 聊
redis
鎖咋還扯到資料庫事務上來了?别着急往下看,看下邊這段代碼:
@Transaction
public void lock() {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
}
}
}
給這個方法添加一個
@Transaction
注解開啟事務,如代碼中抛出異常進行復原,要知道資料庫事務可是有逾時時間限制的,并不會無條件的一直等一個耗時的資料庫操作。
比如:我們解析一個大檔案,再将資料存入到資料庫,如果執行時間太長,就會導緻事務逾時自動復原。
一旦你的
key
長時間擷取不到鎖,擷取鎖
等待的時間
遠超過資料庫事務
逾時時間
,程式就會報異常。
一般為解決這種問題,我們就需要将資料庫事務改為手動送出、復原事務。
@Autowired
DataSourceTransactionManager dataSourceTransactionManager;
@Transaction
public void lock() {
//手動開啟事務
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
try {
while (true) {
boolean flag = this.getLock(key);
if (flag) {
insert();
//手動送出事務
dataSourceTransactionManager.commit(transactionStatus);
}
}
} catch (Exception e) {
//手動復原事務
dataSourceTransactionManager.rollback(transactionStatus);
}
}
四、鎖過期了,業務還沒執行完
這種情況和我們上邊提到的第二種比較類似,但解決思路上略有不同。
同樣是
redis
分布式鎖過期,而業務邏輯沒執行完的場景,不過,這裡換一種思路想問題,把
redis
鎖的過期時間再弄長點不就解決了嗎?
那還是有問題,我們可以在加鎖的時候,手動調長
redis
鎖的過期時間,可這個時間多長合适?業務邏輯的執行時間是不可控的,調的過長又會影響操作性能。
要是
redis
鎖的過期時間能夠自動續期就好了。
為了解決這個問題我們使用
redis
用戶端
redisson
,
redisson
很好的解決了
redis
在分布式環境下的一些棘手問題,它的宗旨就是讓使用者減少對
Redis
的關注,将更多精力用在處理業務邏輯上。
redisson
對分布式鎖做了很好封裝,隻需調用
API
即可。
RLock lock = redissonClient.getLock("stockLock");
redisson
在加鎖成功後,會注冊一個定時任務監聽這個鎖,每隔10秒就去檢視這個鎖,如果還持有鎖,就對
過期時間
進行續期。預設過期時間30秒。這個機制也被叫做:“
看門狗
”,這名字。。。
舉例子:假如加鎖的時間是30秒,過10秒檢查一次,一旦加鎖的業務沒有執行完,就會進行一次續期,把鎖的過期時間再次重置成30秒。
通過分析下邊
redisson
的源碼實作可以發現,不管是
加鎖
解鎖
續約
都是用戶端把一些複雜的業務邏輯,通過封裝在
Lua
腳本中發送給
redis
,保證這段複雜業務邏輯執行的
原子性
@Slf4j
@Service
public class RedisDistributionLockPlus {
/**
* 加鎖逾時時間,機關毫秒, 即:加鎖時間内執行完操作,如果未完成會有并發現象
*/
private static final long DEFAULT_LOCK_TIMEOUT = 30;
private static final long TIME_SECONDS_FIVE = 5 ;
/**
* 每個key的過期時間 {@link LockContent}
*/
private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/**
* redis執行成功的傳回
*/
private static final Long EXEC_SUCCESS = 1L;
/**
* 擷取鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:逾時時間
*/
private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
"if redis.call('exists', KEYS[1]) == 0 then " +
"local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
"for k, v in pairs(t) do " +
"if v == 'OK' then return tonumber(ARGV[2]) end " +
"end " +
"return 0 end";
/**
* 釋放鎖lua腳本, k1:獲鎖key, k2:續約耗時key, arg1:requestId,arg2:業務耗時 arg3: 業務開始設定的timeout
*/
private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
"local ctime = tonumber(ARGV[2]) " +
"local biz_timeout = tonumber(ARGV[3]) " +
"if ctime > 0 then " +
"if redis.call('exists', KEYS[2]) == 1 then " +
"local avg_time = redis.call('get', KEYS[2]) " +
"avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
"if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
"else redis.call('del', KEYS[2]) end " +
"elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
"end " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
/**
* 續約lua腳本
*/
private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplate redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
ScheduleTask task = new ScheduleTask(this, lockContentMap);
// 啟動定時任務
ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
}
/**
* 加鎖
* 取到鎖加鎖,取不到鎖一直等待知道獲得鎖
*
* @param lockKey
* @param requestId 全局唯一
* @param expire 鎖過期時間, 機關秒
* @return
*/
public boolean lock(String lockKey, String requestId, long expire) {
log.info("開始執行加鎖, lockKey ={}, requestId={}", lockKey, requestId);
for (; ; ) {
// 判斷是否已經有線程持有鎖,減少redis的壓力
LockContent lockContentOld = lockContentMap.get(lockKey);
boolean unLocked = null == lockContentOld;
// 如果沒有被鎖,就擷取鎖
if (unLocked) {
long startTime = System.currentTimeMillis();
// 計算逾時時間
long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
String lockKeyRenew = lockKey + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
if (null != lockExpire && lockExpire > 0) {
// 将鎖放入map
LockContent lockContent = new LockContent();
lockContent.setStartTime(startTime);
lockContent.setLockExpire(lockExpire);
lockContent.setExpireTime(startTime + lockExpire * 1000);
lockContent.setRequestId(requestId);
lockContent.setThread(Thread.currentThread());
lockContent.setBizExpire(bizExpire);
lockContent.setLockCount(1);
lockContentMap.put(lockKey, lockContent);
log.info("加鎖成功, lockKey ={}, requestId={}", lockKey, requestId);
return true;
}
}
// 重複擷取鎖,線上程池中由于線程複用,線程相等并不能确定是該線程的鎖
if (Thread.currentThread() == lockContentOld.getThread()
&& requestId.equals(lockContentOld.getRequestId())){
// 計數 +1
lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
return true;
}
// 如果被鎖或擷取鎖失敗,則等待100毫秒
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
// 這裡用lombok 有問題
log.error("擷取redis 鎖失敗, lockKey ={}, requestId={}", lockKey, requestId, e);
return false;
}
}
}
/**
* 解鎖
*
* @param lockKey
* @param lockValue
*/
public boolean unlock(String lockKey, String lockValue) {
String lockKeyRenew = lockKey + "_renew";
LockContent lockContent = lockContentMap.get(lockKey);
long consumeTime;
if (null == lockContent) {
consumeTime = 0L;
} else if (lockValue.equals(lockContent.getRequestId())) {
int lockCount = lockContent.getLockCount();
// 每次釋放鎖, 計數 -1,減到0時删除redis上的key
if (--lockCount > 0) {
lockContent.setLockCount(lockCount);
return false;
}
consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
} else {
log.info("釋放鎖失敗,不是自己的鎖。");
return false;
}
// 删除已完成key,先删除本地緩存,減少redis壓力, 分布式鎖,隻有一個,是以這裡不加鎖
lockContentMap.remove(lockKey);
RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
keys.add(lockKeyRenew);
Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
Long.toString(lockContent.getBizExpire()));
return EXEC_SUCCESS.equals(result);
}
/**
* 續約
*
* @param lockKey
* @param lockContent
* @return true:續約成功,false:續約失敗(1、續約期間執行完成,鎖被釋放 2、不是自己的鎖,3、續約期間鎖過期了(未解決))
*/
public boolean renew(String lockKey, LockContent lockContent) {
// 檢測執行業務線程的狀态
Thread.State state = lockContent.getThread().getState();
if (Thread.State.TERMINATED == state) {
log.info("執行業務的線程已終止,不再續約 lockKey ={}, lockContent={}", lockKey, lockContent);
return false;
}
String requestId = lockContent.getRequestId();
long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
List<String> keys = new ArrayList<>();
keys.add(lockKey);
Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
log.info("續約結果,True成功,False失敗 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
return EXEC_SUCCESS.equals(result);
}
static class ScheduleExecutor {
public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
long delay = unit.toMillis(initialDelay);
long period_ = unit.toMillis(period);
// 定時執行
new Timer("Lock-Renew-Task").schedule(task, delay, period_);
}
}
static class ScheduleTask extends TimerTask {
private final RedisDistributionLockPlus redisDistributionLock;
private final Map<String, LockContent> lockContentMap;
public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
this.redisDistributionLock = redisDistributionLock;
this.lockContentMap = lockContentMap;
}
@Override
public void run() {
if (lockContentMap.isEmpty()) {
return;
}
Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
for (Map.Entry<String, LockContent> entry : entries) {
String lockKey = entry.getKey();
LockContent lockContent = entry.getValue();
long expireTime = lockContent.getExpireTime();
// 減少線程池中任務數量
if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
//線程池異步續約
ThreadPool.submit(() -> {
boolean renew = redisDistributionLock.renew(lockKey, lockContent);
if (renew) {
long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
lockContent.setExpireTime(expireTimeNew);
} else {
// 續約失敗,說明已經執行完 OR redis 出現問題
lockContentMap.remove(lockKey);
}
});
}
}
}
}
}
五、redis主從複制的坑
redis
高可用最常見的方案就是
主從複制
(master-slave),這種模式也給
redis分布式鎖
挖了一坑。
redis cluster
叢集環境下,假如現在
A用戶端
想要加鎖,它會根據路由規則選擇一台
master
節點寫入
key
mylock
,在加鎖成功後,
master
節點會把
key
異步複制給對應的
slave
節點。
如果此時
redis master
節點當機,為保證叢集可用性,會進行
主備切換
slave
變為了
redis master
B用戶端
在新的
master
節點上加鎖成功,而
A用戶端
也以為自己還是成功加了鎖的。
此時就會導緻同一時間内多個用戶端對一個分布式鎖完成了加鎖,導緻各種髒資料的産生。
至于解決辦法嘛,目前看還沒有什麼根治的方法,隻能盡量保證機器的穩定性,減少發生此事件的機率。
總結
上面就是我在使用
Redis
分布式鎖時遇到的一些坑,有點小感慨,經常用一個方法填上這個坑,沒多久就發現另一個坑又出來了,其實根本沒有什麼十全十美的解決方案,哪有什麼銀彈,隻不過是在權衡利弊後,選一個在接受範圍内的折中方案而已。
小福利:
最近身邊很多小夥伴都在面試,我整了一些Java方面的架構、面試資料,還有一些付費課程 ,噓~,免費 送給小夥伴們。需要的小夥伴可以關注我的公号,無套路自行領取