InterProcessMutex
InterProcessMutex
类的源码注释:
A re-entrant mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is “fair” - each user will get the mutex in the order requested (from ZK’s point of view).
跨
工作的可重入互斥锁。使用
JVM
来持有锁。所有
Zookeeper
中使用相同锁路径的所有进程都将实现进程间临界区。此外,这个互斥锁是“公平的”—每个用户都将按照请求的顺序获得互斥锁(从
JVM
的角度来看)。
ZK
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供
CuratorFramework
需要的一些配置信息):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
}
InterProcessMutexRunnable
类(实现了
Runnable
接口,模拟分布式节点获取分布式锁):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import java.util.Random;
public class InterProcessMutexRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = getCuratorFramework();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 分布式锁的锁路径
String path = "/kaven";
// 创建InterProcessMutex实例,用于获取分布式锁
InterProcessMutex mutex = new InterProcessMutex(curator, path);
// 阻塞,直到获取分布式锁
mutex.acquire();
if(mutex.isOwnedByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
mutex.getParticipantNodes().forEach(System.out::println);
// 处理业务
Thread.sleep(5000);
// 业务处理完成
System.out.println(Thread.currentThread().getName() + " 业务处理完成");
// 释放分布式锁
mutex.release();
}
else {
throw new RuntimeException("获取分布式锁时被中断");
}
}
private CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
return CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
}
}
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
for (int i = 0; i < 5; i++) {
EXECUTOR_SERVICE.execute(new InterProcessMutexRunnable());
}
Thread.sleep(10000000);
}
}
模拟
5
个分布式节点获取分布式锁,输出如下所示:
pool-1-thread-4 持有分布式锁
/kaven/_c_e1940de6-9e12-4c94-8c1e-98d22178a94e-lock-0000000000
/kaven/_c_b53a7f04-2e85-4675-839a-c26d24585f2c-lock-0000000001
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-4 业务处理完成
pool-1-thread-5 持有分布式锁
/kaven/_c_b53a7f04-2e85-4675-839a-c26d24585f2c-lock-0000000001
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-5 业务处理完成
pool-1-thread-2 持有分布式锁
/kaven/_c_ad36f188-8d5b-4c8f-878b-2675dfb2e3ee-lock-0000000002
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-2 业务处理完成
pool-1-thread-1 持有分布式锁
/kaven/_c_e843c51c-acf6-431f-b02a-479f7423c4d3-lock-0000000003
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-1 业务处理完成
pool-1-thread-3 持有分布式锁
/kaven/_c_f594e331-b10d-4447-a475-6fd809d1b8c6-lock-0000000004
pool-1-thread-3 业务处理完成
InterProcessMutex
类提供了两种方法来获取分布式锁:
/**
* 阻塞,直到获取分布式锁
* 同一个线程可以重复获取该锁,即可重入锁
* 每次获取锁都必须通过调用release()方法来释放锁
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
/**
* 阻塞,直到获取分布式锁或给定时间已到
* 同一个线程可以重复获取该锁,即可重入锁
* 每次获取锁都必须通过调用release()方法来释放锁
*/
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception
{
return internalLock(time, unit);
}
acquire(long time, TimeUnit unit)
方法后面会演示。
可重入锁
@SneakyThrows
@Override
public void run() {
...
// 阻塞,直到获取分布式锁
mutex.acquire();
if(mutex.isOwnedByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
mutex.getParticipantNodes().forEach(System.out::println);
// 处理业务
reentry(5, mutex);
// 业务处理完成
System.out.println(Thread.currentThread().getName() + " 业务处理完成");
// 释放分布式锁
mutex.release();
}
else {
throw new RuntimeException("获取分布式锁时被中断");
}
}
private void reentry(int nums, InterProcessMutex mutex) throws Exception {
if(nums == 0) return;
// 重复获取锁
mutex.acquire();
Thread.sleep(1000);
reentry(nums - 1, mutex);
// 每获取一次锁,都需要释放
mutex.release();
}
通过递归调用来模拟锁重入,输出如下所示:
pool-1-thread-4 持有分布式锁
/kaven/_c_90f9c755-249e-4d80-a101-98d7c894b5bc-lock-0000000000
/kaven/_c_a86b8fc8-e5e0-40fa-9cb0-d25a82e55926-lock-0000000001
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-4 业务处理完成
pool-1-thread-2 持有分布式锁
/kaven/_c_a86b8fc8-e5e0-40fa-9cb0-d25a82e55926-lock-0000000001
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-2 业务处理完成
pool-1-thread-3 持有分布式锁
/kaven/_c_e2a8e273-e728-4e43-9261-3bd5933d73c8-lock-0000000002
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-3 业务处理完成
pool-1-thread-5 持有分布式锁
/kaven/_c_9e08256f-cef1-40c4-9d76-9675d92a84e3-lock-0000000003
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-5 业务处理完成
pool-1-thread-1 持有分布式锁
/kaven/_c_3704bb49-1470-4c4e-a03d-66737967cbff-lock-0000000004
pool-1-thread-1 业务处理完成
可撤销锁
InterProcessMutex
类实现了
Revocable
接口,使得锁可撤销,其实就是想要获取锁的用户给持有锁的用户发送一个请求(通过
Curator
的
CuratorWatcher
),持有锁的用户接收到这个请求后就可以进行处理,比如直接释放锁、平滑释放锁(处理完当前业务后再释放锁)甚至可以不理会该请求,因此这种撤销操作是合作完成的。
/**
* 指定可以撤销的锁
*/
public interface Revocable<T>
{
/**
* 使锁可撤销
* 当另一个进程/线程希望持有锁的用户释放锁时,该用户的监听器将被调用
*/
public void makeRevocable(RevocationListener<T> listener);
public void makeRevocable(RevocationListener<T> listener, Executor executor);
}
博主感觉这个功能不是很方便,撤销锁时需要知道持有锁的路径,而
InterProcessMutex
类并没有提供该路径的直接获取方法,博主只能通过获取参与竞争分布式锁的节点列表来完成该操作(获取该列表的第一个值,就是持有锁的路径),但这样做不是很方便,并且性能不太好。
@SneakyThrows
@Override
public void run() {
...
// 创建InterProcessMutex实例,用于获取分布式锁
InterProcessMutex mutex = new InterProcessMutex(curator, path);
// 本线程是否持有锁
AtomicBoolean locked = new AtomicBoolean(false);
mutex.makeRevocable((mtx) -> {
// 接收到撤销锁的请求
try {
System.out.println(Thread.currentThread().getName() + " 接收到撤销锁的请求");
if(locked.get()) {
// 直接释放锁
locked.set(false);
System.out.println(Thread.currentThread().getName() + " 直接释放分布式锁");
}
} catch (Exception e) {
e.printStackTrace();
}
});
// 阻塞,直到获取分布式锁或给定时间已到
mutex.acquire(8, TimeUnit.SECONDS);
if(mutex.isOwnedByCurrentThread()) {
locked.set(true);
System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
mutex.getParticipantNodes().forEach(System.out::println);
// 处理业务
while (locked.get()) {
Thread.sleep(2000);
}
// 释放锁
if(!locked.get()) {
mutex.release();
}
}
else {
// 给持有锁的用户发送撤销锁的请求
// 其实就是设置持有锁的路径的值为__REVOKE__
// 然后持有锁的用户的监听器(CuratorWatcher)会被触发
Revoker.attemptRevoke(curator,
new ArrayList<>(mutex.getParticipantNodes()).get(0));
// 再次阻塞,直到获取分布式锁或给定时间已到
mutex.acquire(8, TimeUnit.SECONDS);
if(mutex.isOwnedByCurrentThread()) {
System.out.println(Thread.currentThread().getName() + " 持有分布式锁");
mutex.getParticipantNodes().forEach(System.out::println);
}
}
}
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
for (int i = 0; i < 2; i++) {
EXECUTOR_SERVICE.execute(new InterProcessMutexRunnable());
}
Thread.sleep(10000000);
}
}
输出如下所示:
pool-1-thread-2 持有分布式锁
/kaven/_c_b5d1ada9-6ef8-4e71-a3c5-222c0d249d6f-lock-0000000000
/kaven/_c_802055d5-07f1-4e47-8f46-4ec64f46405d-lock-0000000001
pool-1-thread-2-EventThread 接收到撤销锁的请求
pool-1-thread-2-EventThread 直接释放分布式锁
pool-1-thread-1 持有分布式锁
/kaven/_c_00ad570e-f4ae-461c-977f-b35fa6157406-lock-0000000002
为什么会有三个节点?
/kaven/_c_b5d1ada9-6ef8-4e71-a3c5-222c0d249d6f-lock-0000000000
/kaven/_c_802055d5-07f1-4e47-8f46-4ec64f46405d-lock-0000000001
/kaven/_c_00ad570e-f4ae-461c-977f-b35fa6157406-lock-0000000002