天天看點

Redis分布式鎖實作OneByOne元件背景實作方案實作要點核心代碼優點

文章目錄

  • 背景
  • 實作方案
  • 實作要點
  • 核心代碼
      • OneByOneTemplate模闆實作
        • 模闆
          • 定義模闆接口
          • 定義模闆回調接口
          • 具體模闆實作
      • 鎖實作
        • 加鎖
        • 鎖釋放
  • 優點

背景

随着網際網路項目的通路量增大,對系統的要求越來越高。應運而生出分布式系統,高可用叢集等技術。而且已經非常成熟,在公司裡面通路量再小的應用标配都是2台叢集,單機應用已經一去不複返了。

java的多線程加鎖的方式已經沒有辦法支撐這種分布式應用,因為java的線程鎖,隻起作用于目前運作的JVM中,多個JVM之間是互相分隔,無法控制的。

實作方案

核心就是将鎖置于一個集中式的地方管理,讓鎖隻有一把,所有叢集應用争奪的就是這把鎖。

因為是集中式通路,是以要求的性能較高-輕快,且要保證高可用-穩定:業内選擇有比較多,zookeeper,redis等等,早期還有用DB(淘汰了)。

這邊介紹下目前項目中設計的利用redis實作的。

實作要點

  1. 利用redis的setNX指令的原子性操作來進行加鎖。如果單純取鎖,判斷,再加鎖存在不原子的情況,極端情況下會産生并發。
  2. 利用redis的lua腳本來釋放鎖,同樣保證原子性
  3. 保證釋放鎖的動作,必須是目前鎖的擁有者
  4. OneByOne阻塞等待:在現實應用中,盡可能的在防止業務并發的情況,保證業務的成功,是以排他鎖(擷取不到鎖就失敗)這種場景反而不是最多的。最多的是OneByOne,将并發轉串行,盡量等待前一個業務完成之後,後一個業務能繼續執行,而不是直接失敗。 那勢必需要設計一個阻塞的機制,這個阻塞等待并不像需要流量消峰那樣用隊列來做,隻要釋放可以有多個競争者搶鎖,搶到就繼續執行。

核心代碼

OneByOneTemplate模闆實作

加鎖,釋放鎖排隊等等都是一個标準的流程,是以可以建立一個模闆。調用者隻要關注加鎖代碼塊的業務邏輯實作。

模闆

提供兩個方法,一個是預設逾時時間及排隊時間。另一個可以指定鎖的逾時時間以及排隊等待鎖的時間。

定義模闆接口
public interface OneByOneTemplate {
    <T> T execute(OneByOne oneByOne, CallBack<T> callBack);

    <T> T execute(OneByOne oneByOne, boolean waitInQueue, int timeoutMsecs, int expireMsecs, CallBack<T> callBack);
}
           
定義模闆回調接口

業務實作邏輯,實作回調接口,并且實作回調方法即可

public interface CallBack<T> {
    T invoke();
}
           
具體模闆實作
public class OneByOneTemplateImpl implements OneByOneTemplate {

    private static final int DEFAULT_TIME_OUT_MSECS = 10000;

    private static final int DEFAULT_EXPIRE_MSECS = 30000;

    @Override
    public <T> T execute(OneByOne oneByOne, CallBack<T> callBack) {
        return execute(oneByOne, Boolean.TRUE, DEFAULT_TIME_OUT_MSECS, DEFAULT_EXPIRE_MSECS, callBack);
    }

    @Override
    public <T> T execute(OneByOne oneByOne, boolean waitInQueue, int timeoutMsecs, int expireMsecs, CallBack<T> callBack) {
        // 需要排隊
        if (waitInQueue) {
            // 當參數timeoutMsecs取值小于等于零時,則使用預設的排隊10秒
            if (timeoutMsecs <= 0) {
                timeoutMsecs = DEFAULT_TIME_OUT_MSECS;
            } else {
                timeoutMsecs = 0;
            }

            // 不需要排隊
        } else {
            timeoutMsecs = 0;
        }

        // 當參數expireMsecs取值小于等于零時,則使用預設的有效期30秒
        if (expireMsecs <= 0) {
            expireMsecs = DEFAULT_EXPIRE_MSECS;
        } 

        return invoke(oneByOne, timeoutMsecs, expireMsecs, callBack);
    }

    private <T> T invoke(OneByOne oneByOne, int timeoutMsecs, int expireMsecs, CallBack<T> callBack) {
        final String key = RedisCacheKeyConstants.REDIS_ONE_BY_ONE + oneByOne.getBizType() + "_" + oneByOne.getBizId();
        SedisLock sedisLock = new SedisLock(RedisUtil.redisClient, key, timeoutMsecs, expireMsecs);
        try {
            if (sedisLock.acquire()) { // 啟用鎖
                return callBack.invoke();
            } else {
                throw new AppException("bizType:" + oneByOne.getBizType() + ",bizId:" + oneByOne.getBizId() + ",并發執行!");
            }
        } catch (InterruptedException e) {
            throw new AppException("");
        } finally {
            sedisLock.release();
        }
    }

}
           

鎖實作

聲明redis鎖的類,這段代碼是在上面模闆裡面。這邊列出來主要是為了控制釋放鎖時的owner

加鎖

加鎖過程中會進行鎖等待排隊,利用的是輪詢+wait()。

redis中存儲的這個鎖的key,并且要存放這個對象的随機屬性,owner屬性可以是一個UUID。

public synchronized boolean acquire() throws InterruptedException {
    int timeout = timeoutMsecs;
    while (timeout >= 0) {
        if ("OK".equals(redisClient.execute(new ShardedJedisAction<String>() {
            @Override
            public String doAction(ShardedJedis jedis) {
                Jedis j = jedis.getShard(lockKey);
                // redis中不存在就設定lockKey對應的值,同時設定毫秒級過期時間
                return j.set(lockKey, owner, "NX", "PX", expireMsecs);
            }
        }))) {
            // lock acquired
            locked = true;
            return true;
        }

        int spinWatingTime = random.nextInt(200) + 1;
        timeout -= spinWatingTime;
        wait(spinWatingTime);
    }
    return false;
}
           

輪詢200内的随機毫秒進行一次嘗試擷取鎖,并且排隊時間減去相應的時間,一直等待時間小于0則不再嘗試。

這邊random.nextInt(200)擷取的值是0-199,容易忽略會導緻wait(0),造成線程長期占用。是以需要+1

鎖釋放

需要校驗這個鎖是否加鎖狀态,并且判斷是否是鎖擁有者

利用redis支援的LUA腳本來将get和del兩個動作做成原子性

public void release() {
    if (locked) {
        // 判斷鎖擁有者和釋放鎖
        final String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
        Object result = redisClient.execute(new ShardedJedisAction<Object>() {
            //
            @Override
            public Object doAction(ShardedJedis jedis) {
                List<String> keysList = Collections.singletonList(lockKey);
                List<String> argsList = Collections.singletonList(owner);
                Jedis jedisKey = jedis.getShard(lockKey);
                return jedisKey.eval(script, keysList, argsList);
            }
        });
        if (1 == (Long) result) {
            locked = false;
        }
    }
}
           

這樣整體上一個OneByOne的分布式防并發鎖就完成了。性能方面基于redis的高性能讀寫來說還是比較好的。

優點

  • 基于redis的鎖機制,讀寫性能較高,提高系統的性能。
  • redis的原子性的特性的保證,為高并發下的鎖處理變的穩定且簡潔
  • 鎖等待的設計能很好的提升業務的成功率,逾時時間和等待時間設定合适的時間,可以達到很好的吞吐量和成功率。

繼續閱讀