文章目錄
- 背景
- 實作方案
- 實作要點
- 核心代碼
-
-
- OneByOneTemplate模闆實作
-
- 模闆
-
- 定義模闆接口
- 定義模闆回調接口
- 具體模闆實作
- 鎖實作
-
- 加鎖
- 鎖釋放
-
- 優點
背景
随着網際網路項目的通路量增大,對系統的要求越來越高。應運而生出分布式系統,高可用叢集等技術。而且已經非常成熟,在公司裡面通路量再小的應用标配都是2台叢集,單機應用已經一去不複返了。
java的多線程加鎖的方式已經沒有辦法支撐這種分布式應用,因為java的線程鎖,隻起作用于目前運作的JVM中,多個JVM之間是互相分隔,無法控制的。
實作方案
核心就是将鎖置于一個集中式的地方管理,讓鎖隻有一把,所有叢集應用争奪的就是這把鎖。
因為是集中式通路,是以要求的性能較高-輕快,且要保證高可用-穩定:業内選擇有比較多,zookeeper,redis等等,早期還有用DB(淘汰了)。
這邊介紹下目前項目中設計的利用redis實作的。
實作要點
- 利用redis的setNX指令的原子性操作來進行加鎖。如果單純取鎖,判斷,再加鎖存在不原子的情況,極端情況下會産生并發。
- 利用redis的lua腳本來釋放鎖,同樣保證原子性
- 保證釋放鎖的動作,必須是目前鎖的擁有者
- OneByOne阻塞等待:在現實應用中,盡可能的在防止業務并發的情況,保證業務的成功,是以排他鎖(擷取不到鎖就失敗)這種場景反而不是最多的。最多的是OneByOne,将并發轉串行,盡量等待前一個業務完成之後,後一個業務能繼續執行,而不是直接失敗。 那勢必需要設計一個阻塞的機制,這個阻塞等待并不像需要流量消峰那樣用隊列來做,隻要釋放可以有多個競争者搶鎖,搶到就繼續執行。
核心代碼
OneByOneTemplate模闆實作
加鎖,釋放鎖排隊等等都是一個标準的流程,是以可以建立一個模闆。調用者隻要關注加鎖代碼塊的業務邏輯實作。
模闆
提供兩個方法,一個是預設逾時時間及排隊時間。另一個可以指定鎖的逾時時間以及排隊等待鎖的時間。
定義模闆接口
public interface OneByOneTemplate {
<T> T execute(OneByOne oneByOne, CallBack<T> callBack);
<T> T execute(OneByOne oneByOne, boolean waitInQueue, int timeoutMsecs, int expireMsecs, CallBack<T> callBack);
}
定義模闆回調接口
業務實作邏輯,實作回調接口,并且實作回調方法即可
public interface CallBack<T> {
T invoke();
}
具體模闆實作
public class OneByOneTemplateImpl implements OneByOneTemplate {
private static final int DEFAULT_TIME_OUT_MSECS = 10000;
private static final int DEFAULT_EXPIRE_MSECS = 30000;
@Override
public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) {
return execute(oneByOne, Boolean.TRUE, DEFAULT_TIME_OUT_MSECS, DEFAULT_EXPIRE_MSECS, callBack);
}
@Override
public <T> T execute(OneByOne oneByOne, boolean waitInQueue, int timeoutMsecs, int expireMsecs, CallBack<T> callBack) {
// 需要排隊
if (waitInQueue) {
// 當參數timeoutMsecs取值小于等于零時,則使用預設的排隊10秒
if (timeoutMsecs <= 0) {
timeoutMsecs = DEFAULT_TIME_OUT_MSECS;
} else {
timeoutMsecs = 0;
}
// 不需要排隊
} else {
timeoutMsecs = 0;
}
// 當參數expireMsecs取值小于等于零時,則使用預設的有效期30秒
if (expireMsecs <= 0) {
expireMsecs = DEFAULT_EXPIRE_MSECS;
}
return invoke(oneByOne, timeoutMsecs, expireMsecs, callBack);
}
private <T> T invoke(OneByOne oneByOne, int timeoutMsecs, int expireMsecs, CallBack<T> callBack) {
final String key = RedisCacheKeyConstants.REDIS_ONE_BY_ONE + oneByOne.getBizType() + "_" + oneByOne.getBizId();
SedisLock sedisLock = new SedisLock(RedisUtil.redisClient, key, timeoutMsecs, expireMsecs);
try {
if (sedisLock.acquire()) { // 啟用鎖
return callBack.invoke();
} else {
throw new AppException("bizType:" + oneByOne.getBizType() + ",bizId:" + oneByOne.getBizId() + ",并發執行!");
}
} catch (InterruptedException e) {
throw new AppException("");
} finally {
sedisLock.release();
}
}
}
鎖實作
聲明redis鎖的類,這段代碼是在上面模闆裡面。這邊列出來主要是為了控制釋放鎖時的owner
加鎖
加鎖過程中會進行鎖等待排隊,利用的是輪詢+wait()。
redis中存儲的這個鎖的key,并且要存放這個對象的随機屬性,owner屬性可以是一個UUID。
public synchronized boolean acquire() throws InterruptedException {
int timeout = timeoutMsecs;
while (timeout >= 0) {
if ("OK".equals(redisClient.execute(new ShardedJedisAction<String>() {
@Override
public String doAction(ShardedJedis jedis) {
Jedis j = jedis.getShard(lockKey);
// redis中不存在就設定lockKey對應的值,同時設定毫秒級過期時間
return j.set(lockKey, owner, "NX", "PX", expireMsecs);
}
}))) {
// lock acquired
locked = true;
return true;
}
int spinWatingTime = random.nextInt(200) + 1;
timeout -= spinWatingTime;
wait(spinWatingTime);
}
return false;
}
輪詢200内的随機毫秒進行一次嘗試擷取鎖,并且排隊時間減去相應的時間,一直等待時間小于0則不再嘗試。
這邊random.nextInt(200)擷取的值是0-199,容易忽略會導緻wait(0),造成線程長期占用。是以需要+1
鎖釋放
需要校驗這個鎖是否加鎖狀态,并且判斷是否是鎖擁有者
利用redis支援的LUA腳本來将get和del兩個動作做成原子性
public void release() {
if (locked) {
// 判斷鎖擁有者和釋放鎖
final String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = redisClient.execute(new ShardedJedisAction<Object>() {
//
@Override
public Object doAction(ShardedJedis jedis) {
List<String> keysList = Collections.singletonList(lockKey);
List<String> argsList = Collections.singletonList(owner);
Jedis jedisKey = jedis.getShard(lockKey);
return jedisKey.eval(script, keysList, argsList);
}
});
if (1 == (Long) result) {
locked = false;
}
}
}
這樣整體上一個OneByOne的分布式防并發鎖就完成了。性能方面基于redis的高性能讀寫來說還是比較好的。
優點
- 基于redis的鎖機制,讀寫性能較高,提高系統的性能。
- redis的原子性的特性的保證,為高并發下的鎖處理變的穩定且簡潔
- 鎖等待的設計能很好的提升業務的成功率,逾時時間和等待時間設定合适的時間,可以達到很好的吞吐量和成功率。