Zookeeper学习笔记
- Zookeeper技术内幕
-
- 重要理论
-
- 数据模型znode
-
- 节点类型
- 节点状态
- 会话
-
- 会话状态
- 会话连接超时管理——客户端维护
- 会话连接事件
- 会话空闲超时管理——服务端维护
- ACL
-
- ACL简介
- zk的ACL维度
-
- 授权策略scheme
- 授权对象id
- 权限permission
- Watcher机制
-
- Watcher工作原理
- Watcher事件
- Watcher特性
- 客户端命令
-
- 启动客户端
- 查看子节点-ls
- 创建节点-create
- 获取节点数据内容-get
- 更新节点数据内容-set
- 删除节点-delete
- ACL操作
- ZkClient客户端
-
- 简介
- API介绍
-
- 创建会话
- 创建节点
- 删除节点
- 更新数据
- 检测节点是否存在
- 获取节点数据内容
- 获取子节点列表
- Watcher注册
- 代码演示
-
- 创建工程
- 代码
- Curator客户端
-
- 简介
- API介绍
-
- 创建会话
-
- 普通 API 创建 newClient()
- Fluent风格创建
- 创建节点 create()
- 删除节点 delete()
- 更新数据 setData()
- 检测节点是否存在 checkExists()
- 获取节点数据内容 getData()
- 获取子节点列表 getChildren()
- Watcher注册 usingWatcher()
- 代码演示
-
- 创建工程
- 代码
Zookeeper技术内幕
重要理论
数据模型znode
zk数据存储结构与标准的 Unix 文件系统非常相似,都是在根节点下挂很多子节点。zk中没有引入传统文件系统中目录与文件的概念,而是使用了称为 znode 的数据节点概念。znode 是 zk 中数据的最小单元,每个 znode 上都可以保存数据,同时还可以挂载子节点,形成一个树形化命名空间。
节点类型
- 持久节点(PERSISENT):一旦创建,除非主动调用删除操作,否则一直存储在 zk 上。
- 持久顺序节点(PERSISTENT_SEQUENTIAL):这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。 在创建节点的时候只需要传入节点 “/test_”,这样之后,zookeeper 自动会给“test_”后面补充数字。
- 临时节点(EPHEMERAL):与客户端会话绑定,一旦客户端会话失效,这个客户端所创建的所有临时节点都会被移除;只能作为叶子节点,即其不能挂载子节点。
- 临时顺序节点(EPHEMERAL_SEQUENTIAL):此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。
节点状态
- cZxid:Created Zxid,表示当前 znode 被创建的事务 ID
- ctime:Created Time,表示当前 znode 被创建的时间
- mZxid:Modified Zxid,表示当前 znode 最后一次被修改时的事务 ID
- mtime:Modified Time,表示当前 znode 最后一次被修改时的时间
- pZxid:表示当前 znode 的子节点列表最后一次被修改时的事务 ID。注意:只能是其子节点列表变更了才会引起 pZxid 的变更,子节点内容的修改不会影响 pZxid。
- cversion:Children Version,表示子节点的版本号,该版本号用于充当乐观锁。
- dataVersion:表示当前 znode 数据的版本号,该版本号用于充当乐观锁。
- aclVersion:表示当前 znode 的权限 ACL 的版本号,该版本号用于充当乐观锁。
- ephemeralOwner:若当前 znode 是持久节点,则其值为 0;若为临时节点,则其值为创建该节点的会话的 SessionID。当会话消失后,会根据 SessionID来查找与该会话相关的临时节点进行删除。
- dataLength:当前 znode 中存放的数据的长度。
- numChildren:当前 znode 所包含的子节点的个数。
会话
- 会话是 zk 中最重要的概念之一,客户端与服务端之间的任何交互操作都与会话相关。
- Zookeeper客户端启动时,首先会与 zk 服务建立一个TCP长连接。连接一旦建立,客户端会话的生命周期也就开始了。
会话状态
会话的状态有三种:
- 连接中 CONNECTING:连接中。Client要创建一个连接,其首先会在本地创建一个 zk 对象,用于表示其所连接上的server。
- 已连接 CONNECTED:已连接。连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中。
- 已关闭 CLOSE:已关闭。连接关闭后,这个代表Server的 zk 对象会被删除。
会话连接超时管理——客户端维护
- zk 客户端维护着会话超时管理,主要管理的超时有两类:读超时与连接超时。
- 当客户端长时间没有收到服务端请求响应或心跳响应时,会发生读超时;当客户端发出连接请求后,长时间没有收到服务端的连接成功ACK,此时发生连接超时。
会话连接事件
客户端与服务端连接成功后并不是一成不变的,也会出现一些问题,比较典型的有以下三种:
- 会话丢失:因为网络抖动等原因导致客户端长时间收不到服务端的心跳回复,客户端就会导致连接丢失。连接丢失会引发客户端自动从 zk 地址列表中逐个尝试重新连接,直到重连成功,或按照指定的重试策略终止。
- 会话转移:当发生连接丢失后,客户端又以原来的 sessionId 重新连接上了服务器。若重连上的服务器不是原来的服务器,那么客户端就需要更新本地 zk 对象中的相关信息,例如连接上的 Server 的 IP 地址,这就是会话转移。
- 会话失效:若客户端连接丢失后,在会话超时范围内没有连接上服务器,则服务器会将该会话从服务器中删除。在服务端将某客户端的会话删除后,该客户端仍使用原来的 sessionId 又重新连接上了服务器。那么服务器会给客户端发送一个连接关闭响应,表示这个会话已经失效。客户端在收到响应后会根据配置,要么关闭连接,要么重新发起新的会话 id 的连接。
会话空闲超时管理——服务端维护
- 服务器为每一个客户端的会话都记录着上一次交互后空闲的时长,及从上一次交互结束开始会话空闲超时的时间点。一旦空闲时长超时,服务端就会将该会话的 SessionID从服务端清除。这也就是为什么客户端在空闲时需要定时向服务端发送心跳,就是为了维护这个会话长连接的。服务器是通过空闲超时管理来判断会话是否发生中断的。
- 服务端对于会话空闲超时管理,采用了一种特殊的方式——分桶策略。
ACL
ACL简介
- ACL全称为 Access Control List(访问控制列表),是一种细粒度的权限管理方式,可以针对任意用户和组进行细粒度的权限控制。目前大多数 Unix 已经支持了 ACL,Linux 也从 2.6 版本开始支持了 ACL(在此之前是UGO,User Group Others 权限控制模型)。zk 利用 ACL 策略控制 znode 节点的访问权限,如节点数据读写、节点创建、节点删除、读取子节点列表、设置节点权限等。
- 不过需要注意的是,Unix/Linux 文件或子目录默认会继承其父目录的 ACL。而在 Zookeeper 中,znode 的 ACL 是没有继承关系的,每个 znode 的权限都是独立控制的,只有客户端满足 znode 设置的权限要求时,才能完成相应的操作。
zk的ACL维度
Unix/Linux 系统的 ACL 分为两个维度:组与权限。而 Zookeeper 的 ACL 分为三个维度:授权策略scheme、授权对象id、用户权限permission。
授权策略scheme
授权策略用于确定权限验证过程中使用的验证策略(简单地说就是,通过什么来验证权限,即一个用户要访问某个 znode,如何验证其身份),在 zk 中最常用的有四种策略:
- ip:使用 ip 地址进行权限验证
- digest:使用 用户名+密码 进行权限验证
- world:不验证
- super:超级用户
授权对象id
授权对象指的是权限赋予的用户,不同的授权策略具有不同类型的授权对象。下面是各个授权模式对应的授权对象id:
- ip:授权对象的 IP 地址
- digest:授权对象是 “用户名:密码”
- world:只有一个,即 anyone
- super:与 digest 模式相同
权限permission
权限指的是通过验证的用户可以对 znode 执行的操作,共有五种权限,不过 zk 支持自定义权限。
- c:Create:允许创建子节点
- d:Delete:允许删除当前节点
- r:Read:允许读取当前节点的数据内容,以及子节点列表
- w:Write:允许修改当前节点的数据内容,以及子节点列表
- a:admin/acl:允许修改当前节点的 ACL 权限
Watcher机制
zk 通过 Watcher 机制实现了发布/订阅模式。
Watcher工作原理
- zk客户端生成一个watcher对象,将该对象加入到 WatcherManagement 集合中,一旦加入到该集合中就会向zk服务端发送该watcher对象的注册信息,服务端接收到该信息并进行注册(并没有发送watcher对象)。
- 当服务端触发了watcher事件,zk服务端会向zk客户端发送相应的事件通知。
- zk客户端接收到该事件后,会根据通知找到相应的watcher对象,然后watcher对象会执行响应的回调方法。
Watcher事件
对于同一个事件类型,在不同的通知状态中代表的含义是不同的。
Watcher特性
zk的watcher机制具有以下几个特性:
- 一次性:无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,在 Watcher 的使用上,需要反复注册。这样的设计有效地减轻了服务端的压力。【Zookeeper不适合监听高频变化对象】
- 串行化:客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序。同时,需要注意的一点是,一定不能因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调,所以,我觉得客户端 Watcher 的实现类要另开一个线程进行处理业务逻辑,以便给其他的 Watcher 调用让出时间。
- 轻量级:WatchedEvent 是最小通知单元,仅仅包括:通知状态、事件类型、节点路径。仅仅通知客户端发生了事件,不会带事件具体内容,具体内容需要客户端再次请求获取。客户端向服务端传递的也不是watcher 对象,使用Boolean类型标记属性,服务端保存当前连接的ServerCnxn。
客户端命令
启动客户端
- 连接本机 zk 服务器:zkCli.sh
- 连接其它 zk 服务器:zkCli.sh -server 192.168.254.130:2181
查看子节点-ls
- 查看根节点及/brokers下包含的所有子节点列表:
创建节点-create
- 创建永久节点:创建一个名称为 china 的 znode,其值为 999
- 创建顺序节点:在 /china 节点下创建顺序子节点 beijing、shanghai、guangzhou,它们的数据内容分别为 bj、sh、gz
- 创建临时节点:临时节点与持久节点的区别,在后面 get 命令中可以看到
获取节点数据内容-get
- 获取持久节点数据:
- 获取顺序节点数据:
- 获取临时节点数据:
更新节点数据内容-set
- 更新数据:
删除节点-delete
- 删除操作:
- 若要删除具有子节点的节点,会报错:
ACL操作
- 查看权限-getAcl:
- 设置权限:下面的命令是,首先增加一个认证用户 zs,密码为 123,然后为 /china 节点指定只有 zs 用户才可以访问该节点,而访问权限为所有权限。 quit 退出后再进入访问节点/china:
ZkClient客户端
简介
ZkClient是一个开源的客户端,在Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了 Session 超时重连,Watcher 反复注册等功能。像 dubbo 等框架对其也进行了集成使用。
API介绍
以下API方法均为ZkClient类中的方法。
创建会话
ZkClient中提供了9个构造器用于创建会话:
- 其中前8个构造方法最终都会调用第9个构造方法,查看该方法源码如下:
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout) {
this._childListener = new ConcurrentHashMap();
this._dataListener = new ConcurrentHashMap();
this._stateListener = new CopyOnWriteArraySet();
this._zkEventLock = new ZkLock();
if(zkConnection == null) {
throw new NullPointerException("Zookeeper connection is null!");
} else {
this._connection = zkConnection;
this._zkSerializer = zkSerializer;
this._operationRetryTimeoutInMillis = operationRetryTimeout;
this._isZkSaslEnabled = this.isZkSaslEnabled();
this.connect((long)connectionTimeout, this);
}
}
-
其中具体参数的意义如下:
zkServers:指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成。
connectionTimeout:设置连接创建超时时间,单位毫秒。在此时间内无法创建与 zk 的连接,则直接放弃连接,并抛出异常。
sessionTimeout:设置回话超时时间,单位毫秒。
zkSerializer:为会话指定序列器。zk节点内容仅支持字节数组(byte[])类型,且 zk 不负责序列化。在创建 zkClient 时需要指定所要使用的序列化器,例如 Hessian 或 Kryo。默认使用 Java 自带的序列化方式进行对象的序列化。当为会话指定了序列化器后,客户端在进行读写操作时就会自动进行序列化与反序列化。
connection:IZkConnection 接口对象,是对 zk 原生 API 的最直接包装,是和 zk 最直接的交互层,包含了增删改查等一系列方法。该接口最常用的实现类是 zkClient 默认的实现类 ZkConnection,其可以完成绝大部分的业务需求。
operationRetryTimeout:设置重试超时时间,单位毫秒。
创建节点
ZkClient 中提供了 15 个方法用于创建节点:
- 方法3:
public void createPersistent(String path, boolean createParents, List<ACL> acl) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
try {
this.create(path, (Object)null, acl, CreateMode.PERSISTENT);
} catch (ZkNodeExistsException var6) {
if(!createParents) {
throw var6;
}
} catch (ZkNoNodeException var7) {
if(!createParents) {
throw var7;
}
String parentDir = path.substring(0, path.lastIndexOf(47));
this.createPersistent(parentDir, createParents, acl);
this.createPersistent(path, createParents, acl);
}
}
- 方法11:
public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) {
if(path == null) {
throw new NullPointerException("Missing value for path");
} else if(acl != null && acl.size() != 0) {
final byte[] bytes = data == null?null:this.serialize(data);
return (String)this.retryUntilConnected(new Callable<String>() {
public String call() throws Exception {
return ZkClient.this._connection.create(path, bytes, acl, mode);
}
});
} else {
throw new NullPointerException("Missing value for ACL");
}
}
-
其中具体参数的意义如下:
path:要创建的节点完整路径。
data:节点的初始数据内容,可以传入 Object 类型及null。zk 原生 API 中只允许向节点传入 byte[] 数据作为数据内容,但 zkClient 中具有自定义序列化器,所以可以传入各种类型对象。
mode:节点类型,CreateMode枚举常量,常用的有四种类型:PERSISTENT 持久性,PERSISTENT_SEQUENTIAL 持久顺序型,EPHEMERAL:临时型,EPHEMERAL_SEQUENTIAL 临时顺序型。
acl:节点的 ACL 策略。
callback:回调接口。
context:执行回调时可以使用的上下文对象。
createParents:是否递归创建节点。zk 原生 API 中要创建的节点路径必须存在,即要创建子节点,父节点必须存在。但 zkClient 解决了这个问题,可以做递归节点创建。没有父节点,可以先自动创建了父节点,然后再在其下创建子节点。
删除节点
ZkClient 中提供了 3 个方法用于删除节点:
- 查看源码如下:
public boolean delete(String path) {
return this.delete(path, -1);
}
public boolean delete(final String path, final int version) {
try {
this.retryUntilConnected(new Callable<Object>() {
public Object call() throws Exception {
ZkClient.this._connection.delete(path, version);
return null;
}
});
return true;
} catch (ZkNoNodeException var4) {
return false;
}
}
public boolean deleteRecursive(String path) {
List children;
try {
children = this.getChildren(path, false);
} catch (ZkNoNodeException var5) {
return true;
}
Iterator i$ = children.iterator();
String subPath;
do {
if(!i$.hasNext()) {
return this.delete(path);
}
subPath = (String)i$.next();
} while(this.deleteRecursive(path + "/" + subPath));
return false;
}
-
其中具体参数的意义如下:
path:要删除的节点的完整路径。
version:要删除的节点中包含的数据版本。
更新数据
ZkClient 中提供了 3 个方法用于修改节点数据内容:
查看源码如下:
public void writeData(String path, Object object) {
this.writeData(path, object, -1);
}
public void writeData(String path, Object datat, int expectedVersion) {
this.writeDataReturnStat(path, datat, expectedVersion);
}
public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) {
final byte[] data = this.serialize(datat);
return (Stat)this.retryUntilConnected(new Callable<Object>() {
public Object call() throws Exception {
Stat stat = ZkClient.this._connection.writeDataReturnStat(path, data, expectedVersion);
return stat;
}
});
}
-
其中具体参数的意义如下:
path:要更新的节点的完整路径。
data:要采用的新的数据值。
expectedVersion:数据更新后要采用的数据版本号。
检测节点是否存在
ZkClient中提供了2个方法用于判断指定节点的存在性,但是 public 方法就一个:只有一个参数的exists()方法:
- 查看源码如下:
protected boolean exists(final String path, final boolean watch) {
return ((Boolean)this.retryUntilConnected(new Callable<Boolean>() {
public Boolean call() throws Exception {
return Boolean.valueOf(ZkClient.this._connection.exists(path, watch));
}
})).booleanValue();
}
public boolean exists(String path) {
return this.exists(path, this.hasListeners(path));
}
-
其中具体参数的意义如下:
path:要判断存在性节点的完整路径。
watch:要判断存在性及其子孙节点是否具有 watcher 监听。
获取节点数据内容
ZkClient中提供了4个方法用于获取节点数据内容,但 public 方法就三个:
- 查看源码如下:
-
其中具体参数的意义如下:
path:要读取数据内容的节点的完整路径。
watch:指定节点及其子孙节点是否具有 watcher 监听。
returnNullIfPathNotExists:这是个 boolean 值。默认情况下若指定的节点不存在,则会抛出KeeperException$NoNodeException异常。设置该值为true,若指定节点不存在,则直接返回 null 而不再抛出异常。
stat:指定当前节点的状态信息。不过,执行过后该 stat 值会被最新获取到的 stat 值给替换。
获取子节点列表
ZkClient中提供了2个方法用于获取节点数据内容,但 public 方法就一个:
- 查看源码如下:
-
其中具体参数的意义如下:
path:要获取子节点列表的节点的完整路径。
watch:要获取子节点列表的节点及其子孙节点是否具有 watcher 监听。
Watcher注册
ZkClient采用 Listener 来实现 Watcher 监听。客户端可以通过注册相关监听器来实现对 zk 服务器事件的订阅。可以通过 subscribeXxx() 方法实现 watcher 注册,即相关事件订阅;通过 unsubscribeXxx() 方法取消相关事件的订阅。
- 查看源码如下:
-
其中具体参数的意义如下:
path:要操作节点的完整路径。
IZkChildListener:子节点数量变化监听器。
IZkDataListener:数据内容变化监听器。
IZkStateListener:客户端与zk的会话连接状态变化监听器,可以监听新会话的创建、会话创建出错、连接状态改变。连接状态是系统定义好的枚举类型 Event.KeeperState 的常量。
代码演示
创建工程
创建一个Java Maven工程,并导入一下依赖
<!-- zkClient依赖 -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
代码
public class ZKClientTest {
// 指定zk集群
private static final String CLUSTER = "192.168.254.128:2181";
// 指定节点名称
private static final String PATH = "/mylog";
public static void main(String[] args){
// ----------- 创建会话 -----------
// 创建 zkclient
ZkClient zkClient = new ZkClient(CLUSTER);
// 为 zkclient 指定序列化器
zkClient.setZkSerializer(new SerializableSerializer());
// ----------- 创建节点 -----------
// 指定创建持久节点
CreateMode mode = CreateMode.PERSISTENT;
// 指定节点数据内容
String data = "first log";
// 创建节点
String nodeName = zkClient.create(PATH, data, mode);
System.out.println("新创建的节点名称为:" + nodeName);
// --------- 获取数据内容 ----------
Object readData = zkClient.readData(PATH);
System.out.println("节点的数据内容为:" + readData);
// --------- 注册Watcher ---------
zkClient.subscribeDataChanges(PATH, new IZkDataListener(){
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.print("节点" + dataPath);
System.out.println("的数据已经更新为了" + data);
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println(dataPath + "的数据内容被删除");
}
});
// --------- 更新数据内容 ----------
zkClient.writeData(PATH, "second log");
String updateData = zkClient.readData(PATH);
System.out.println("更新过的数据内容为:" + updateData);
// ----------- 删除节点 -----------
zkClient.delete(PATH);
// --------- 判断节点存在性 -----------
boolean isExists = zkClient.exists(PATH);
System.out.println(PATH + "节点仍存在吗?" + isExists);
}
}
Curator客户端
简介
Curator 是 Netflix 公司开源的一套 zk 客户端框架,与 ZkClient一样,其也封装了 zk 原生 API。其目前已经成为 Apache 的顶级项目,同时,Curator 还提供了一套易用性、可读性更强的 **Fluent 风格(链式调用)**的客户端 API 框架。
API介绍
这里主要以 Fluent 风格客户端 API 为主进行介绍。
创建会话
普通 API 创建 newClient()
- 在CuratorFrameworkFactory类中提供了两个静态方法用于完成会话的创建。
- 查看源码如下:
private static final int DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", '\uea60').intValue();
private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15000).intValue();
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
}
-
其中具体参数的意义如下:
connectString:指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成。
sessionTimeoutMs:设置会话超时时间,单位毫秒,默认 60 秒。
connectionTimeoutMs:设置连接超时时间,单位毫秒,默认 15 秒。
retryPolicy:重试策略,内置有四种策略,分别由以下四个类的实例指定:ExponentialBackoffRetry、RetryNTimes、RetryOneTime、RetryUntilElapsed。
Fluent风格创建
创建节点 create()
下面以满足各种需求的举例方式分别讲解节点创建的方法。说明:下面所使用的 client 为前面所创建的 Curator 客户端实例。
- 创建一个节点,初始内容为空:
// 默认创建的是持久节点,数据内容为空
client.create().forPath(path);
- 创建一个节点,附带初始内容:
// Curator在指定数据内容时,只能使用 byte[] 作为方法参数
client.create().forPath(path, "mydata".getBytes())
- 创建一个临时节点,初始内容为空:
// CreateMode为枚举类型
client.create().widthMode(CreateMode.EPHEMERAL).forPath(path);
- 创建一个临时节点,并自动递归创建父节点:
// 若指定的节点多级父节点均不存在,则会自动创建
client.create().createingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
删除节点 delete()
- 删除一个节点:
// 只能将叶子节点删除,其父节点不会被删除
client.delete().forPath(path);
- 删除一个节点,并递归删除其所有子节点:
// 该方法在使用时需谨慎
client.delete().deletingChildrenIfNeeded().forPath(path);
更新数据 setData()
- 设置一个节点的数据内容:
检测节点是否存在 checkExists()
- 设置一个节点的数据内容:
// 该方法具有返回值,返回值为Stat状态对象。若stat为null,说明该节点不存在,否则说明节点是存在的
Stat stat = client.checkExists().forPath(path);
获取节点数据内容 getData()
- 读取一个节点的数据内容:
// 其返回值为byte[]数组
byte[] data = client.getData().forPath(path);
获取子节点列表 getChildren()
- 读取一个节点的所有子节点列表:
// 其返回值为List数组
List<String> childrenNames = client.getChildren().forPath(path);
Watcher注册 usingWatcher()
curator 中绑定 watcher 的操纵有三个:checkExists()、getData()、getChildren()。这三个方法的共性是:都用于获取。这三个操作用于 watcher 注册的方法是相同的,都是usingWatcher()方法。
这两个方法中的参数 CuratorWatcher 与 Watcher 都为接口。这两个接口中均包含一个 process() 方法,它们的区别是:CuratorWatcher 中的 process() 方法能够抛出异常,这样的话,该异常就可以被记录到日志中。
- 监听节点的存在性变化:
Stat stat = client.checkExists().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点存在性发生变化");
}).forPath(path);
- 监听节点的内容变化:
byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点数据内容发生变化");
}).forPath(path);
- 监听节点子节点列表变化:
List<String> sons = client.getChildren().usingWatcher((CuratorWatcher) event -> {
System.out.println("节点的子节点列表发生变化");
}).forPath(path);
代码演示
创建工程
<!-- curator依赖 -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
代码
public class FluentTest {
public static void main(String[] args) throws Exception{
// ----------- 创建会话 -----------
// 创建重试策略对象:第1秒重试1次,最多重试3次
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
// 创建客户端
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("10.5.5.181:2181")
.sessionTimeoutMs(15000)
.connectionTimeoutMs(13000)
.retryPolicy(retryPolicy)
.namespace("logs")
.build();
// 开启客户端
client.start();
// 指定要创建和操作的节点,注意:其是相对于 /logs 节点的
String nodePath = "/host";
// ----------- 创建节点 -----------
String nodeName = client.create().forPath(nodePath, "myhost".getBytes());
System.out.println("新创建的节点名称为:" + nodeName);
// ---- 获取数据内容并注册Watcher ----
byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
System.out.println(event.getPath() + "数据内容发送变化");
}).forPath(nodePath);
System.out.println("节点的数据内容为:" + new String(data));
// --------- 更新数据内容 ----------
client.setData().forPath(nodePath, "newhost".getBytes());
// 获取更新过的数据内容
byte[] newData = client.getData().forPath(nodePath);
System.out.println("更新过的数据内容为:" + new String(newData));
// ----------- 删除节点 -----------
client.delete().forPath(nodePath);
// --------- 判断节点存在性 -----------
Stat stat = client.checkExists().forPath(nodePath);
boolean isExists = true;
if(stat == null){
isExists = false;
}
System.out.println(nodePath + "节点仍存在吗?" + isExists);
}
}