天天看点

用zkClient基于zk实现分布式锁

用zk做分布式锁的基本原理:zk的节点特性,在同级目录下,相同名称的节点只有一个

基于该思想,有2中创建分布式锁的思路,分别如下:

给一个唯一的值,几个进程同时去创建该节点,只有一个能成功,成功的进程处理完业务之后,删除掉该节点,剩下的进程又去竞争-----缺点,当竞争的节点较多时,会产生惊群效应;

另一个思路是:多个进程去创建临时有序节点,最小序号的节点获得锁,依次发生下去,通过注册监听事件,达到通知的目的

本文是基于第二种思想去实现的:代码如下

public class DistributeLock implements Lock{

private static ZkClient client;

private String current_path;

private String wait_path;

private String root_path="/dubbo";


public DistributeLock(){
	//client.create("/dubbo/"+subPath, "1",CreateMode.PERSISTENT_SEQUENTIAL);
	if(!client.exists(root_path)){
		client.createPersistent(root_path);
	}
}

static{
	client=new ZkClient("192.168.1.101:2181");
}

@Override
public void lock() {
	if(tryLock()){
		System.out.println(Thread.currentThread().getName()
				+"->"+current_path+"获得锁成功");
		return;
	}
	try {
		waitForLock(wait_path);
		System.out.println(Thread.currentThread().getName()
				+"->"+current_path+"获得锁成功");
	} catch (Exception e) {
		e.printStackTrace();
	}
}

private void waitForLock(String prev) throws Exception {
	if(client.exists(prev)){
		System.out.println(Thread.currentThread().getName()
				+"等待的节点是:"+prev);
		CountDownLatch latch = new CountDownLatch(1);
		client.subscribeDataChanges(prev, new IZkDataListener(){
			@Override
			public void handleDataChange(String dataPath, Object data) throws Exception {
				latch.countDown();
			}
			@Override
			public void handleDataDeleted(String dataPath) throws Exception {
				//latch.countDown();
			}
		});
		latch.await();
	}
}

@Override
public void lockInterruptibly() throws InterruptedException {
	
}

@Override
public boolean tryLock() {
	current_path= client.createEphemeralSequential(root_path+"/", "1");
	System.out.println
		(Thread.currentThread().getName()+
				"->创建节点:"+current_path);
	
	List<String> children = client.getChildren(root_path);
	SortedSet<String> sortSet = new TreeSet<>();
	for(String str:children){
		sortSet.add(str);
	}
	String first = sortSet.first();
	if(first.equals(current_path.replace(root_path+"/", ""))){
		return true;
	}
	SortedSet<String> headSet = sortSet.headSet(current_path.replace(root_path+"/", ""));
	if(!headSet.isEmpty()){
		wait_path = root_path+"/"+headSet.last();
	}
	return false;
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
	return false;
}

@Override
public void unlock() {
	client.writeData(current_path, "hello");
}

@Override
public Condition newCondition() {
	return this.newCondition();
}
           

}

在本地测试代码入下:

public static void main(String[] args) {

CountDownLatch countDownLatch = new CountDownLatch(10);

for(int i=0;i<10;i++){

new Thread(()->{

try {

countDownLatch.await();

DistributeLock lock = new DistributeLock();

lock.lock();

TimeUnit.SECONDS.sleep(5);

lock.unlock();

} catch (Exception e) {

e.printStackTrace();

}

},"thread--"+i).start();
		countDownLatch.countDown();
	}
}
           

输出如下:

thread–6->创建节点:/dubbo/0000000148

thread–7->创建节点:/dubbo/0000000149

thread–8->创建节点:/dubbo/0000000150

thread–2->创建节点:/dubbo/0000000151

thread–1->创建节点:/dubbo/0000000157

thread–0->创建节点:/dubbo/0000000152

thread–4->创建节点:/dubbo/0000000156

thread–3->创建节点:/dubbo/0000000153

thread–5->创建节点:/dubbo/0000000154

thread–9->创建节点:/dubbo/0000000155

thread–6->/dubbo/0000000148获得锁成功

thread–2等待的节点是:/dubbo/0000000150

thread–7等待的节点是:/dubbo/0000000148

thread–4等待的节点是:/dubbo/0000000155

thread–1等待的节点是:/dubbo/0000000156

thread–3等待的节点是:/dubbo/0000000152

thread–0等待的节点是:/dubbo/0000000151

thread–5等待的节点是:/dubbo/0000000153

thread–9等待的节点是:/dubbo/0000000154

thread–8等待的节点是:/dubbo/0000000149

thread–7->/dubbo/0000000149获得锁成功

thread–8->/dubbo/0000000150获得锁成功

thread–2->/dubbo/0000000151获得锁成功

thread–0->/dubbo/0000000152获得锁成功

thread–3->/dubbo/0000000153获得锁成功

thread–5->/dubbo/0000000154获得锁成功

thread–9->/dubbo/0000000155获得锁成功

thread–4->/dubbo/0000000156获得锁成功

thread–1->/dubbo/0000000157获得锁成功

希望对学习zk实现分布式锁实现的同学有帮助