zookeeper簡介
關于zookeeper之前簡單寫過兩篇部落格進行總結,但是比較偏執行個體,關于zookeeper分布式鎖的介紹可以參看這篇簡書——zookeeper的分布式鎖。
之前的兩篇總結位址:curator基本操作,curator的watcher機制。
springboot中內建zookeeper
引入zookeeper的jar包
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
這裡包含了zookeeper和curator的jar包資訊。
引入zookeeper的配置資訊
zookeeper.address.username=localhost:2181
zookeeper.address.namespace=spring_boot_distribute_lock
這裡指定了連接配接資訊和命名空間。
将CuratorFramework加入容器
/**
* autor:liman
* createtime:2020/1/29
* comment:
*/
@Configuration
public class MiddlewareConfiguration {
//注入環境配置的實體,用于讀取配置資訊
@Autowired
private Environment env;
@Bean
public CuratorFramework curatorFramework(){
CuratorFramework curatorFramework =
CuratorFrameworkFactory.builder().connectString(env.getProperty("zookeeper.address.username"))
.namespace(env.getProperty("zookeeper.address.namespace"))
.retryPolicy(new RetryNTimes(5,1000))
.build();
//開啟zookeeper的連接配接
curatorFramework.start();
return curatorFramework;
}
}
zookeeper分布式鎖初步版本
利用zookeeper實作分布式鎖,主要是利用curator高度封裝的InterProcessMutex的acquire操作完成,如果acquire成功表示擷取到分布式鎖,如果失敗表示擷取鎖失敗。釋放鎖通過InterProcessMutex的release操作即可。
暴力失敗版本。
/**
* zookeeper實作分布式鎖
* @param productLockDto
* @return
*/
public int updateStockWithZookeeper(ProductLockDto productLockDto) throws Exception {
int res = 0;
InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
try{
if(mutex.acquire(10L,TimeUnit.SECONDS)){
//TODO:真正的業務處理
ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
int leftStock = productLockEntity.getStock();
if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
productLockEntity.setStock(productLockDto.getStock());
res = lockMapper.updateStockForNegative(productLockEntity);
if(res>0){
log.info("基于zookeeper的分布式鎖更新庫存成功,剩餘stock:{}",leftStock-1);
}
}
}else{
log.error("基于zookeeper,擷取分布式鎖失敗");
throw new RuntimeException("zookeeper,擷取分布式鎖失敗");
}
}catch (Exception e){
log.error("zookeeper,擷取分布式鎖失敗,{}",e.fillInStackTrace());
throw new RuntimeException("擷取分布式鎖失敗");
}finally {
mutex.release();//釋放鎖
}
return res;
}
這是一個簡單的版本,出現異常,則直接抛出異常,就直接按照失敗處理。
/**
* zookeeper實作分布式鎖 cas
* @param productLockDto
* @return
*/
public int updateStockWithZookeeperCAS(ProductLockDto productLockDto){
int res = 0;
InterProcessMutex mutex = new InterProcessMutex(curatorClient,pathPrefix);
boolean flag = true;
while(flag){
try{
if(mutex.acquire(10L,TimeUnit.SECONDS)){
//TODO:真正的業務處理
flag=false;
ProductLock productLockEntity = lockMapper.selectByPrimaryKey(productLockDto.getId());
int leftStock = productLockEntity.getStock();
if(productLockEntity!=null && productLockEntity.getStock().compareTo(productLockDto.getStock())>=0){
productLockEntity.setStock(productLockDto.getStock());
res = lockMapper.updateStockForNegative(productLockEntity);
if(res>0){
log.info("基于zookeeper的分布式鎖更新庫存成功,剩餘stock:{}",leftStock-1);
}
}
}else{
log.error("基于zookeeper,擷取分布式鎖失敗");
flag = true;
}
}catch (Exception e){
log.error("基于zookeeper,擷取分布式鎖失敗");
flag = true;
}finally {
try{
mutex.release();//釋放鎖
}catch (Exception e){
log.error("釋放鎖失敗:重新擷取鎖");
flag=true;
}
}
}
return res;
}
這裡提供了一個新版本,如果出現異常,則繼續擷取鎖。
總結
其實沒有過多可總結的,測試結果就不貼出了,總體來說按照的簡書部落格中總結的内容,就是如下輪子,第二個版本使得擷取鎖失敗的時候也能繼續加入到鎖的競争中。
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{
try
{
// do some work inside of the critical section here
}
finally
{
lock.release();
}
}