天天看点

Zookeeper使用详解之Java原生API操作篇

作者:但求无Bug

关于zookeeper如何安装以及常见操作可参照我的前两篇文章,本篇只介绍如何使用zookeeper的原生Java API来进行常见的操作。

一、所需依赖

首先是导入对应的依赖,如下:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.1</version>
</dependenc>           

接下来将介绍常用的API。

二、相关操作

2.1创建会话

对于创建 ZooKeeper会话而言,可使用如下的构造方法:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)           

上述构造方法中各参数的含义为:

  • connectString:该字段用于表示 zk 的服务器列表,其格式为:host:port,如:192.168.1.2:2181,当然,后面也可跟指定的路径,以表示客户端的所有操作均在此目录下,如:192.168.1.2:2181/zk-hook,这表示客户端对zookeeper的所有操作均在 /zk-hook 目录下。
  • sessionTimeout: 该字段用于表示会话的超时时间,其单位为毫秒。若在指定时间内未收到对应的心跳检测,则会认为该会话失效。
  • watcher:该字段用于表示设置的监听器,为null时表示不进行设置。
  • canBeReadOnly: 该字段用于标识当前会话是否支持“Read-Only”模式。所谓“read-only”模式是指,默认情况下,当集群中某个机器与半数以上的机器无法连通时,其将不再接收任何客户端的读写请求。但将该参数设为 true 时,客户端依然能从该机器中读取数据。
  • sessionid和sessionPasswd:这两个字段分别表示会话id 和会话密钥,通过这两参数可唯一确定一个会话,也就是说,通过这两个参数可实现会话的复用。

下面给出对应的示例:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class CreateSessionTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        /*
            客户端可以通过创建一个zk实例来连接zk服务器
            new Zookeeper(connectString,sesssionTimeOut,Wather)
            connectString: 连接地址:IP:端口
            sesssionTimeOut:会话超时时间:单位毫秒
            Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端)
         */
        ZooKeeper zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        latch.await();
        System.out.println("客户端与服务端的会话已经建立");
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.2 创建节点

创建zookeeper 节点的方法为:create,其对应的声明如下:

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode, StringCallback cb, Object ctx)           

上述方法中的各参数含义为:

  • path: 用于表示被创建的节点路径
  • data[]:表示节点中的数据,其格式为字节数组。
  • acl:用于表示acl策略。
  • createMode:用于表示节点类型,其为一个枚举类,常用的值有四种,分别为:持久(PERSISTENT)、持久顺序(PERSISTENT_SEQUENTIAL)、临时(EPHEMERAL)和临时顺序(EPHEMERAL_SEQUENTIAL)。
  • cb:用于表示异步回调函数,需要说明的是,该回调函数需实现StringCallback接口。当连接服务器成功后,会自动调用对应的processResult方法。
  • ctx:用于传递上下文对象。

对于org.apache.zookeeper.data.ACL类来说,其有两个成员,如下:

private int perms;
private org.apache.zookeeper.data.Id id;           

对于perms 而言,其属于 ACL 组成 Schema:id:permission中的schema,其声明如下所示:

public interface Perms {
    int READ = 1;
    int WRITE = 2;
    int CREATE = 4;
    int DELETE = 8;
    int ADMIN = 16;
    int ALL = 31;
}           

当然在org.apache.zookeeper.ZooDefs.Ids接口中预定了若干ACL策略,对应代码声明如下:

public interface Ids {
    Id ANYONE_ID_UNSAFE = new Id("world", "anyone");
    Id AUTH_IDS = new Id("auth", "");
    @SuppressFBWarnings(
        value = {"MS_MUTABLE_COLLECTION"},
        justification = "Cannot break API"
    )
    ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(31, ANYONE_ID_UNSAFE)));
    @SuppressFBWarnings(
        value = {"MS_MUTABLE_COLLECTION"},
        justification = "Cannot break API"
    )
    ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList(Collections.singletonList(new ACL(31, AUTH_IDS)));
    @SuppressFBWarnings(
        value = {"MS_MUTABLE_COLLECTION"},
        justification = "Cannot break API"
    )
    ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList(Collections.singletonList(new ACL(1, ANYONE_ID_UNSAFE)));
}           

