天天看点

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

Zookeeper学习笔记

  • Zookeeper技术内幕
    • 重要理论
      • 数据模型znode
        • 节点类型
        • 节点状态
      • 会话
        • 会话状态
        • 会话连接超时管理——客户端维护
        • 会话连接事件
        • 会话空闲超时管理——服务端维护
      • ACL
        • ACL简介
        • zk的ACL维度
          • 授权策略scheme
          • 授权对象id
          • 权限permission
      • Watcher机制
        • Watcher工作原理
        • Watcher事件
        • Watcher特性
    • 客户端命令
      • 启动客户端
      • 查看子节点-ls
      • 创建节点-create
      • 获取节点数据内容-get
      • 更新节点数据内容-set
      • 删除节点-delete
      • ACL操作
    • ZkClient客户端
      • 简介
      • API介绍
        • 创建会话
        • 创建节点
        • 删除节点
        • 更新数据
        • 检测节点是否存在
        • 获取节点数据内容
        • 获取子节点列表
        • Watcher注册
      • 代码演示
        • 创建工程
        • 代码
    • Curator客户端
      • 简介
      • API介绍
        • 创建会话
          • 普通 API 创建 newClient()
          • Fluent风格创建
        • 创建节点 create()
        • 删除节点 delete()
        • 更新数据 setData()
        • 检测节点是否存在 checkExists()
        • 获取节点数据内容 getData()
        • 获取子节点列表 getChildren()
        • Watcher注册 usingWatcher()
      • 代码演示
        • 创建工程
        • 代码

Zookeeper技术内幕

重要理论

数据模型znode

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

zk数据存储结构与标准的 Unix 文件系统非常相似,都是在根节点下挂很多子节点。zk中没有引入传统文件系统中目录与文件的概念,而是使用了称为 znode 的数据节点概念。znode 是 zk 中数据的最小单元,每个 znode 上都可以保存数据,同时还可以挂载子节点,形成一个树形化命名空间。

节点类型

  • 持久节点(PERSISENT):一旦创建,除非主动调用删除操作,否则一直存储在 zk 上。
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):这类节点的基本特性和上面的节点类型是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份时序,会记录每个子节点创建的先后顺序。基于这个特性,在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个数字后缀,作为新的节点名。这个数字后缀的范围是整型的最大值。 在创建节点的时候只需要传入节点 “/test_”,这样之后,zookeeper 自动会给“test_”后面补充数字。
  • 临时节点(EPHEMERAL):与客户端会话绑定,一旦客户端会话失效,这个客户端所创建的所有临时节点都会被移除;只能作为叶子节点,即其不能挂载子节点。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。

节点状态

  • cZxid:Created Zxid,表示当前 znode 被创建的事务 ID
  • ctime:Created Time,表示当前 znode 被创建的时间
  • mZxid:Modified Zxid,表示当前 znode 最后一次被修改时的事务 ID
  • mtime:Modified Time,表示当前 znode 最后一次被修改时的时间
  • pZxid:表示当前 znode 的子节点列表最后一次被修改时的事务 ID。注意:只能是其子节点列表变更了才会引起 pZxid 的变更,子节点内容的修改不会影响 pZxid。
  • cversion:Children Version,表示子节点的版本号,该版本号用于充当乐观锁。
  • dataVersion:表示当前 znode 数据的版本号,该版本号用于充当乐观锁。
  • aclVersion:表示当前 znode 的权限 ACL 的版本号,该版本号用于充当乐观锁。
  • ephemeralOwner:若当前 znode 是持久节点,则其值为 0;若为临时节点,则其值为创建该节点的会话的 SessionID。当会话消失后,会根据 SessionID来查找与该会话相关的临时节点进行删除。
  • dataLength:当前 znode 中存放的数据的长度。
  • numChildren:当前 znode 所包含的子节点的个数。

会话

  • 会话是 zk 中最重要的概念之一,客户端与服务端之间的任何交互操作都与会话相关。
  • Zookeeper客户端启动时,首先会与 zk 服务建立一个TCP长连接。连接一旦建立,客户端会话的生命周期也就开始了。
JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

会话状态

会话的状态有三种:

  • 连接中 CONNECTING:连接中。Client要创建一个连接,其首先会在本地创建一个 zk 对象,用于表示其所连接上的server。
  • 已连接 CONNECTED:已连接。连接成功后,该连接的各种临时性数据会被初始化到 zk 对象中。
  • 已关闭 CLOSE:已关闭。连接关闭后,这个代表Server的 zk 对象会被删除。

会话连接超时管理——客户端维护

  • zk 客户端维护着会话超时管理,主要管理的超时有两类:读超时与连接超时。
  • 当客户端长时间没有收到服务端请求响应或心跳响应时,会发生读超时;当客户端发出连接请求后,长时间没有收到服务端的连接成功ACK,此时发生连接超时。

会话连接事件

客户端与服务端连接成功后并不是一成不变的,也会出现一些问题,比较典型的有以下三种:

  • 会话丢失:因为网络抖动等原因导致客户端长时间收不到服务端的心跳回复,客户端就会导致连接丢失。连接丢失会引发客户端自动从 zk 地址列表中逐个尝试重新连接,直到重连成功,或按照指定的重试策略终止。
  • 会话转移:当发生连接丢失后,客户端又以原来的 sessionId 重新连接上了服务器。若重连上的服务器不是原来的服务器,那么客户端就需要更新本地 zk 对象中的相关信息,例如连接上的 Server 的 IP 地址,这就是会话转移。
  • 会话失效:若客户端连接丢失后,在会话超时范围内没有连接上服务器,则服务器会将该会话从服务器中删除。在服务端将某客户端的会话删除后,该客户端仍使用原来的 sessionId 又重新连接上了服务器。那么服务器会给客户端发送一个连接关闭响应,表示这个会话已经失效。客户端在收到响应后会根据配置,要么关闭连接,要么重新发起新的会话 id 的连接。

会话空闲超时管理——服务端维护

  • 服务器为每一个客户端的会话都记录着上一次交互后空闲的时长,及从上一次交互结束开始会话空闲超时的时间点。一旦空闲时长超时,服务端就会将该会话的 SessionID从服务端清除。这也就是为什么客户端在空闲时需要定时向服务端发送心跳,就是为了维护这个会话长连接的。服务器是通过空闲超时管理来判断会话是否发生中断的。
  • 服务端对于会话空闲超时管理,采用了一种特殊的方式——分桶策略。

ACL

ACL简介

  • ACL全称为 Access Control List(访问控制列表),是一种细粒度的权限管理方式,可以针对任意用户和组进行细粒度的权限控制。目前大多数 Unix 已经支持了 ACL,Linux 也从 2.6 版本开始支持了 ACL(在此之前是UGO,User Group Others 权限控制模型)。zk 利用 ACL 策略控制 znode 节点的访问权限,如节点数据读写、节点创建、节点删除、读取子节点列表、设置节点权限等。
  • 不过需要注意的是,Unix/Linux 文件或子目录默认会继承其父目录的 ACL。而在 Zookeeper 中,znode 的 ACL 是没有继承关系的,每个 znode 的权限都是独立控制的,只有客户端满足 znode 设置的权限要求时,才能完成相应的操作。

zk的ACL维度

Unix/Linux 系统的 ACL 分为两个维度:组与权限。而 Zookeeper 的 ACL 分为三个维度:授权策略scheme、授权对象id、用户权限permission。

授权策略scheme

授权策略用于确定权限验证过程中使用的验证策略(简单地说就是,通过什么来验证权限,即一个用户要访问某个 znode,如何验证其身份),在 zk 中最常用的有四种策略:

  • ip:使用 ip 地址进行权限验证
  • digest:使用 用户名+密码 进行权限验证
  • world:不验证
  • super:超级用户
授权对象id

