步驟:
1- pom.xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
2- yml配置:
zk:
url: 127.0.0.1:2181
localPath: /newlock
timeout: 3000
3- 配置類
package com.test.domi.config;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConf {
@Value("${zk.url}")
private String zkUrl;
@Bean
public CuratorFramework getCuratorFramework(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl,retryPolicy);
client.start();
return client;
}
}
4- 使用
package com.test.domi.common.utils.lock;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Component("zklock")
public class ZKlock implements Lock {
@Autowired
private CuratorFramework zkClient;
@Value("${zk.localPath}")
private String lockPath;
private String currentPath;
private String beforePath;
@Override
public boolean tryLock() {
try {
//根節點的初始化放在構造函數裡面不生效
if (zkClient.checkExists().forPath(lockPath) == null) {
System.out.println("初始化根節點==========>" + lockPath);
zkClient.create().creatingParentsIfNeeded().forPath(lockPath);
}
System.out.println("目前線程" + Thread.currentThread().getName() + "初始化根節點" + lockPath);
} catch (Exception e) {
}
if (currentPath == null) {
try {
currentPath = this.zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(lockPath + "/");
} catch (Exception e) {
return false;
}
}
try {
//此處該如何擷取所有的臨時節點呢?如locks00004.而不是擷取/locks/order中的order作為子節點??
List<String> childrens = this.zkClient.getChildren().forPath(lockPath);
Collections.sort(childrens);
if (currentPath.equals(lockPath + "/" + childrens.get(0))) {
System.out.println("目前線程獲得鎖" + currentPath);
return true;
}else{
//取前一個節點
int curIndex = childrens.indexOf(currentPath.substring(lockPath.length() + 1));
//如果是-1表示children裡面沒有該節點
beforePath = lockPath + "/" + childrens.get(curIndex - 1);
}
} catch (Exception e) {
return false;
}
return false;
}
@Override
public void lock() {
if (!tryLock()) {
waiForLock();
lock();
}
}
@Override
public void unlock() {
try {
zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(currentPath);
} catch (Exception e) {
//guaranteed()保障機制,若未删除成功,隻要會話有效會在背景一直嘗試删除
}
}
private void waiForLock(){
CountDownLatch cdl = new CountDownLatch(1);
//建立監聽器watch
NodeCache nodeCache = new NodeCache(zkClient,beforePath);
try {
nodeCache.start(true);
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
cdl.countDown();
System.out.println(beforePath + "節點監聽事件觸發,重新獲得節點内容為:" + new String(nodeCache.getCurrentData().getData()));
}
});
} catch (Exception e) {
}
//如果前一個節點還存在,則阻塞自己
try {
if (zkClient.checkExists().forPath(beforePath) == null) {
cdl.await();
}
} catch (Exception e) {
}finally {
//阻塞結束,說明自己是最小的節點,則取消watch,開始擷取鎖
try {
nodeCache.close();
} catch (IOException e) {
}
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public Condition newCondition() {
return null;
}
}
5- 調用demo
package com.test.domi.controller;
import com.test.domi.common.utils.ZkUtil;
import com.test.domi.common.utils.lock.ZKlock;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/zk")
public class ZKController {
@Autowired
private CuratorFramework zkClient;
// @Autowired
// private ZkClient zkClient;
private String url = "127.0.0.1:2181";
private int timeout = 3000;
private String lockPath = "/testl";
@Autowired
private ZKlock zklock;
private int k = 1;
@GetMapping("/lock")
public Boolean getLock() throws Exception{
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
zklock.lock();
zklock.unlock();
} }).start(); }
return true; } }
轉載于:https://www.cnblogs.com/domi22/p/9748083.html