天天看點

分布式鎖(四)——基于zookeeper的分布式鎖執行個體zookeeper簡介springboot中內建zookeeperzookeeper分布式鎖初步版本總結

zookeeper簡介

關于zookeeper之前簡單寫過兩篇部落格進行總結,但是比較偏執行個體,關于zookeeper分布式鎖的介紹可以參看這篇簡書——zookeeper的分布式鎖。

之前的兩篇總結位址:curator基本操作,curator的watcher機制。

springboot中內建zookeeper

引入zookeeper的jar包

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>${zookeeper.version}</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>${curator.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>${curator.version}</version>
</dependency>
           

這裡包含了zookeeper和curator的jar包資訊。

引入zookeeper的配置資訊

zookeeper.address.username=localhost:2181
zookeeper.address.namespace=spring_boot_distribute_lock
           

這裡指定了連接配接資訊和命名空間。

将CuratorFramework加入容器

/**
 * autor:liman
 * createtime:2020/1/29
 * comment:
 */
@Configuration
public class MiddlewareConfiguration {

    //注入環境配置的實體,用于讀取配置資訊
    @Autowired
    private Environment env;

    @Bean
    public CuratorFramework curatorFramework(){
        CuratorFramework curatorFramework =
                CuratorFrameworkFactory.builder().connectString(env.getProperty("zookeeper.address.username"))
                .namespace(env.getProperty("zookeeper.address.namespace"))
                .retryPolicy(new RetryNTimes(5,1000))
                .build();
		//開啟zookeeper的連接配接
        curatorFramework.start();

        return curatorFramework;
    }
}
           

zookeeper分布式鎖初步版本

利用zookeeper實作分布式鎖,主要是利用curator高度封裝的InterProcessMutex的acquire操作完成,如果acquire成功表示擷取到分布式鎖,如果失敗表示擷取鎖失敗。釋放鎖通過InterProcessMutex的release操作即可。

暴力失敗版本。

/**
 * zookeeper實作分布式鎖
 * @param productLockDto
 * @return
 */
public int updateStockWithZookeeper(ProductLockDto productLockDto) throws Exception {
    int res = 0;
    InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
    try{
        if(mutex.acquire(10L,TimeUnit.SECONDS)){
            //TODO:真正的業務處理
            ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
            int leftStock = productLockEntity.getStock();
            if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
                productLockEntity.setStock(productLockDto.getStock());
                res = lockMapper.updateStockForNegative(productLockEntity);

                if(res>0){
                    log.info("基于zookeeper的分布式鎖更新庫存成功,剩餘stock:{}",leftStock-1);
                }
            }
        }else{
            log.error("基于zookeeper,擷取分布式鎖失敗");
            throw new RuntimeException("zookeeper,擷取分布式鎖失敗");
        }
    }catch (Exception e){
        log.error("zookeeper,擷取分布式鎖失敗,{}",e.fillInStackTrace());
        throw new RuntimeException("擷取分布式鎖失敗");
    }finally {
        mutex.release();//釋放鎖
    }
    return res;
}
           

這是一個簡單的版本,出現異常,則直接抛出異常,就直接按照失敗處理。

/**
 * zookeeper實作分布式鎖 cas
 * @param productLockDto
 * @return
 */
public int updateStockWithZookeeperCAS(ProductLockDto productLockDto){
    int res = 0;
    InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
    boolean flag = true;
    while(flag){
        try{
            if(mutex.acquire(10L,TimeUnit.SECONDS)){
                //TODO:真正的業務處理
                flag=false;
                ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
                int leftStock = productLockEntity.getStock();
                if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
                    productLockEntity.setStock(productLockDto.getStock());
                    res = lockMapper.updateStockForNegative(productLockEntity);

                    if(res>0){
                        log.info("基于zookeeper的分布式鎖更新庫存成功,剩餘stock:{}",leftStock-1);
                    }
                }
            }else{
                log.error("基于zookeeper,擷取分布式鎖失敗");
                flag = true;
            }
        }catch (Exception e){
            log.error("基于zookeeper,擷取分布式鎖失敗");
            flag = true;
        }finally {
            try{
                mutex.release();//釋放鎖
            }catch (Exception e){
                log.error("釋放鎖失敗:重新擷取鎖");
                flag=true;
            }
        }
    }
    return res;
}
           

這裡提供了一個新版本,如果出現異常,則繼續擷取鎖。

總結

其實沒有過多可總結的,測試結果就不貼出了,總體來說按照的簡書部落格中總結的内容,就是如下輪子,第二個版本使得擷取鎖失敗的時候也能繼續加入到鎖的競争中。

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) ) 
{
    try 
    {
        // do some work inside of the critical section here
    }
    finally
    {
        lock.release();
    }
}