授权对象指的是权限赋予的用户,不同的授权策略具有不同类型的授权对象。下面是各个授权模式对应的授权对象id:

  • ip:授权对象的 IP 地址
  • digest:授权对象是 “用户名:密码”
  • world:只有一个,即 anyone
  • super:与 digest 模式相同
权限permission

权限指的是通过验证的用户可以对 znode 执行的操作,共有五种权限,不过 zk 支持自定义权限。

  • c:Create:允许创建子节点
  • d:Delete:允许删除当前节点
  • r:Read:允许读取当前节点的数据内容,以及子节点列表
  • w:Write:允许修改当前节点的数据内容,以及子节点列表
  • a:admin/acl:允许修改当前节点的 ACL 权限

Watcher机制

zk 通过 Watcher 机制实现了发布/订阅模式。

Watcher工作原理

  • zk客户端生成一个watcher对象,将该对象加入到 WatcherManagement 集合中,一旦加入到该集合中就会向zk服务端发送该watcher对象的注册信息,服务端接收到该信息并进行注册(并没有发送watcher对象)。
  • 当服务端触发了watcher事件,zk服务端会向zk客户端发送相应的事件通知。
  • zk客户端接收到该事件后,会根据通知找到相应的watcher对象,然后watcher对象会执行响应的回调方法。
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

Watcher事件

对于同一个事件类型,在不同的通知状态中代表的含义是不同的。

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

Watcher特性

zk的watcher机制具有以下几个特性:

  • 一次性:无论是服务端还是客户端,一旦一个 Watcher 被触发,ZooKeeper 都会将其从相应的存储中移除。因此,在 Watcher 的使用上,需要反复注册。这样的设计有效地减轻了服务端的压力。【Zookeeper不适合监听高频变化对象】
  • 串行化:客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序。同时,需要注意的一点是,一定不能因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调,所以,我觉得客户端 Watcher 的实现类要另开一个线程进行处理业务逻辑,以便给其他的 Watcher 调用让出时间。
  • 轻量级:WatchedEvent 是最小通知单元,仅仅包括:通知状态、事件类型、节点路径。仅仅通知客户端发生了事件,不会带事件具体内容,具体内容需要客户端再次请求获取。客户端向服务端传递的也不是watcher 对象,使用Boolean类型标记属性,服务端保存当前连接的ServerCnxn。

客户端命令

启动客户端

  • 连接本机 zk 服务器:zkCli.sh
  • 连接其它 zk 服务器:zkCli.sh -server 192.168.254.130:2181

查看子节点-ls

  • 查看根节点及/brokers下包含的所有子节点列表:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

创建节点-create

  • 创建永久节点:创建一个名称为 china 的 znode,其值为 999
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 创建顺序节点:在 /china 节点下创建顺序子节点 beijing、shanghai、guangzhou,它们的数据内容分别为 bj、sh、gz
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 创建临时节点:临时节点与持久节点的区别,在后面 get 命令中可以看到
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

获取节点数据内容-get

  • 获取持久节点数据:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 获取顺序节点数据:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 获取临时节点数据:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

更新节点数据内容-set

  • 更新数据:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

删除节点-delete

  • 删除操作:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 若要删除具有子节点的节点,会报错:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

ACL操作

  • 查看权限-getAcl:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 设置权限:下面的命令是,首先增加一个认证用户 zs,密码为 123,然后为 /china 节点指定只有 zs 用户才可以访问该节点,而访问权限为所有权限。
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
    quit 退出后再进入访问节点/china:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

ZkClient客户端

简介

ZkClient是一个开源的客户端,在Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了 Session 超时重连,Watcher 反复注册等功能。像 dubbo 等框架对其也进行了集成使用。

API介绍

以下API方法均为ZkClient类中的方法。

创建会话

ZkClient中提供了9个构造器用于创建会话:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 其中前8个构造方法最终都会调用第9个构造方法,查看该方法源码如下:
public ZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer, long operationRetryTimeout) {
    this._childListener = new ConcurrentHashMap();
    this._dataListener = new ConcurrentHashMap();
    this._stateListener = new CopyOnWriteArraySet();
    this._zkEventLock = new ZkLock();
    if(zkConnection == null) {
        throw new NullPointerException("Zookeeper connection is null!");
    } else {
        this._connection = zkConnection;
        this._zkSerializer = zkSerializer;
        this._operationRetryTimeoutInMillis = operationRetryTimeout;
        this._isZkSaslEnabled = this.isZkSaslEnabled();
        this.connect((long)connectionTimeout, this);
    }
}
           
  • 其中具体参数的意义如下:

    zkServers:指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成。

    connectionTimeout:设置连接创建超时时间,单位毫秒。在此时间内无法创建与 zk 的连接,则直接放弃连接,并抛出异常。

    sessionTimeout:设置回话超时时间,单位毫秒。

    zkSerializer:为会话指定序列器。zk节点内容仅支持字节数组(byte[])类型,且 zk 不负责序列化。在创建 zkClient 时需要指定所要使用的序列化器,例如 Hessian 或 Kryo。默认使用 Java 自带的序列化方式进行对象的序列化。当为会话指定了序列化器后,客户端在进行读写操作时就会自动进行序列化与反序列化。

    connection:IZkConnection 接口对象,是对 zk 原生 API 的最直接包装,是和 zk 最直接的交互层,包含了增删改查等一系列方法。该接口最常用的实现类是 zkClient 默认的实现类 ZkConnection,其可以完成绝大部分的业务需求。

    operationRetryTimeout:设置重试超时时间,单位毫秒。

创建节点

ZkClient 中提供了 15 个方法用于创建节点:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 方法3:
public void createPersistent(String path, boolean createParents, List<ACL> acl) throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException {
    try {
        this.create(path, (Object)null, acl, CreateMode.PERSISTENT);
    } catch (ZkNodeExistsException var6) {
        if(!createParents) {
            throw var6;
        }
    } catch (ZkNoNodeException var7) {
        if(!createParents) {
            throw var7;
        }

        String parentDir = path.substring(0, path.lastIndexOf(47));
        this.createPersistent(parentDir, createParents, acl);
        this.createPersistent(path, createParents, acl);
    }

}
           
  • 方法11:
public String create(final String path, Object data, final List<ACL> acl, final CreateMode mode) {
    if(path == null) {
        throw new NullPointerException("Missing value for path");
    } else if(acl != null && acl.size() != 0) {
        final byte[] bytes = data == null?null:this.serialize(data);
        return (String)this.retryUntilConnected(new Callable<String>() {
            public String call() throws Exception {
                return ZkClient.this._connection.create(path, bytes, acl, mode);
            }
        });
    } else {
        throw new NullPointerException("Missing value for ACL");
    }
}
           
  • 其中具体参数的意义如下:

    path:要创建的节点完整路径。

    data:节点的初始数据内容,可以传入 Object 类型及null。zk 原生 API 中只允许向节点传入 byte[] 数据作为数据内容,但 zkClient 中具有自定义序列化器,所以可以传入各种类型对象。

    mode:节点类型,CreateMode枚举常量,常用的有四种类型:PERSISTENT 持久性,PERSISTENT_SEQUENTIAL 持久顺序型,EPHEMERAL:临时型,EPHEMERAL_SEQUENTIAL 临时顺序型。

    acl:节点的 ACL 策略。

    callback:回调接口。

    context:执行回调时可以使用的上下文对象。

    createParents:是否递归创建节点。zk 原生 API 中要创建的节点路径必须存在,即要创建子节点,父节点必须存在。但 zkClient 解决了这个问题,可以做递归节点创建。没有父节点,可以先自动创建了父节点,然后再在其下创建子节点。

删除节点

