使用分布式鎖優化可以實作雙寫一緻性,今天就來介紹一種具體實作方式redisson-分布式鎖
原理
原理圖如下:
主要分析如下:
何時加鎖?
- 線程去擷取鎖,擷取成功: 執行lua腳本,儲存資料到redis資料庫。
- 線程去擷取鎖,擷取失敗: 一直通過while循環嘗試擷取鎖,擷取成功後,執行lua腳本,儲存資料到redis
什麼是WatchDog自動延期機制?
- 在一個分布式環境下,假如一個線程獲得鎖後,突然伺服器當機了,那麼這個時候在一定時間後這個鎖會自動釋放,你也可以設定鎖的有效時間(不設定預設30秒),這樣的目的主要是防止死鎖的發生
- 線程A業務還沒有執行完,時間就過了,線程A 還想持有鎖的話,就會啟動一個watch dog背景線程,不斷的延長鎖key的生存時間
lua腳本
- 主要是如果你的業務邏輯複雜的話,通過封裝在lua腳本中發送給redis,而且redis是單線程的,這樣就保證這段複雜業務邏輯執行的原子性
基本使用
RLock是繼承Lock鎖,是以他有Lock鎖的所有特性,比如lock、unlock、trylock等特性,同時它還有很多新特性:強制鎖釋放,帶有效期的鎖
public interface RLock {
//----------------------Lock接口方法-----------------------
/**
* 加鎖 鎖的有效期預設30秒
*/
void lock();
/**
* 加鎖 可以手動設定鎖的有效時間
*
* @param leaseTime 鎖有效時間
* @param unit 時間機關 小時、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* tryLock()方法是有傳回值的,用來嘗試擷取鎖,
* 如果擷取成功,則傳回true,如果擷取失敗(即鎖已被其他線程擷取),則傳回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,
* 隻不過差別在于這個方法在拿不到鎖時會等待一定的時間,
* 在時間期限之内如果還拿不到鎖,就傳回false。如果如果一開始拿到鎖或者在等待期間内拿到了鎖,則傳回true。
*
* @param time 等待時間
* @param unit 時間機關 小時、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 比上面多一個參數,多添加一個鎖的有效時間
*
* @param waitTime 等待時間
* @param leaseTime 鎖有效時間
* @param unit 時間機關 小時、分、秒、毫秒等
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 解鎖
*/
void unlock();
}
lock方法
**lock()**:此方法為加鎖,但是鎖的有效期采用預設30秒,如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制,如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
@GetMapping("/lock")
public String lock() {
//獲得分布式鎖對象,注意,此時還沒有加鎖成功
RLock lock = redissonClient.getLock("mylock");
try {
//加鎖:鎖的有效期預設30秒
lock.lock();
long timeToLive=lock.remainTimeToLive();
log.info("線程:{},獲得鎖,鎖存活時間:{}S",Thread.currentThread().getName(),timeToLive/1000);
//休眠一下
Thread.sleep(2000);
}catch (Exception ex){
System.out.println("出現異常!!!");
}finally {
//如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制
//如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
log.info("線程:{},釋放鎖",Thread.currentThread().getName());
}
return "OK";
}
注意:加鎖:鎖的有效期預設30秒
lock(long leaseTime, TimeUnit unit): 可以手動設定鎖的有效時間,如果主線程未釋放,且目前鎖未調用unlock方法,則鎖到期後會自動釋放,如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
@GetMapping("/lockLaseTime")
public String lockLaseTime() {
//獲得分布式鎖對象,注意,此時還沒有加鎖成功
RLock lock = redissonClient.getLock("lockLaseTime");
try {
//這裡可以手動設定鎖的有效時間,鎖到期後會自動釋放的
lock.lock(10,TimeUnit.SECONDS);
long timeToLive=lock.remainTimeToLive();
log.info("線程:{},獲得鎖,鎖存活時間:{}S",Thread.currentThread().getName(),timeToLive/1000);
//休眠一下
Thread.sleep(2000);
}catch (Exception ex){
System.out.println("出現異常!!!");
}finally {
//如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制
//如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
log.info("線程:{},釋放鎖",Thread.currentThread().getName());
}
return "OK";
}
tryLock()方法
tryLock(): 用來嘗試擷取鎖,如果擷取成功,則傳回true,如果擷取失敗(即鎖已被其他線程擷取),則傳回false,如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制,如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
@GetMapping("/tryLock")
public String tryLock() {
//獲得分布式鎖對象,注意,此時還沒有加鎖成功
RLock lock = redissonClient.getLock("tryLock");
try {
//如果擷取成功,則傳回true,如果擷取失敗(即鎖已被其他線程擷取),則傳回false .
boolean flag=lock.tryLock();
if(flag){
long timeToLive=lock.remainTimeToLive();
log.info("線程:{},獲得鎖,鎖存活時間:{}S,加鎖狀态:{}",Thread.currentThread().getName(),timeToLive/1000,flag);
//休眠一下
Thread.sleep(2000);
}
}catch (Exception ex){
System.out.println("出現異常!!!");
}finally {
//如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制
//如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
log.info("線程:{},釋放鎖",Thread.currentThread().getName());
}
return "OK";
}
tryLock(long time, TimeUnit unit): tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,隻不過差別在于這個方法在拿不到鎖時會等待一定的時間, 在時間期限之内如果還拿不到鎖,就傳回false。如果如果一開始拿到鎖或者在等待期間内拿到了鎖,則傳回true,如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
@GetMapping("/tryLockWaitTime")
public String tryLockWaitTime() {
//獲得分布式鎖對象,注意,此時還沒有加鎖成功
RLock lock = redissonClient.getLock("tryLockWaitTime");
try {
//隻不過差別在于這個方法在拿不到鎖時會等待一定的時間,
//在時間期限之内如果還拿不到鎖,就傳回false。如果如果一開始拿到鎖或者在等待期間内拿到了鎖,則傳回true。
boolean flag=lock.tryLock(6, TimeUnit.SECONDS);
if(flag){
long timeToLive=lock.remainTimeToLive();
log.info("線程:{},獲得鎖,鎖存活時間:{}S,加鎖狀态:{}",Thread.currentThread().getName(),timeToLive/1000,flag);
//休眠一下
Thread.sleep(2000);
}
}catch (Exception ex){
System.out.println("出現異常!!!");
}finally {
//如果主線程未釋放,且目前鎖未調用unlock方法,則進入到watchDog機制
//如果主線程未釋放,且目前鎖調用unlock方法,則直接釋放鎖
log.info("線程:{},釋放鎖",Thread.currentThread().getName());
}
return "OK";
}
注意:tryLock(long time, TimeUnit unit),這個方法在拿不到鎖時會等待一定的時間,在時間期限之内如果還拿不到鎖,就傳回false。如果如果一開始拿到鎖或者在等待期間内拿到了鎖,則傳回true
SpringBoot項目依賴
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>2.4.0</version>
</dependency>
<!--使用redisson作為分布式鎖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
</dependency>
配置類和配置檔案
- 配置檔案
spring:
redis:
# Redis資料庫索引(預設為0)
database: 0
# Redis伺服器位址
host: 127.0.0.1
# Redis伺服器連接配接端口
port: 6379
# Redis伺服器連結密碼(預設為空)
password:
jedis:
pool:
# 連接配接池最大連結數(負值表示沒有限制)
max-active: 20
# 連接配接池最大阻塞等待時間(負值表示沒有限制)
max-wait: -1
# 連結池中最大空閑連結
max-idle: 10
# 連接配接池中最小空閑連結
min-idle: 0
# 連結超市時間(毫秒)
timeout: 1000
server:
port: 8899
- 配置類
@Configuration
public class RedissonConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Bean
public RedissonClient redissonClient(){
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port);
final RedissonClient client = Redisson.create(config);
return client;
}
}