ZooKeeper :Java客户端Watcher API介绍
在上一篇博客中,博主给大家介绍了
Java
客户端的
Session
、
ACL
以及
Znode API
:
- ZooKeeper :Java客户端Session、ACL、Znode API介绍
这一篇博客,博主将会介绍
Watcher API
的使用。
先创建一个
maven
项目:
pom.xml
如下所示:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
</dependencies>
</project>
ZooKeeper
依赖包的版本最好要和
ZooKeeper
服务端的版本一致。
Watcher
Watcher
是
ZooKeeper
中非常重要的特性,
ZooKeeper
上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于
ZooKeeper
实现分布式锁、集群管理等功能。
Watcher
特性:比如当节点数据发生变化的时候,
ZooKeeper
会产生一个
Watcher
事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。
ZooKeeper
的
Watcher
机制,可以分为三个过程:客户端注册
Watcher
、服务端处理
Watcher
和客户端回调。
添加Watcher
在客户端命令介绍这篇博客中,博主介绍了
ZooKeeper
客户端的
addWatch
命令:
-
:客户端在节点上添加监听,有两种模式addWatch
和PERSISTENT
,PERSISTENT_RECURSIVE
模式只监听指定的节点事件,而PERSISTENT
模式会监听指定节点与它所有子节点的事件。PERSISTENT_RECURSIVE
addWatch
命令对应
Java
客户端的
addWatch
方法:
// 使用给定的模式向给定的ZNode添加Watcher
// 此方法只能设置AddWatchMode中的模式
void addWatch(String basePath, Watcher watcher, AddWatchMode mode)
// 使用给定的模式向给定的ZNode添加Watcher
// 此方法只能设置AddWatchMode中的模式
// 使用了默认的Watcher(创建ZooKeeper实例时传入的Watcher),其他方法类似
void addWatch(String basePath, AddWatchMode mode)
// addWatch(String, Watcher, AddWatchMode)异步版本
void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx)
// addWatch(String, AddWatchMode)异步版本
void addWatch(String basePath, AddWatchMode mode, VoidCallback cb, Object ctx)
AddWatchMode
枚举类:
public enum AddWatchMode {
// 在给定的路径上设置一个在触发时不会被移除的Watcher(即它保持活动状态直到被移除)
// 该Watcher对data和child两类事件进行触发
// 该Watcher的行为就像在给定路径的ZNode上放置一个exists() Watcher和一个getChildren() Watcher一样
// 要移除该Watcher,需要使用removeWatches()和WatcherType.Any
PERSISTENT(ZooDefs.AddWatchModes.persistent),
// 在给定的路径上设置一个Watcher
// a) 触发时不会被移除(即它保持活动状态直到被移除)
// b) 不仅适用于注册路径,而且递归地适用于所有子路径
// 该Watcher对data和child两类事件进行触发
// 该Watcher的行为就像在给定路径的ZNode上放置一个exists() Watcher和一个getChildren() Watcher一样
// 要删除该Watcher,需要使用removeWatches()和WatcherType.Any
// 注意:当有递归监听时,性能会略有下降,因为必须检查ZNode路径的所有子路径以进行事件监听
PERSISTENT_RECURSIVE(ZooDefs.AddWatchModes.persistentRecursive)
;
public int getMode() {
return mode;
}
private final int mode;
AddWatchMode(int mode) {
this.mode = mode;
}
}
Application
类(实现了
Watcher
接口,通过
process
方法,可以监听
WatchedEvent
的触发):
package com.kaven.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.apache.zookeeper.server.watch.WatcherMode;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @Author: ITKaven
* @Date: 2021/11/20 10:30
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application implements Watcher {
private static CountDownLatch latch;
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int TIMEOUT = 40000;
private static long time;
private String watcherName;
protected Application(String watcherName) {
this.watcherName = watcherName;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
Watcher nodeWatcher = new Application("nodeWatcher");
String nodeMessage = "nodeWatcher success";
latch = new CountDownLatch(1);
AtomicBoolean isOk = new AtomicBoolean(false);
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
latch.await();
if(isOk.get()) {
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.setData("/itkaven", "new data".getBytes(), -1);
zk.create("/itkaven/son1", "son1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.getData("/itkaven/son1", true, null);
zk.create("/itkaven/son1/grandson1", "grandson1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
zk.setData("/itkaven/son1", "new son1".getBytes(), -1);
zk.setData("/itkaven/son1", "new son2".getBytes(), -1);
zk.setData("/itkaven/son1/grandson1", "new grandson1".getBytes(), -1);
zk.delete("/itkaven/son1/grandson1", -1);
zk.delete("/itkaven/son1", -1);
zk.delete("/itkaven", -1);
}
Thread.sleep(1000000);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-----------------WatchedEvent------------------");
System.out.println(this.watcherName);
System.out.println(watchedEvent.getType());
System.out.println(watchedEvent.getState().name());
System.out.println(watchedEvent.getPath());
System.out.println("time use(ms):" + (System.currentTimeMillis() - time));
time = System.currentTimeMillis();
System.out.println("-----------------WatchedEvent------------------");
if(watchedEvent.getState().equals(Event.KeeperState.SyncConnected)) {
latch.countDown();
}
}
}
输出:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13720
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
-----------------create------------------
0
/itkaven
/itkaven
success
11
-----------------create------------------
-----------------addWatch------------------
0
/itkaven
nodeWatcher success
-----------------addWatch------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDataChanged
SyncConnected
/itkaven
time use(ms):31
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeChildrenChanged
SyncConnected
/itkaven
time use(ms):3
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
connectWatcher
NodeDataChanged
SyncConnected
/itkaven/son1
time use(ms):8
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeChildrenChanged
SyncConnected
/itkaven
time use(ms):13
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDeleted
SyncConnected
/itkaven
time use(ms):4
-----------------WatchedEvent------------------
AddWatchMode.PERSISTENT
类型的
Watcher
只会监听注册节点的相关事件(节点数据更改
NodeDataChanged
、子节点列表更改
NodeChildrenChanged
以及节点删除
NodeDeleted
等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。
ZooKeeper API
调用成功后可以在
ZooKeeper
服务端的数据节点上留下
Watcher
(如果只是通过布尔参数
boolean watch
来判断是否留下
Watcher
,而不是传入
Watcher
实例参数,则是使用默认
Watcher
实例来进行监听事件的触发回调,上面程序的输出也正好如此)。 其他成功的
ZooKeeper API
调用可以触发这些
Watcher
。 一旦
Watcher
被触发,一个事件将被传递给
Watcher
的处理方法(以回调的形式,如
process
方法)。 每个
Watcher
只能触发一次,如上面的
zk.getData("/itkaven/son1", true, null)
,在
/itkaven/son1
节点上留下了
Watcher
,但对
/itkaven/son1
节点的数据更改了两次,该
Watcher
只触发了一次,因为只能触发一次,和通过
addWatch
方法添加到节点上的
Watcher
不一样,因为这些
Watcher
触发时不会被移除。
// 方法定义
byte[] getData(String path, boolean watch, Stat stat)
byte[] getData(final String path, Watcher watcher, Stat stat)
EventType
枚举类(事件类型):
enum EventType {
None(-1),
NodeCreated(1),
NodeDeleted(2),
NodeDataChanged(3),
NodeChildrenChanged(4),
DataWatchRemoved(5),
ChildWatchRemoved(6),
PersistentWatchRemoved (7);
}
-
:连接建立事件。None
-
:节点创建事件。NodeCreated
-
:节点删除事件。NodeDeleted
-
:节点数据更改事件。NodeDataChanged
-
:子节点列表更改事件。NodeChildrenChanged
-
:节点的数据监听被移除事件。DataWatchRemoved
-
:节点的子节点列表监听被移除事件。ChildWatchRemoved
-
:节点的持久监听被移除事件。PersistentWatchRemoved
如果将
AddWatchMode.PERSISTENT
换成
AddWatchMode.PERSISTENT_RECURSIVE
:
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
输出也会改变:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13763
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
-----------------create------------------
0
/itkaven
/itkaven
success
11
-----------------create------------------
-----------------addWatch------------------
0
/itkaven
nodeWatcher success
-----------------addWatch------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDataChanged
SyncConnected
/itkaven
time use(ms):23
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeCreated
SyncConnected
/itkaven/son1
time use(ms):3
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeCreated
SyncConnected
/itkaven/son1/grandson1
time use(ms):4
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
connectWatcher
NodeDataChanged
SyncConnected
/itkaven/son1
time use(ms):2
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDataChanged
SyncConnected
/itkaven/son1
time use(ms):0
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDataChanged
SyncConnected
/itkaven/son1
time use(ms):2
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDataChanged
SyncConnected
/itkaven/son1/grandson1
time use(ms):3
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDeleted
SyncConnected
/itkaven/son1/grandson1
time use(ms):3
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDeleted
SyncConnected
/itkaven/son1
time use(ms):2
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
NodeDeleted
SyncConnected
/itkaven
time use(ms):4
-----------------WatchedEvent------------------
AddWatchMode.PERSISTENT_RECURSIVE
类型的
Watcher
不仅监听注册节点的相关事件(节点数据更改
NodeDataChanged
和节点删除
NodeDeleted
等,而子节点列表更改
NodeChildrenChanged
其实就是子节点的节点创建
NodeCreated
和节点删除
NodeDeleted
,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。
注册默认Watcher
调用
ZooKeeper
的某些
API
时,如果只是通过布尔参数
boolean watch
来判断是否给指定节点留下
Watcher
,而不是传入
Watcher
实例参数,则是使用默认
Watcher
实例来进行监听事件的触发回调(在创建
ZooKeeper
实例时被传入),
register
方法(
synchronized
修饰)用于注册默认
Watcher
,会覆盖构建期间指定的
Watcher
。
// 指定连接的默认Watcher(覆盖构建期间指定的Watcher)
public synchronized void register(Watcher watcher) {
watchManager.defaultWatcher = watcher;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.create("/itkaven/son1", "son1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk.register(new Application("new watcher1"));
zk.register(new Application("new watcher2"));
zk.getData("/itkaven/son1", true, null);
zk.register(new Application("new watcher3"));
zk.setData("/itkaven/son1", "new son1".getBytes(), -1);
Thread.sleep(1000000);
}
输出:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13699
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
-----------------create------------------
0
/itkaven
/itkaven
success
11
-----------------create------------------
-----------------WatchedEvent------------------
new watcher2
NodeDataChanged
SyncConnected
/itkaven/son1
time use(ms):25
-----------------WatchedEvent------------------
由输出结果可知,
register
方法可以被多次调用(重复覆盖),并且
ZooKeeper API
会选择方法调用之前最新的默认
Watcher
,如
getData
方法(其他方法类似):
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException {
return getData(path, watch ? watchManager.defaultWatcher : null, stat);
}
volatile
修饰符保证了
defaultWatcher
属性的可见性,因此,只需要保证
register
方法的原子性即可,而
register
方法被
synchronized
修饰(具有原子性)。
protected volatile Watcher defaultWatcher;
删除Watcher
删除
Watcher
有以下方法(同步版本与异步版本):
// 对于给定的ZNode路径(path参数),删除指定类型(watcherType参数)的指定Watcher(watcher参数,因此watcher参数不能为空)
void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local)
// 异步版本
void removeWatches(String path, Watcher watcher, WatcherType watcherType, boolean local, VoidCallback cb,Object ctx)
// 对于给定的ZNode路径(path参数),删除指定类型(watcherType参数)的所有Watcher(没有Watcher的限制)
void removeAllWatches(String path, WatcherType watcherType, boolean local)
// 异步版本
void removeAllWatches(String path, WatcherType watcherType, boolean local, VoidCallback cb, Object ctx)
-
:节点的路径。path
-
:一个具体的watcher
。Watcher
-
:要移除的watcherType
类型。Watcher
-
:没有服务端连接时,是否可以在本地移除local
。Watcher
-
:异步回调实例。VoidCallback
WatcherType
枚举类:
enum WatcherType {
Children(1),
Data(2),
Any(3);
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException, NoSuchAlgorithmException {
Watcher connectWatcher = new Application("connectWatcher");
latch = new CountDownLatch(1);
time = System.currentTimeMillis();
ZooKeeper zk = new ZooKeeper(SERVER_PROXY, TIMEOUT, connectWatcher);
latch.await();
System.out.println(zk.getState());
System.out.println("Connection complete!");
ACL acl = new ACL(
ZooDefs.Perms.ALL,
new Id("digest",
DigestAuthenticationProvider.generateDigest("kaven:itkaven"))
);
String message = "success";
latch = new CountDownLatch(1);
zk.create("/itkaven",
"hello kaven".getBytes(),
new ArrayList<>(Collections.singletonList(acl)),
CreateMode.PERSISTENT,
(rc, path, ctx, name, s) -> {
System.out.println("-----------------create------------------");
System.out.println(rc);
System.out.println(path);
System.out.println(name);
System.out.println(name.equals(path) ? ctx : "error");
System.out.println(s.getDataLength());
System.out.println("-----------------create------------------");
latch.countDown();
},
message);
latch.await();
Watcher nodeWatcher = new Application("nodeWatcher");
String nodeMessage = "nodeWatcher success";
latch = new CountDownLatch(1);
AtomicBoolean isOk = new AtomicBoolean(false);
zk.addWatch("/itkaven", nodeWatcher, AddWatchMode.PERSISTENT_RECURSIVE,
(rc, path, ctx) -> {
System.out.println("-----------------addWatch------------------");
System.out.println(rc);
if(rc == KeeperException.Code.OK.intValue()) {
System.out.println(path);
System.out.println(ctx);
isOk.set(true);
}
System.out.println("-----------------addWatch------------------");
latch.countDown();
},
nodeMessage);
latch.await();
if(isOk.get()) {
zk.addAuthInfo("digest","kaven:itkaven".getBytes());
zk.getData("/itkaven", true, null);
zk.removeWatches("/itkaven", connectWatcher, WatcherType.Data, false);
zk.getChildren("/itkaven", true);
zk.removeAllWatches("/itkaven", WatcherType.Children, false);
zk.getChildren("/itkaven", nodeWatcher);
zk.exists("/itkaven", nodeWatcher);
zk.removeAllWatches("/itkaven", WatcherType.Any, false);
}
Thread.sleep(1000000);
}
输出:
-----------------WatchedEvent------------------
connectWatcher
None
SyncConnected
null
time use(ms):13732
-----------------WatchedEvent------------------
CONNECTED
Connection complete!
-----------------create------------------
0
/itkaven
/itkaven
success
11
-----------------create------------------
-----------------addWatch------------------
0
/itkaven
nodeWatcher success
-----------------addWatch------------------
-----------------WatchedEvent------------------
connectWatcher
DataWatchRemoved
SyncConnected
/itkaven
time use(ms):23
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
connectWatcher
ChildWatchRemoved
SyncConnected
/itkaven
time use(ms):2
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
PersistentWatchRemoved
SyncConnected
/itkaven
time use(ms):3
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
DataWatchRemoved
SyncConnected
/itkaven
time use(ms):1
-----------------WatchedEvent------------------
-----------------WatchedEvent------------------
nodeWatcher
ChildWatchRemoved
SyncConnected
/itkaven
time use(ms):0
-----------------WatchedEvent------------------
getData
、
getChildren
以及
exists
三个方法都可以在节点上留下一次性
Watcher
,而这些
Watcher
的类型分别是
Data
、
Children
和
Data
,而通过
addWatch
方法可以在节点上添加持久
Watcher
(
PERSISTENT
和
PERSISTENT_RECURSIVE
),并且这些
Watcher
的类型是
Any
。由输出结果可知,删除类型为
Any
的
Watcher
,也会一起删除类型为
Children
和
Data
的
Watcher
,而删除类型为
Children
或
Data
的
Watcher
,只能删除对应类型的
Watcher
。
在Java客户端Session、ACL、Znode API介绍这篇博客中介绍了这些方法的特性:
-
:方法返回给定路径的节点的数据和状态信息(类似getData
客户端的ZooKeeper
命令)。如果get -s
为watch
并且调用成功(也可以传入一个true
实例),则Watcher
将留在给定路径的节点上。watch
将由在节点上设置数据或删除节点的成功操作触发,该方法也存在异步版本。watch
-
:方法返回给定路径的节点的状态信息(类似exists
客户端的ZooKeeper
命令)。如果不存在这样的节点,则返回stat
。如果null
为 watch
并且调用成功(也可以传入一个true
实例),则Watcher
将留在给定路径的节点上。 watch
将由创建、删除节点或在节点上设置数据的成功操作触发,该方法也存在异步版本。watch
-
:返回给定路径的节点的子节点列表。如果getChildren
为watch
并且调用成功(也可以传入一个true
实例),则Watcher
将留在给定路径的节点上。 删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发watch
。返回的子节点列表未排序,该方法存在异步版本。watch
回顾前面博主对
AddWatchMode
枚举类的介绍: