天天看點

zookeeper之CuratorCurator用戶端開發測試

文章目錄

  • Curator用戶端
    • 建立會話
    • 建立節點
    • 擷取節點和資料
    • 更新資料
    • 删除節點
    • 事務
    • 節點存在
    • 事件監聽
    • 其他工具類
  • 開發測試

Curator用戶端

  1. Curator

    包含了幾個包:
    • curator-framework

      :對zookeeper的底層api的一些封裝
    • curator-client

      :提供一些用戶端的操作,例如重試政策等
    • curator-recipes

      :封裝了一些進階特性,如:Cache事件監聽、選舉、分布式鎖、分布式計數器、分布式Barrier等
  2. 版本
    • The are currently two released versions of Curator, 2.x.x and 3.x.x:
    • Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
    • Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc.
  3. Curator

    主要解決了以下問題
    • 封裝

      ZooKeeper client

      ZooKeeper server

      之間的連接配接處理
    • 提供了一套

      Fluent

      風格的操作API
    • 提供

      ZooKeeper

      各種應用場景(recipe, 比如共享鎖服務, 叢集上司選舉機制)的抽象封裝
  4. 這裡使用的版本
    • 服務端

      Zookeeper

      版本:

      3.4.6

    • Curator

      版本:

      compile "org.apache.curator:curator-recipes:2.11.0"

建立會話

  1. 建立會話
    @Test
        public void testCreateClient() throws Exception{
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            /**
             *   失去連接配接重試政策
             *    baseSleepTimeMs 初始sleep時間
             *    maxRetries    最大重試次數
             *    maxSleepMs    最大sleep時間
             *    目前sleep時間=baseSleepTimeMs*Math.mac(1,random.next(1)) << (retryCount+1)
             */
            int baseSleepTimeMs;
            int maxRetries;
            int maxSleepMs;
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    
            //第一種方式,使用Builder fluent風格的API方式建立
            CuratorFramework curatorFramework1 = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .build();
            CuratorFrameworkState state1 = curatorFramework1.getState();
            //zk-curator - LATENT
            logger.info(state1.toString());
    
            //當連接配接狀态變化時觸發
            curatorFramework1.getConnectionStateListenable().addListener(new ConnectionStateListener() {
                @Override
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    //連接配接狀态事件:true
                    logger.info("連接配接狀态事件:{}",connectionState.isConnected());
                }
            });
    
            //完成會話的建立
            curatorFramework1.start();
    
    
    
            //第二種方式,使用靜态工廠方式建立,此方式本質上還是通過第一種方式建立
            CuratorFramework curatorFramework2 =
                    CuratorFrameworkFactory.newClient(
                            connectString,
                            sessionTimeoutMs,
                            connectionTimeoutMs,
                            retryPolicy);
            CuratorFrameworkState state2 = curatorFramework2.getState();
            //zk-curator - LATENT
            logger.info(state2.toString());
    
    
            /**
             * 臨時用戶端:
             *      一定時間不活動後連接配接會被關閉,使用buildTemp()建立
             */
            CuratorTempFramework curatorFramewor3 = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .buildTemp(2000,TimeUnit.SECONDS);
                    //.buildTemp();
            TimeUnit.SECONDS.sleep(20);
    
        }
                    