ZkClient 中提供了 3 个方法用于删除节点:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
public boolean delete(String path) {
    return this.delete(path, -1);
}
public boolean delete(final String path, final int version) {
    try {
        this.retryUntilConnected(new Callable<Object>() {
            public Object call() throws Exception {
                ZkClient.this._connection.delete(path, version);
                return null;
            }
        });
        return true;
    } catch (ZkNoNodeException var4) {
        return false;
    }
}
public boolean deleteRecursive(String path) {
    List children;
    try {
        children = this.getChildren(path, false);
    } catch (ZkNoNodeException var5) {
        return true;
    }

    Iterator i$ = children.iterator();

    String subPath;
    do {
        if(!i$.hasNext()) {
            return this.delete(path);
        }

        subPath = (String)i$.next();
    } while(this.deleteRecursive(path + "/" + subPath));

    return false;
}
           
  • 其中具体参数的意义如下:

    path:要删除的节点的完整路径。

    version:要删除的节点中包含的数据版本。

更新数据

ZkClient 中提供了 3 个方法用于修改节点数据内容:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

查看源码如下:

public void writeData(String path, Object object) {
    this.writeData(path, object, -1);
}
public void writeData(String path, Object datat, int expectedVersion) {
    this.writeDataReturnStat(path, datat, expectedVersion);
}
public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) {
    final byte[] data = this.serialize(datat);
    return (Stat)this.retryUntilConnected(new Callable<Object>() {
        public Object call() throws Exception {
            Stat stat = ZkClient.this._connection.writeDataReturnStat(path, data, expectedVersion);
            return stat;
        }
    });
}
           
  • 其中具体参数的意义如下:

    path:要更新的节点的完整路径。

    data:要采用的新的数据值。

    expectedVersion:数据更新后要采用的数据版本号。

检测节点是否存在

ZkClient中提供了2个方法用于判断指定节点的存在性,但是 public 方法就一个:只有一个参数的exists()方法:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
protected boolean exists(final String path, final boolean watch) {
   return ((Boolean)this.retryUntilConnected(new Callable<Boolean>() {
        public Boolean call() throws Exception {
            return Boolean.valueOf(ZkClient.this._connection.exists(path, watch));
        }
    })).booleanValue();
}

public boolean exists(String path) {
    return this.exists(path, this.hasListeners(path));
}
           
  • 其中具体参数的意义如下:

    path:要判断存在性节点的完整路径。

    watch:要判断存在性及其子孙节点是否具有 watcher 监听。

获取节点数据内容

ZkClient中提供了4个方法用于获取节点数据内容,但 public 方法就三个:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 其中具体参数的意义如下:

    path:要读取数据内容的节点的完整路径。

    watch:指定节点及其子孙节点是否具有 watcher 监听。

    returnNullIfPathNotExists:这是个 boolean 值。默认情况下若指定的节点不存在,则会抛出KeeperException$NoNodeException异常。设置该值为true,若指定节点不存在,则直接返回 null 而不再抛出异常。

    stat:指定当前节点的状态信息。不过,执行过后该 stat 值会被最新获取到的 stat 值给替换。

获取子节点列表

ZkClient中提供了2个方法用于获取节点数据内容,但 public 方法就一个:

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 其中具体参数的意义如下:

    path:要获取子节点列表的节点的完整路径。

    watch:要获取子节点列表的节点及其子孙节点是否具有 watcher 监听。

Watcher注册

ZkClient采用 Listener 来实现 Watcher 监听。客户端可以通过注册相关监听器来实现对 zk 服务器事件的订阅。可以通过 subscribeXxx() 方法实现 watcher 注册,即相关事件订阅;通过 unsubscribeXxx() 方法取消相关事件的订阅。

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 其中具体参数的意义如下:

    path:要操作节点的完整路径。

    IZkChildListener:子节点数量变化监听器。

    IZkDataListener:数据内容变化监听器。

    IZkStateListener:客户端与zk的会话连接状态变化监听器,可以监听新会话的创建、会话创建出错、连接状态改变。连接状态是系统定义好的枚举类型 Event.KeeperState 的常量。

代码演示

创建工程

创建一个Java Maven工程,并导入一下依赖

