天天看点

只用了半个Redisson的Semaphore实现并发控制

作者:互联网高级架构师

做过企业微信开发的同学应该知道,企业微信有一个很讨厌的报错--接口并发超过限制(45033)。报错的原因呢就是因为有多个线程在同时调用企业微信的接口,为了不让接口调用一直报错,我的服务就也要有一个接口并发控制体系。

首先想到的就是用线程池来实现

只用了半个Redisson的Semaphore实现并发控制

刚开始看起来似乎没什么问题,但是企业微信的接口之间并发限制是互相隔离的,那就意味着,我需要创建几十个线程池。。。你以为这就完了?其实不同企业用户调用的时候又是互相隔离的,所以,我需要为每个企业。。。。

只用了半个Redisson的Semaphore实现并发控制

线程池肯定是用不了了,我猛然想起前天晚上做梦,大师似乎跟我讲了信号量可以用来控制并发,考虑到线上环境有多个节点,同时Redisson也提供了Semaphore,打算用一下试试。想想距离上次使用信号量已经过去了∞天(毕竟我从来都没有用过),我直接打开IDEA,先用java的试试水,public static void main一气呵成

public static void main(String[] args) {
    //创建一个信号量
    Semaphore semaphore = new Semaphore(1);
    //如果没获取到就抛出异常
    if (!semaphore.tryAcquire()) {
        throw new RuntimeException();
    }
    //try不能括到上面,获取不到的时候是不需要释放的
    try{
        //做一些业务
        System.out.println("do something");
    }finally {
        //释放掉
        semaphore.release();
    }
}           

又检查了一下逻辑,看起来似乎没什么问题,然后我突发奇想,如果我直接release()会怎么样

Semaphore semaphore = new Semaphore(1);
semaphore.release();
System.out.println(semaphore.availablePermits());           
只用了半个Redisson的Semaphore实现并发控制

。。。0.0

此时我的心态出现了亿点变化,我意识到,其实初始化的new Semaphore(1)只是设置了一个初始值,并不是一个最大值,直接调用release(),可获取信号量的值会直接+1,这时我赶紧用Redisson的api redissonClient.getSemaphore("test"); (其他代码和上面的一样)试了一下,万一呢

果然效果和Java的是一模一样。

不过,问题似乎也不是很大?如果我保证业务在拿到信号后一定会在finally{..}中释放掉,并且semaphore.trySetPermits(1);这个方法只初始化一次,并且一定是单线程,那其实就没有问题。

只用了半个Redisson的Semaphore实现并发控制

企业微信的这个并发限制,没有明文规定,那上线之后,大概是要在违法的边缘多试探试探的,所以就涉及到更新信号量的问题了

只用了半个Redisson的Semaphore实现并发控制

可以看到,如果在多线程的情况下,贸然更新信号量,由于模型没有最大值的概念,更新后的信号量总量比起期望值是只多不少的。

问题主要还是出在release()这个方法没有上限概念上,那我来弄一个有上限的release方法就好了。此时第一个思路就是继承RedissonSemaphore这个类重写一下release(),但是后来发现自己的子类无法用redissonClient初始化出来,方案就pass掉了。所谓工作时间长了,什么都敢干,我直接打开源码,我只要仿照着写个一样的是不就行了!

只用了半个Redisson的Semaphore实现并发控制
public RFuture<Void> releaseAsync(int permits) {
    ...
    RFuture<Void> future = commandExecutor.evalWriteAsync(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
            "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                    "redis.call('publish', KEYS[2], value); ",
            Arrays.asList(getRawName(), getChannelName()), permits);
    ...
}           

好家伙,核心逻辑其实就一行lua脚本redis.call('incrby', KEYS[1], ARGV[1]);,我只要一个判断上限的就好了。虽然之前也没写过lua脚本,但是好在逻辑不复杂,只需要一个if判断就行了,redisson源码中有很多这样的判断,现学现卖就可以了。

另外,鉴于面向对象的思想,如果我自己工具类写一个release()方法,并让这个方法直接出现在业务中多少有一些不妥。所以新建了一个对象,封装一下获取和释放两个方法,之后项目中所有信号量的相关业务,都用这个对象,redisson的信号量概念就不会再出现在具体的业务中了,可以提高业务代码的可读性,也会信号量相关操作的修改控制在这个类中。

经过了一番折腾,总算是把功能实现了。部分代码如下,tryAcquire()方法还是使用redisson提供的,但是release()方法用了自己实现的~

public class SemaphoreModel {
    /**
     * 信号量本体
     */
    private RedissonSemaphore semaphore;

    private RedissonClient redissonClient;

    /**
     * 获取到信号的mark
     */
    private boolean mark = false;

    public void release() {
         //获取信号量的值
        String script = "local value = redis.call('get', '" + semaphore.getRawName() + "'); " +
                //如果没有设置过(false意味着undefind)或者当前值小于5(我设置的最大值)就做release的逻辑
                "if (value ~= false and tonumber(value) < 5) then " +
                //信号量加一
                "local val = redis.call('incrby', '" + semaphore.getRawName() + "', 1); " +
                "redis.call('publish', '" + RedissonSemaphore.getChannelName(semaphore.getRawName()) + "', val); " +
                "return 1; " +
                "end; " +
                "return 0;";
        redissonClient.getScript().eval(RScript.Mode.READ_WRITE, script, RScript.ReturnType.VALUE);
    }

    /**
     * 获取一个信号量
     */
    public boolean tryAcquire() {
        return semaphore.tryAcquire();
    }

}           

经过了这样的设计,就造出了一个有上限的信号量,问题也都解决了。

作者:笨天才

链接:https://juejin.cn/post/7197782794675585080

来源:稀土掘金

继续阅读