建立節點

  1. 建立節點
    @Test
      public void testCreateZnode() throws Exception {
    
          String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
          int sessionTimeoutMs = 25000;
          int connectionTimeoutMs = 5000;
          String rootPath = "jannal-create";
          RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
          CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                  .connectString(connectString)
                  .sessionTimeoutMs(sessionTimeoutMs)
                  .connectionTimeoutMs(connectionTimeoutMs)
                  .retryPolicy(retryPolicy)
                  .namespace(rootPath) //用戶端對zk上的資料節點的操作都是基本此根節點進行的
                  .build();
          //完成會話建立
          curatorFramework.start();
    
          //建立一個初始内容為空的znode,預設建立的是持久節點
          curatorFramework.create().forPath("/userEmpty");
          //建立一個附帶内容的znode
          curatorFramework.create().forPath("/userData", "我是jannal".getBytes(Charsets.UTF_8));
          //建立一個臨時節點
          curatorFramework.create().
                  withMode(CreateMode.EPHEMERAL).
                  forPath("/userTemp", "jannal".getBytes(Charsets.UTF_8));
          //自動遞歸建立父節點creatingParentsIfNeeded()
          curatorFramework.create()
                  .creatingParentsIfNeeded()
                  .withMode(CreateMode.PERSISTENT)
                  .forPath("/user/password", "123456".getBytes(Charsets.UTF_8));
    
    
          curatorFramework.getCuratorListenable().addListener(new CuratorListener() {
              @Override
              public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
    
              }
          });
    
          ExecutorService executorService = Executors.newFixedThreadPool(2);
          CountDownLatch countDownLatch = new CountDownLatch(1);
          /**
           * 在原生的zookeeper用戶端中,所有異步通知事件處理都是通過EventThread這個
           * 線程處理的(串行處理所有事件的通知)。一旦發生複雜的處理單元就會消耗很長事件
           * 進而影響其他事件的處理,Curator可以使用線程池來處理,如果不指定線程池
           * 則預設使用EventThread來處理
           */
          curatorFramework.create()
                  .creatingParentsIfNeeded()
                  .withMode(CreateMode.EPHEMERAL)
                  .inBackground(new BackgroundCallback() {
                      @Override
                      public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                          logger.info(event.toString());
                          countDownLatch.countDown();
                      }
                  },executorService)
                  .forPath("/user2","jannal2".getBytes(Charsets.UTF_8));
    
    
    
          countDownLatch.await();
          executorService.shutdown();
    
      }
                    

擷取節點和資料

  1. 擷取節點和資料
    @Test
        public void testGetZnode() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-get";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath("/user/username/jannal", "jannal123".getBytes(Charsets.UTF_8));
    
            //擷取資料
            byte[] bytes = client.getData().forPath("/user/username/jannal");
    
            Assert.assertEquals("jannal123", new String(bytes, Charsets.UTF_8));
    
            //讀取一個節點的資料内容,同時擷取到該節點的stat
            Stat stat = new Stat();
            bytes = client.getData().storingStatIn(stat).forPath("/user/username/jannal");
            logger.info(ToStringBuilder.reflectionToString(stat));
    
            client.create().creatingParentsIfNeeded().forPath("/user/username1", "abc".getBytes(Charsets.UTF_8));
            client.create().creatingParentsIfNeeded().forPath("/user/username2", "def".getBytes(Charsets.UTF_8));
            //擷取節點的子節點路徑
            List<String> strings = client.getChildren().forPath("/user");
            //[username2, username1, username]
            logger.info(strings.toString());
    
        }
    
                    

更新資料

  1. 更新資料
    @Test
        public void testSetZnode() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-update";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath) //用戶端對zk上的資料節點的操作都是基本此根節點進行的
                    .build();
            curatorFramework.start();
    
            String path = "/user/password";
    
            curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/user/password", "123456".getBytes(Charsets.UTF_8));
            byte[] bytes = curatorFramework.getData().forPath(path);
            Assert.assertEquals("123456", new String(bytes, Charsets.UTF_8));
    
            Stat stat = curatorFramework.setData().forPath(path, "abcd".getBytes(Charsets.UTF_8));
            bytes = curatorFramework.getData().forPath(path);
            Assert.assertEquals("abcd", new String(bytes, Charsets.UTF_8));
    
            //更新一個節點的資料内容,強制指定版本進行更新
            curatorFramework.setData().withVersion(stat.getVersion()).forPath(path, "abcde".getBytes());
    
        }
    
    
                    

