一,分布式鎖誕生的原因
為什麼分布式鎖會誕生?類似于淘寶雙11的秒殺活動,同一件商品怎麼才能隻被一個使用者搶到,其他使用者搶不到?分布式鎖就能巧妙地解決類似秒殺和搶單的問題。技術源于生活,更高于生活。對于阿裡的那種的大型秒殺活動,分布式鎖隻是其中的一環,單單靠分布式鎖不足以支撐那種大并發的情景,後續解決方案會陸續更新。本期隻講解分布式鎖。
二,常見的分布式鎖
1,基于資料庫實作的分布式鎖
基于表實作的分布式鎖,如果有多個請求同時送出到資料庫的話,資料庫會保證隻有一個操作可以成功,那麼我們就可以認為操作成功的那個線程獲得了該方法的鎖
2,Redis分布式鎖
主要通過redis的存值取值進行判斷的,根據傳回的參數判斷是否能拿到鎖
3,Zookeeper分布式鎖
利用Zookeeper的順序臨時節點,來實作分布式鎖和等待隊列。Zookeeper設計的初衷,就是為了實作分布式鎖服務的。
三,SpringBoot項目中的Redis分布式鎖
1,引入相關的依賴
首先你的springboot項目是需要正常的能夠跑通的,然後在你的主pom檔案引入redis的相關依賴
<!--springboot 內建reids-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.1.3.RELEASE</version>
</dependency>
2,redis的分布鎖的原理
一般情況像這種經常用到的代碼,單獨抽一個Redis工具類出來,友善自己檢視和調用。比如,我們現在進行一個活動,整點秒殺一台macPro電腦。想要将商品秒殺到,那隻需要改變資料庫的商品表的該商品的狀态置為已下架,同時建立訂單即可。在整點的時候,很多人同時秒殺更新資料庫,那我們如何保證隻被一個人拿到?道理很簡單,我們将下單(更新資料庫,建立訂單)打包成一個方法,在方法的外面加鎖,該鎖被第一個A線程拿到後,其他線程未拿到鎖則傳回【很遺憾~您手慢啦~】,A線程将該商品置為已下架并且建立訂單後,釋放該鎖,防止死鎖。哪怕後面的線程延遲,再A線程釋放鎖後又拿到下單方法,因為商品的狀态為已下架同樣沒有辦法進行建立訂單。至此,達到我們最初的目的,秒殺功能完成。
3,加鎖操作
redis有StringRedisTemplate 和RedisTemplate 。我這裡使用前者實作。
@Autowired
private StringRedisTemplate redisTemplate;
在高并發的情況下,確定某一個方法隻能被一個人調用,那麼我們隻要在該方法外調用工具類的加鎖方法,該加鎖方法傳回true,則代表該方法沒有被其他線程占用。若傳回false,則代表該方法已經被其他線程占用,同步傳回【很遺憾~您手慢啦~】。
/**
* 對傳過來的redis的key進行加鎖
* TimeUnit.SECONDS 秒
* @param key 需要加鎖的key
* @param expire 過期時間
* @return
*/
public Boolean lockEnable(String key, long expire) {
//這是将目前線程的名字置為key的value值,表明該鎖被誰拿到
String keyValue = Thread.currentThread().getName();
//1,這是StringRedisTemplate在set key的同時增加了過期時間,防止死鎖。保證了原子性。
//2,setIfAbsent該方法如果該key不存在時候,設定值進去後,傳回true;若是已經存在,則傳回false;
Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(key, keyValue, expire, TimeUnit.SECONDS);
Long surplusTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
if (!aBoolean) {
log.info("該線程【{}】加鎖失敗,該key【{}】剩餘過期時間【{}】秒", keyValue, key, surplusTime);
return false;
}
log.info("該線程【{}】加鎖成功,該key【{}】剩餘過期時間【{}】秒", keyValue, key, surplusTime);
return true;
}
4,解鎖操作
解鎖這邊不可以單純的删除redis的值,這裡需要對key和value兩個參數進行和redis裡面存儲的是否一緻,防止誤删别人的鎖。避免其他錯誤的産生,由于我們設定鎖的時候,鎖和失效時間有原子性,故不存在加完鎖後就當機,導緻死鎖。
@Autowired
private DefaultRedisScript<Long> redisScript;
/**
* lua腳本
*
* @return
*/
@Bean
public DefaultRedisScript<Long> defaultRedisScript() {
DefaultRedisScript<Long> defaultRedisScript = new DefaultRedisScript<>();
defaultRedisScript.setResultType(Long.class);
defaultRedisScript.setScriptText("if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end");
return defaultRedisScript;
}
同樣的在完成該方法後一定要記得解鎖,不要因為有過期時間就不釋放鎖,不要給自己埋坑,自己偷的懶早晚要還回來的。
/**
* 對傳過來的redis的key進行解鎖
* key和value不一緻時,傳回:【0】
* key和value不一緻時,傳回:【1】
* @param key
* @return
*/
public Boolean lockUnable(String key) {
String keyValue = Thread.currentThread().getName();
//key和value不一緻時,傳回:【0】
//key和value不一緻時,傳回:【1】
Long execute = redisTemplate.execute(redisScript, Arrays.asList(key, keyValue));
if(execute != 1 ){
Boolean aBoolean = redisTemplate.hasKey(key);
Long surplusTime = redisTemplate.getExpire(key, TimeUnit.SECONDS);
log.info("該key【{}】解鎖失敗,是否存在【{}】,剩餘過期時間【{}】秒", key, aBoolean, surplusTime);
return false;
}
log.info("該key【{}】解鎖成功", key);
Boolean aBoolean = redisTemplate.hasKey(key);
log.info("該key是否存在【{}】",aBoolean);
return true;
}
5,分布式鎖的測試類
由于資料敏感問題,我這邊自己使用了自己建立的員工表進行模拟秒殺,效果是一緻的,我們鎖的多台伺服器上面的同一個方法。
/**
* 1000個線程搶一條資料
*/
@Test
public void catchData() {
for (int i = 0; i <1000; i++) {
Thread thread = new Thread(() -> {
threadTest();
});
thread.start();
thread.setName("thread" + i);
}
while (true){
}
}
/**
* 線程調用的測試方法
*/
public void threadTest() {
/**
* 對某一條資料的id進行加鎖
*/
Boolean aBoolean = redisUtils.lockEnable("狄仁傑", 600);
if (!aBoolean) {
log.info("線程【{}】沒有拿到鎖,結束流程",Thread.currentThread().getName());
return;
}
UpdateUserDTO updateUserDTO = new UpdateUserDTO();
updateUserDTO.setUserName("狄仁傑");
updateUserDTO.setUserStatus("UNABLE");
updateUserDTO.setMobilePhone("15555406855");
updateUserDTO.setUpdatedBy(Thread.currentThread().getName());
Result<Boolean> result = userBaseInfo.updateUser(updateUserDTO);
if (!result.getResult()) {
log.info("線程【{}】更新資料失敗",Thread.currentThread().getName());
return;
}
log.info("線程【{}】更新資料成功",Thread.currentThread().getName());
/**
* 釋放該條資料的鎖
*/
Boolean aBoolean1 = redisUtils.lockUnable("狄仁傑");
log.info("線程【{}】是否成功釋放鎖:【{}】",Thread.currentThread().getName(),aBoolean1);
}
5,分布式鎖的測試結果展示
資料庫之前的資料
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIn5GcugDN4ADO0IjM3EzNwkTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
跑完測試之後的資料