下面给出对应的示例:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CreateNodeTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("process方法开始执行");
                    try {
                        createNodeSync();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.out.println(zooKeeper.getState());
        latch.await();
    }

    /**
     * 创建节点
     *
     * @throws Exception
     */
    private static void createNodeSync() throws Exception {
        /**
         *  path        :节点创建的路径
         *  data[]      :节点创建要保存的数据,是个byte类型的
         *  acl         :节点创建的权限信息(4种类型)
         *                 ANYONE_ID_UNSAFE    : 表示任何人
         *                 AUTH_IDS    :此ID仅可用于设置ACL。它将被客户机验证的ID替换。
         *                 OPEN_ACL_UNSAFE    :这是一个完全开放的ACL(常用)--> world:anyone
         *     createMode    :创建节点的类型(4种类型)            CREATOR_ALL_ACL  :此ACL授予创建者身份验证ID的所有权限
         *
         *                  PERSISTENT:持久节点
         *              PERSISTENT_SEQUENTIAL:持久顺序节点
         *                  EPHEMERAL:临时节点
         *                  EPHEMERAL_SEQUENTIAL:临时顺序节点
         String node = zookeeper.create(path,data,acl,createMode);
         */
        // 创建持久节点
        String node_persistent = zooKeeper.create("/test-persistent", "持久节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        // 创建临时节点
        String node_ephemeral = zooKeeper.create("/test-ephemeral", "临时节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        // 创建持久顺序节点
        String node_persistent_sequential = zooKeeper.create("/test-persistent-sequential", "持久顺序节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        // 创建临时顺序节点
        String node_ephemeral_sequential = zooKeeper.create("/test-ephemeral-sequential", "临时顺序节点内容".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("创建的持久节点为:" + node_persistent);
        System.out.println("创建的临时节点为:" + node_ephemeral);
        System.out.println("创建的持久顺序节点为:" + node_persistent_sequential);
        System.out.println("创建的临时顺序节点为:" + node_ephemeral_sequential);

        Thread.sleep(TimeUnit.MINUTES.toMillis(1));
        latch.countDown();
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

此时使用PrettyZoo查看,结果如下:

Zookeeper使用详解之Java原生API操作篇

等待一分钟后,程序会解除阻塞并断开与zookeeper服务的连接,此时再次进行查看,结果如下:

Zookeeper使用详解之Java原生API操作篇

2.3 检查节点是否存在

关于检查节点是否存在对应的API如下:

public Stat exists(final String path, Watcher watcher) throws KeeperException, InterruptedException
public Stat exists(String path, boolean watch) throws KeeperException, InterruptedException
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx)
public void exists(String path, boolean watch, StatCallback cb, Object ctx)           

上述方法中各参数的含义为:

  • path: 表示数据节点的路径
  • watcher:表示监听器,当被监听的节点被创建、删除和更新时就会得到对应的通知。
  • watch:表示是否使用默然的watcher
  • cb:表示执行的回调函数
  • ctx: 需进行传递的上下文对象

下面给出对应的示例:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class NodeExistsTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {

        ZooKeeper zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        latch.await();
        System.out.println("客户端与服务端的会话已经建立");
        Stat stat = zooKeeper.exists("/test-persistent", false);
        System.out.println("stat:" + stat);
        Stat stat2 = zooKeeper.exists("/test-ephemeral", false);
        System.out.println("stat2:" + stat2);
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.4 获取节点信息

对于zookeeper而言,其获取节点信息的api如下:

public byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException
public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
public void getData(String path, boolean watch, AsyncCallback.DataCallback cb, Object ctx)           

上述方法中各参数的含义为:

  • path: 表示数据节点的路径。
  • watcher:表示当path下的节点发生变更时,会触发该watcher的process方法
  • stat: 表示节点的状态信息,为null 时表示获取最新版本的数据
  • cb: 表示异步回调函数
  • ctx:表示传递的上下文信息

下面给出对应的示例:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class GetNodeDataTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        ZooKeeper zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        latch.await();
        System.out.println("客户端与服务端的会话已经建立");
        byte[] data = zooKeeper.getData("/test-persistent", false, null);
        System.out.println(new String(data));
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.5 获取子节点列表

对于zookeeper来说,提供如下几个获取子节点的API:

public List<String> getChildren(final String path, Watcher watcher) throws KeeperException, InterruptedException
public List<String> getChildren(String path, boolean watch) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, ChildrenCallback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)           

对于如上的方法来说,其参数说明如下所示:

  • path:数据节点的路径。
  • watcher:当path下的节点发生变更时,会触发该watcher的process方法
  • version: 用于指定节点版本,若版本不为最新,则会报错。
  • cb:异步回调函数
  • ctx:传递的上下文信息

为了进行测试,先通过PrettyZoo在/test-persistent下新建一个节点,如下:

Zookeeper使用详解之Java原生API操作篇

下面给出对应的示例:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class GetChildNodesTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception{
        ZooKeeper zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        latch.await();
        System.out.println("客户端与服务端的会话已经建立");
        List<String> children = zooKeeper.getChildren("/test-persistent", true);
        System.out.println(children);
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.6 设置或更新节点内容

对于设置或更新节点而言,其常用的API如下:

public Stat setData(String path, byte[] data, int version) throws KeeperException, InterruptedException
public void setData(String path, byte[] data, int version, AsyncCallback.StatCallback cb, Object ctx)           
  • path: 表示被更新节点的路径
  • data: 表示更新的数据
  • version: 表示节点的数据版本。
  • cb:异步回调函数。
  • ctx:上下文信息对象。

下面给出对应的示例:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;

public class UpdateNodeDataTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        System.out.println(zooKeeper.getState());
        latch.await();

        byte[] data = zooKeeper.getData("/test-persistent", false, null);
        System.out.println("修改前的值为:" + new String(data));

        Stat stat = zooKeeper.setData("/test-persistent", "客户端修改了数据节点".getBytes(StandardCharsets.UTF_8), -1);

        byte[] data2 = zooKeeper.getData("/test-persistent", false, null);
        System.out.println("修改后的值为:" + new String(data2));
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.7 删除节点

对于删除ZooKeeper节点的常用API如下:

// 以同步的方式删除节点
public void delete(final String path, int version) throws InterruptedException, KeeperException
// 以异步的方式删除节点,如果写测试代码,客户端主线程不能退出,否则可能请求没有发到服物器或者异步回调不成功
public void delete(final String path, int version, VoidCallback cb, Object ctx)           

对于删除方法的参数为:

  • path:被删除节点的路径
  • version:节点的数据版本,若指定的版本不是最新的版本,则会报错。
  • cb:异步回调函数。
  • ctx:上下文信息对象。

下面给出对应的示例代码:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.util.concurrent.CountDownLatch;

public class DeleteNodeTest {

    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper("192.168.217.128:2181", 5000, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("process方法开始执行");
                latch.countDown();
            }
        });
        System.out.println(zooKeeper.getState());
        latch.await();

        // 先判断该节点是否存在
        Stat stat = zooKeeper.exists("/test-persistent/child-1", false);
        System.out.println(stat == null ? "该节点不存在" : "该节点存在");

        if (stat != null) {
            zooKeeper.delete("/test-persistent/child-1", -1);
        }
        Stat stat2 = zooKeeper.exists("/test-persistent/child-1", false);
        System.out.println(stat2 == null ? "该节点不存在" : "该节点存在");
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

2.8 完整案例

下面给出使用zookeeper原生API进行操作的完整示例:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class ZookeeperOriginalTest {

    // 客户端连接地址
    private static final String ZK_ADDRESS = "192.168.217.128:2181";

    // 倒计时器
    private static CountDownLatch latch = new CountDownLatch(1);

    // zookeeper对象
    private static ZooKeeper zooKeeper = null;

    public static void main(String[] args) throws Exception {

        // 初始化zookeeper,创建客户端
        initConnect(ZK_ADDRESS, 5000);
        // 创建节点
        createNode("/root/user", "user data1", false);
        createNode("/root/home", "home data1", true);

        // 查询节点
        queryNode("/root");

        // 修改节点
        updateNodeData("/root", "nice");

        // 删除节点
        deleteNode("/root", true);
    }

    /**
     * 初始化zookeeper对象
     *
     * @param connectAddress
     * @param sessionTimeout
     */
    private static void initConnect(String connectAddress, int sessionTimeout) {

        try {
            // 创建zookeeper对象
            zooKeeper = new ZooKeeper(connectAddress, sessionTimeout, watchedEvent -> {
                // 获取监听事件的状态
                Watcher.Event.KeeperState state = watchedEvent.getState();
                // 获取监听事件的类型
                Watcher.Event.EventType type = watchedEvent.getType();
                if (Watcher.Event.KeeperState.SyncConnected == state) {
                    if (Watcher.Event.EventType.None == type) {
                        System.out.println("zookeeper创建成功......");
                        latch.countDown();
                    }
                }

                if (Watcher.Event.EventType.NodeCreated == type) {
                    System.out.println("zookeepr有新节点【" + watchedEvent.getPath() + "】创建");
                }

                if (Watcher.Event.EventType.NodeDataChanged == type) {
                    System.out.println("zookeepr有节点【" + watchedEvent.getPath() + "】的数据发生变化");
                }

                if (Watcher.Event.EventType.NodeDeleted == type) {
                    System.out.println("zookeepr有节点【" + watchedEvent.getPath() + "】被删除");
                }

                if (Watcher.Event.EventType.NodeChildrenChanged == type) {
                    System.out.println("zookeepr有资节点【" + watchedEvent.getPath() + "】变化");
                }
            });
            latch.await();
            System.out.println("init connect success: " + zooKeeper);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 创建节点
     *
     * @param nodePath
     * @param data
     * @param autoCreateParent
     */
    private static void createNode(String nodePath, String data, boolean autoCreateParent) throws Exception {
        if (nodePath == null || nodePath.length() == 0) {
            System.out.println("节点【" + nodePath + "】不能为空");
            return;
        }

        Stat exists = zooKeeper.exists(nodePath, true);
        if (exists != null) {
            System.out.println("节点【" + nodePath + "】已存在无法新增");
            return;
        }

        if (autoCreateParent) {
            createNode(nodePath.substring(0, nodePath.lastIndexOf("/")), "", true);
        }

        String result = zooKeeper.create(nodePath, data.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("create: 【" + nodePath + " --> " + data + "】");
    }

    /**
     * 查找节点
     *
     * @param nodePath
     */
    private static void queryNode(String nodePath) throws Exception {
        Stat stat = new Stat();
        byte[] data = zooKeeper.getData(nodePath, true, stat);
        System.out.println("queryNode: 【" + nodePath + "】, result: " + new String(data) + ", stat:" + stat);
    }

    /**
     * 更新节点信息
     *
     * @param nodePath
     * @param data
     */
    private static void updateNodeData(String nodePath, String data) throws Exception {
        Stat stat = zooKeeper.setData(nodePath, data.getBytes(StandardCharsets.UTF_8), -1);
        System.out.println("setData: 【" + nodePath + "】, stat:" + stat);
    }

    /**
     * 删除节点
     *
     * @param nodePath
     * @throws Exception
     */
    private static void deleteNode(String nodePath, boolean autoDeleteChild) throws Exception {
        Stat exists = zooKeeper.exists(nodePath, true);
        if (exists == null) {
            System.out.println(nodePath + "不存在,请核实后再进行相关操作!");
            return;
        }
        List<String> children = zooKeeper.getChildren(nodePath, true);
        if (!children.isEmpty()) {
            for (String child : children) {
                deleteNode(nodePath + "/" + child, true);
            }
        }

        zooKeeper.delete(nodePath, -1);
        System.out.println("delete node: 【" + nodePath + "】");
    }
}           

执行结果如下:

Zookeeper使用详解之Java原生API操作篇

继续阅读