删除節點

  1. 在删除一個節點時,由于網絡原因,導緻删除操作失敗。這個異常在有些場景下是緻命的,比如

    Master

    選舉(先建立後删除),針對這個問題

    Curator

    引入了一種重試機制,即如果我們調用了

    guaranteed()

    ,那麼隻要用戶端有效,就會在背景反複重試,直到節點删除成功。
  2. 删除節點示例
    @Test
        public void testDeleteZnode() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-delete";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath("/user/username/jannal", "jannal123".getBytes(Charsets.UTF_8));
    
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath("/user2/username/jannal", "jannal123".getBytes(Charsets.UTF_8));
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath("/user3/username/jannal", "jannal123".getBytes(Charsets.UTF_8));
    
    
    
            //删除節點且遞歸删除其所有的子節點
            client.delete().deletingChildrenIfNeeded().forPath("/user");
            //強制指定版本進行删除
            client.delete().deletingChildrenIfNeeded().withVersion(0).forPath("/user2");
            //強制保證删除,隻要用戶端有效,Curator會在背景持續進行删除操作,直到節點删除成功
            client.delete().guaranteed().forPath("/user3/username/jannal");
    
        }
                    

事務

  1. 事務
    /**
         * inTransaction()開啟事務,可以複合create, setData, check, and/or delete
         * 等操作然後調用commit()作為一個原子操作送出
         *
         * @throws Exception
         */
        @Test
        public void testTransaction() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-transaction";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
    
            client.inTransaction()
                    .create().withMode(CreateMode.EPHEMERAL).forPath("/transaction", "事務".getBytes(Charsets.UTF_8))
                    .and()
                    .delete().forPath("/transaction")
                    .and()
                    .commit();
        }
    
                    

節點存在

  1. 判斷節點是否存在
    @Test
        public void testExist() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-Exist";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            //節點不存在傳回null
            Stat stat = client.checkExists().forPath("/jannal");
            Assert.assertEquals(null, stat);
            logger.info(ToStringBuilder.reflectionToString(stat));
        }
    
    
                    

