ZooKeeper : Curator框架Watcher API介绍
在之前的博客中,博主已经介绍了
Curator
框架的
Session
、
Znode
以及
ACL API
:
- ZooKeeper : Curator框架重试策略和Session API介绍
- ZooKeeper : Curator框架Znode、ACL API介绍
本篇博客将介绍
Curator
框架的
Watcher API
,它简化了
Java
客户端原生
Watcher API
的使用,但了解后者可以更好地理解前者的实现:
- ZooKeeper :Java客户端Watcher API介绍
博主使用的
Curator
框架版本是
5.2.0
,
ZooKeeper
版本是
3.6.3
。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
5.2.0
版本的
Curator
使用
3.6.3
版本的
ZooKeeper
,因此它们是兼容的。
Watcher
Watcher
是
ZooKeeper
中非常重要的特性,
ZooKeeper
上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于
ZooKeeper
实现分布式锁、集群管理等功能。
Watcher
特性:比如当节点数据发生变化的时候,
ZooKeeper
会产生一个
Watcher
事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。
ZooKeeper
的
Watcher
机制,可以分为三个过程:客户端注册
Watcher
、服务端处理
Watcher
和客户端回调。
一次性Watcher
在
Java
客户端原生
API
中,调用
getData
、
getChildren
以及
exists
这三个方法都可以在节点上留下一次性
Watcher
,而这些一次性
Watcher
的类型分别是
Data
、
Children
和
Data
,
Watcher
类型在
WatcherType
枚举类中定义。
@Public
public static enum WatcherType {
Children(1),
Data(2),
Any(3);
private final int intValue;
private WatcherType(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return this.intValue;
}
public static Watcher.WatcherType fromInt(int intValue) {
switch(intValue) {
case 1:
return Children;
case 2:
return Data;
case 3:
return Any;
default:
throw new RuntimeException("Invalid integer value for conversion to WatcherType");
}
}
}
Java
客户端原生
API
中
getData
、
getChildren
以及
exists
三个方法的一次性
Watcher
触发条件:
-
:一次性getData
将由在节点上设置数据或删除节点的成功操作触发。Watcher
-
:一次性exists
将由创建、删除节点或在节点上设置数据的成功操作触发。Watcher
-
:删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发一次性getChildren
。Watcher
测试类
Application
(实现了
CuratorWatcher
接口,因此
Application
的实例有
Watcher
的功能 ):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
/**
* @Author: ITKaven
* @Date: 2021/11/20 10:30
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application implements CuratorWatcher {
private static final String SERVER_PROXY = "192.168.31.172:9000";
private static final int CONNECTION_TIMEOUT_MS = 40000;
private static final int SESSION_TIMEOUT_MS = 10000;
private static final String NAMESPACE = "MyNamespace";
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
Thread.sleep(10000000);
}
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}
输出:
/father/son/grandson1
NodeCreated
Curator
框架的
checkExists
方法对应于
Java
客户端的
exists
方法。对节点进行了两次操作(创建节点和修改节点数据),但
Watcher
只触发了一次。如果想要多次监听节点事件就需要多次添加这种一次性
Watcher
:
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1
NodeCreated
/father/son/grandson1
NodeDataChanged
/father/son/grandson1
NodeDeleted
很显然符合预期,想要多次监听事件,就必须多次添加一次性
Watcher
,而之前的程序由于没有再次添加一次性
Watcher
,节点的
NodeDataChanged
事件就不能监听,因为一次性
Watcher
在节点的
NodeCreated
事件触发后就被
Zookeeper
服务端移除了。
Curator
框架的
getData
方法对应于
Java
客户端的
getData
方法,它们都只能在节点存在的情况下给节点留下一次性
Watcher
,因此它们留下的一次性
Watcher
肯定不能监听该节点的创建事件,其他事件的监听与
checkExists
方法的一次性
Watcher
类似,这里不再赘述。
Application watcher = new Application();
curator.getData()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
Curator
框架的
getChildren
方法对应于
Java
客户端的
getChildren
方法,给指定节点留下一次性
Watcher
的方式也是类似的,都是通过
usingWatcher
方法,删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发这个一次性
Watcher
(临时节点不能创建子节点)。
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1/test");
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeDeleted
在
Curator
框架中可以使用两种
Watcher
,
Java
客户端提供的
Watcher
接口和
Curator
框架提供的
CuratorWatcher
接口。而它们的方法定义是类似的:
package org.apache.curator.framework.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
public interface CuratorWatcher
{
public void process(WatchedEvent event) throws Exception;
}
package org.apache.zookeeper;
import org.apache.yetus.audience.InterfaceAudience.Public;
@Public
public interface Watcher {
void process(WatchedEvent var1);
...
}
在
Curator
框架中这两种
Watcher
的实现都可以使用。
T usingWatcher(Watcher watcher);
T usingWatcher(CuratorWatcher watcher);
当需要一直监听指定节点的事件时,一次性
Watcher
就不太方便了,这时就需要持久
Watcher
。
持久Watcher
在
Java
客户端原生
API
中,调用
addWatch
方法可以在节点上添加持久
Watcher
(
PERSISTENT
和
PERSISTENT_RECURSIVE
),并且这些
Watcher
的类型是
Any
。在
Curator
框架中,可以如下所示给节点添加持久
Watcher
(通过
.watchers().add()
链式调用):
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.setData()
.forPath("/father/son/grandson1/test", "new test".getBytes());
输出:
/father/son/grandson1
NodeChildrenChanged
AddWatchMode.PERSISTENT
类型的
Watcher
只会监听注册节点的相关事件(节点数据更改
NodeDataChanged
、子节点列表更改
NodeChildrenChanged
以及节点删除
NodeDeleted
等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。如果将
AddWatchMode.PERSISTENT
换成
AddWatchMode.PERSISTENT_RECURSIVE
:
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT_RECURSIVE)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1/test
NodeCreated
/father/son/grandson1/test
NodeDataChanged
AddWatchMode.PERSISTENT_RECURSIVE
类型的
Watcher
不仅监听注册节点的相关事件(节点数据更改
NodeDataChanged
和节点删除
NodeDeleted
等,而子节点列表更改
NodeChildrenChanged
其实就是子节点的节点创建
NodeCreated
和节点删除
NodeDeleted
,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。
AddWatchMode
枚举类:
删除Watcher
删除类型为
Any
的
Watcher
,也会一起删除类型为
Children
和
Data
的
Watcher
,而删除类型为
Children
或
Data
的
Watcher
,只能删除对应类型的
Watcher
。
删除指定节点类型为
Data
的所有
Watcher
:
Application watcher1 = new Application();
Application watcher2 = new Application();
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
curator.getChildren()
.usingWatcher(watcher1)
.forPath("/father/son/grandson1");
curator.getData()
.usingWatcher(watcher2)
.forPath("/father/son/grandson1");
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Data)
.forPath("/father/son/grandson1");
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
System.out.println(this);
}
输出:
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05
删除指定节点类型为
Any
的所有
Watcher
:
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
输出:
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@61613ed9
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05
删除指定节点类型为
Any
的指定
Watcher
:
curator.watchers()
.remove(watcher1)
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@5ebb066a