<!-- zkClient依赖 -->
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
           

代码

public class ZKClientTest {
    // 指定zk集群
    private static final String CLUSTER = "192.168.254.128:2181";
    // 指定节点名称
    private static final String PATH = "/mylog";

    public static void main(String[] args){
        // ----------- 创建会话 -----------
        // 创建 zkclient
        ZkClient zkClient = new ZkClient(CLUSTER);
        // 为 zkclient 指定序列化器
        zkClient.setZkSerializer(new SerializableSerializer());

        // ----------- 创建节点 -----------
        // 指定创建持久节点
        CreateMode mode = CreateMode.PERSISTENT;
        // 指定节点数据内容
        String data = "first log";
        // 创建节点
        String nodeName = zkClient.create(PATH, data, mode);
        System.out.println("新创建的节点名称为:" + nodeName);

        // --------- 获取数据内容 ----------
        Object readData = zkClient.readData(PATH);
        System.out.println("节点的数据内容为:" + readData);

        // --------- 注册Watcher ---------
        zkClient.subscribeDataChanges(PATH, new IZkDataListener(){
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                System.out.print("节点" + dataPath);
                System.out.println("的数据已经更新为了" + data);
            }
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                System.out.println(dataPath + "的数据内容被删除");
            }
        });

        // --------- 更新数据内容 ----------
        zkClient.writeData(PATH, "second log");
        String updateData = zkClient.readData(PATH);
        System.out.println("更新过的数据内容为:" + updateData);

        // ----------- 删除节点 -----------
        zkClient.delete(PATH);

        // --------- 判断节点存在性 -----------
        boolean isExists = zkClient.exists(PATH);
        System.out.println(PATH + "节点仍存在吗?" + isExists);
    }
}
           

Curator客户端

简介

Curator 是 Netflix 公司开源的一套 zk 客户端框架,与 ZkClient一样,其也封装了 zk 原生 API。其目前已经成为 Apache 的顶级项目,同时,Curator 还提供了一套易用性、可读性更强的 **Fluent 风格(链式调用)**的客户端 API 框架。

API介绍

这里主要以 Fluent 风格客户端 API 为主进行介绍。

创建会话

普通 API 创建 newClient()
  • 在CuratorFrameworkFactory类中提供了两个静态方法用于完成会话的创建。
    JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕
  • 查看源码如下:
private static final int DEFAULT_SESSION_TIMEOUT_MS = Integer.getInteger("curator-default-session-timeout", '\uea60').intValue();
    private static final int DEFAULT_CONNECTION_TIMEOUT_MS = Integer.getInteger("curator-default-connection-timeout", 15000).intValue();
    
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy) {
    return newClient(connectString, DEFAULT_SESSION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS, retryPolicy);
}

public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
    return builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();
}
           
  • 其中具体参数的意义如下:

    connectString:指定 zk 服务器列表,由英文状态逗号分开的 host:port 字符串组成。

    sessionTimeoutMs:设置会话超时时间,单位毫秒,默认 60 秒。

    connectionTimeoutMs:设置连接超时时间,单位毫秒,默认 15 秒。

    retryPolicy:重试策略,内置有四种策略,分别由以下四个类的实例指定:ExponentialBackoffRetry、RetryNTimes、RetryOneTime、RetryUntilElapsed。

Fluent风格创建
JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

创建节点 create()

下面以满足各种需求的举例方式分别讲解节点创建的方法。说明:下面所使用的 client 为前面所创建的 Curator 客户端实例。

  • 创建一个节点,初始内容为空:
// 默认创建的是持久节点,数据内容为空
client.create().forPath(path);
           
  • 创建一个节点,附带初始内容:
// Curator在指定数据内容时,只能使用 byte[] 作为方法参数
client.create().forPath(path, "mydata".getBytes())
           
  • 创建一个临时节点,初始内容为空:
// CreateMode为枚举类型
client.create().widthMode(CreateMode.EPHEMERAL).forPath(path);
           
  • 创建一个临时节点,并自动递归创建父节点:
// 若指定的节点多级父节点均不存在,则会自动创建
client.create().createingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
           

删除节点 delete()

  • 删除一个节点:
// 只能将叶子节点删除,其父节点不会被删除
client.delete().forPath(path);
           
  • 删除一个节点,并递归删除其所有子节点:
// 该方法在使用时需谨慎
client.delete().deletingChildrenIfNeeded().forPath(path);
           

更新数据 setData()

  • 设置一个节点的数据内容:

检测节点是否存在 checkExists()

  • 设置一个节点的数据内容:
// 该方法具有返回值,返回值为Stat状态对象。若stat为null,说明该节点不存在,否则说明节点是存在的
Stat stat = client.checkExists().forPath(path);
           

获取节点数据内容 getData()

  • 读取一个节点的数据内容:
// 其返回值为byte[]数组
byte[] data = client.getData().forPath(path);
           

获取子节点列表 getChildren()

  • 读取一个节点的所有子节点列表:
// 其返回值为List数组
List<String> childrenNames = client.getChildren().forPath(path);
           

Watcher注册 usingWatcher()

curator 中绑定 watcher 的操纵有三个:checkExists()、getData()、getChildren()。这三个方法的共性是:都用于获取。这三个操作用于 watcher 注册的方法是相同的,都是usingWatcher()方法。

JavaEE 企业级分布式高级架构师(八)Zookeeper学习笔记(2)Zookeeper技术内幕

这两个方法中的参数 CuratorWatcher 与 Watcher 都为接口。这两个接口中均包含一个 process() 方法,它们的区别是:CuratorWatcher 中的 process() 方法能够抛出异常,这样的话,该异常就可以被记录到日志中。

  • 监听节点的存在性变化:
Stat stat = client.checkExists().usingWatcher((CuratorWatcher) event -> {
	System.out.println("节点存在性发生变化");
}).forPath(path);
           
  • 监听节点的内容变化:
byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
	System.out.println("节点数据内容发生变化");
}).forPath(path);
           
  • 监听节点子节点列表变化:
List<String> sons = client.getChildren().usingWatcher((CuratorWatcher) event -> {
	System.out.println("节点的子节点列表发生变化");
}).forPath(path);
           

代码演示

创建工程

<!-- curator依赖 -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
           

代码

public class FluentTest {
    public static void main(String[] args) throws Exception{
        // ----------- 创建会话 -----------
        // 创建重试策略对象:第1秒重试1次,最多重试3次
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // 创建客户端
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("10.5.5.181:2181")
                .sessionTimeoutMs(15000)
                .connectionTimeoutMs(13000)
                .retryPolicy(retryPolicy)
                .namespace("logs")
                .build();
        // 开启客户端
        client.start();
        // 指定要创建和操作的节点,注意:其是相对于 /logs 节点的
        String nodePath = "/host";

        // ----------- 创建节点 -----------
        String nodeName = client.create().forPath(nodePath, "myhost".getBytes());
        System.out.println("新创建的节点名称为:" + nodeName);

        // ---- 获取数据内容并注册Watcher ----
        byte[] data = client.getData().usingWatcher((CuratorWatcher) event -> {
            System.out.println(event.getPath() + "数据内容发送变化");
        }).forPath(nodePath);
        System.out.println("节点的数据内容为:" + new String(data));

        // --------- 更新数据内容 ----------
        client.setData().forPath(nodePath, "newhost".getBytes());
        // 获取更新过的数据内容
        byte[] newData = client.getData().forPath(nodePath);
        System.out.println("更新过的数据内容为:" + new String(newData));

        // ----------- 删除节点 -----------
        client.delete().forPath(nodePath);

        // --------- 判断节点存在性 -----------
        Stat stat = client.checkExists().forPath(nodePath);
        boolean isExists = true;
        if(stat == null){
            isExists = false;
        }
        System.out.println(nodePath + "节点仍存在吗?" + isExists);
    }
}