事件監聽

  1. 原生用戶端支援通過注冊Watch來進行事件監聽,但是收到事件通知後每次都需要手動再次注冊Watch。

    Curator

    引入了Cache來實作對

    Zookeeper

    服務端事件的監聽,并且自動處理反複注冊監聽,簡化了原生API的繁瑣。
  2. Cache

    分為兩類監聽
    • 節點監聽
    • 子節點監聽
  3. NodeCache

    用于監聽指定

    Zookeeper

    資料節點本身的變化,資料内容發生變化時,就會回調此方法。
    @Test
        public void testNodeCacheEventListener() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-listener";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            String path = "/user/username/jannal";
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath(path, "jannal123".getBytes(Charsets.UTF_8));
            //是否進行資料壓縮
            boolean dataIsCompressed = false;
            final NodeCache cache = new NodeCache(client, path, dataIsCompressed);
            cache.start(true);
            CountDownLatch countDownLatch = new CountDownLatch(2);
            cache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData currentData = cache.getCurrentData();
                    if (currentData != null) {
                        logger.info("資料發生變更,新資料:{}", new String(currentData.getData(), StandardCharsets.UTF_8));
                    }else{
                        logger.info("資料發生變更,新資料:{},節點可能被删除", currentData, StandardCharsets.UTF_8);
                    }
    
                    countDownLatch.countDown();
                }
            });
    
            client.setData().forPath(path, "jannal".getBytes(Charsets.UTF_8));
            client.delete().deletingChildrenIfNeeded().forPath(path);
            countDownLatch.await();
        }
    
                    
  4. PathChildrenCache

    用于監聽指定

    Zookeeper

    資料節點的子節點變化情況。當指定節點的子節點發生變化時,就會回調Listener。

    Curator

    無法對二級子節點觸發變更事件。比如對

    /zk-jannal

    子節點進行監聽,當

    /zk-jannal/user/password

    節點被建立或者删除的時候,無法觸發子節點變更事件
    @Test
        public void testPathChildrenCacheEventListener() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-parent-listener";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            String path = "/user";
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath(path, "jannal123".getBytes(Charsets.UTF_8));
            //是否進行資料壓縮
            boolean dataIsCompressed = false;
            /**
             * 是否把節點内容緩存起來,如果配置為true,則client在接收
             * 到節點清單變更的同時,也能夠擷取到節點的資料内容,如果為false
             * 則無法擷取節點的資料内容
             */
            boolean cacheData = true;
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            CountDownLatch countDownLatch = new CountDownLatch(3);
            final PathChildrenCache cache = new PathChildrenCache(client, path, cacheData, dataIsCompressed, executorService);
    
            /**
             * BUILD_INITIAL_CACHE 同步初始化用戶端的cache,及建立cache後,就從伺服器端拉入對應的資料(這個是NodeCache使用的方式)
             * NORMAL 異步初始化cache
             * POST_INITIALIZED_EVENT 異步初始化,初始化完成觸發事件PathChildrenCacheEvent.Type.INITIALIZED
             */
            cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    ChildData currentData = pathChildrenCacheEvent.getData();
                    //事件類型,可通過判斷事件類型來做相應的處理
                    PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                    switch (type) {
                        //初始化會觸發這個事件
                        case INITIALIZED:
                            logger.info("子類緩存系統初始化完成");
                            break;
                        case CHILD_ADDED:
                            //新增子節點/user/username
                            logger.info("新增子節點{}", currentData.getPath());
                            break;
                        case CHILD_UPDATED:
                            // 子節點資料變更[106, 97, 110, 110, 97, 108]
                            logger.info("子節點資料變更{}", new String(currentData.getData(),Charsets.UTF_8));
                            break;
                        case CHILD_REMOVED:
                            //子節點資料删除/user/username
                            logger.info("子節點資料删除{}", currentData.getPath());
                            break;
                    }
                    countDownLatch.countDown();
                }
    
    
            });
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath(path+"/username", "jannal123".getBytes(Charsets.UTF_8));
            TimeUnit.SECONDS.sleep(1);
            client.setData().forPath(path+"/username", "jannal".getBytes(Charsets.UTF_8));
            TimeUnit.SECONDS.sleep(1);
            client.delete().deletingChildrenIfNeeded().forPath(path);
            countDownLatch.await();
        }
    
                    
  5. TreeCache

    可以監聽整個樹上的所有節點。即可以監控節點的狀态,還監控節點的子節點的狀态
    @Test
        public void testTreeCache() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "jannal-tree-listener";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            String path = "/user";
            client.create()
                    .creatingParentsIfNeeded()
                    .forPath(path, "jannal123".getBytes(Charsets.UTF_8));
    
            CountDownLatch countDownLatch = new CountDownLatch(1);
            TreeCache cache = new TreeCache(client, path);
            cache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                    TreeCacheEvent.Type type = treeCacheEvent.getType();
                    ChildData childData = treeCacheEvent.getData();
    
                    switch (type) {
                        //子類緩存系統初始化完成
                        case INITIALIZED:
                            logger.info("子類緩存系統初始化完成");
                            break;
                        case NODE_ADDED:
                            //新增子節點/user/username
                            logger.info("新增子節點{}", childData != null ? childData.getPath() : null);
                            break;
                        case NODE_UPDATED:
                            // 子節點資料變更jannal
                            // 子節點資料變更jannal2
                            logger.info("子節點資料變更{}", childData != null ? new String(childData.getData(), Charsets.UTF_8) : null);
                            break;
                        case NODE_REMOVED:
                            //子節點資料删除/user
                            logger.info("子節點資料删除{}", childData != null ? childData.getPath() : null);
                            break;
                    }
                    countDownLatch.countDown();
                }
            });
            cache.start();
            client.setData().forPath(path, "jannal".getBytes());
            Thread.sleep(1000);
            client.setData().forPath(path, "jannal2".getBytes());
            Thread.sleep(1000);
            client.delete().deletingChildrenIfNeeded().forPath(path);
            Thread.sleep(1000 * 2);
            countDownLatch.await();
    
        }
                    

其他工具類

  1. Curator

    提供了一些工具類。比如

    ZKPaths

    EnsurePath

  2. ZKPaths

    提供更簡單的API來建構

    Znode

    路徑、遞歸建立和删除節點。
  3. EnsurePath

    提供了一種確定資料節點存在的機制。在分布式環境下,A和B機器都同時建立節點,由于并發操作的存在,可能抛出

    節點已經存在的異常

    EnsurePath

    内部實作就是試圖建立指定節點,如果節點存在,那麼就不進行任何操作,也不抛出異常,否則正常建立節點。從Curator 2.9.0版本開始此類已經過時,首選

    CuratorFramework.create().creatingParentContainersIfNeeded()

    or

    CuratorFramework.exists().creatingParentContainersIfNeeded()

  4. 代碼示例
    @Test
        public void testOther() throws Exception {
            String connectString = "zk-master:2180,zk-slave1:2182,zk-slave2:2183";
            int sessionTimeoutMs = 25000;
            int connectionTimeoutMs = 5000;
            String rootPath = "zk-other";
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(4000, 3);
    
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(connectString)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .retryPolicy(retryPolicy)
                    .namespace(rootPath)
                    .build();
            client.start();
            String path = ZKPaths.fixForNamespace(rootPath, "/user");
            // 輸出:/zk-other/user
            logger.info(path);
            ZKPaths.PathAndNode pathAndNode = ZKPaths.getPathAndNode("/zk-other/user/username");
            // path:/zk-other/user,node:username
            logger.info("path:{},node:{}", pathAndNode.getPath(), pathAndNode.getNode());
    
            CuratorZookeeperClient zookeeperClient = client.getZookeeperClient();
            //建立節點
            ZKPaths.mkdirs(zookeeperClient.getZooKeeper(), "/zk-other/user/password");
    
            //擷取子節點
            List<String> sortedChildren = ZKPaths.getSortedChildren(zookeeperClient.getZooKeeper(), "/zk-other");
            logger.info(sortedChildren.toString());
    
    
    
    
        }
                    

開發測試

  1. 為了便于開發人員進行zk的開發與測試,可以使用

    curator-test

    子產品。依賴

    compile "org.apache.curator:curator-test:2.11.0"

  2. 單元測試
    @Test
        public void testSingleServer() throws Exception {
            File dataDir = new File(".");
            TestingServer server = new TestingServer(2000, dataDir);
            server.start();
            CuratorFramework curatorFramework = CuratorFrameworkFactory.
                    builder().
                    connectString(server.getConnectString()).
                    sessionTimeoutMs(1000).
                    retryPolicy(new RetryNTimes(3, 1000)).
                    build();
            curatorFramework.start();
            System.out.println(curatorFramework.getChildren().forPath("/"));
            curatorFramework.close();
            server.stop();
    
        }
                    
  3. 叢集測試
    @Test
        public void testClusterServer() throws Exception {
            TestingCluster server = new TestingCluster(3);
            server.start();
            Thread.sleep(2000);
            TestingZooKeeperServer leader = null;
            for (TestingZooKeeperServer zs : server.getServers()) {
                logger.info(zs.getInstanceSpec().getServerId() + "-");
                logger.info(zs.getQuorumPeer().getServerState() + "-");
                logger.info(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
                if (zs.getQuorumPeer().getServerState().equals("leading")) {
                    leader = zs;
                }
            }
            //模拟leader挂掉
            leader.kill();
            logger.info("leader 挂掉之後");
            for (TestingZooKeeperServer zs : server.getServers()) {
                logger.info(zs.getInstanceSpec().getServerId() + "-");
                logger.info(zs.getQuorumPeer().getServerState() + "-");
                logger.info(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
    
            }
            server.stop();
    
        }