天天看點

ZooKeeper基礎CRUD操作

==============================

Curator Java 用戶端 CRUD 使用

==============================

Curator 是 Apache 下的開源項目, Spring Cloud 也采用了該庫, 可以其功能強大和權威性.

Curator 項目包含多個 artifact, 一般情況下, 我們隻需要引入 curator-recipes 依賴即可, artifact 針對不同的場景提供進階封裝, 可簡化使用zk的複雜性.

Curator 4.0 适合于 zk 3.5版, 但目前zk3.5仍是beta版本, 對于zk 3.4版推薦使用 Curator 2.12.0 版.

Curator所做的改進:

重試機制:提供可插拔的重試機制, 它将給捕獲所有可恢複的異常配置一個重試政策, 并且内部也提供了幾種标準的重試政策(比如指數補償).

連接配接狀态監控: Curator初始化之後會一直的對zk連接配接進行監聽, 一旦發現連接配接狀态發生變化, 将作出相應的處理.

zk用戶端執行個體管理:Curator對zk用戶端到server叢集連接配接進行管理. 并在需要的情況, 重建zk執行個體, 保證與zk叢集的可靠連接配接

各種使用場景支援:Curator實作zk支援的大部分使用場景支援(甚至包括zk自身不支援的場景), 這些實作都遵循了zk的最佳實踐, 并考慮了各種極端情況.

pom 依賴:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>
<dependency>
    <!--org.apache.curator 依賴 slf4j -->
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.7</version>
</dependency>      

Curator大量使用了鍊式調用風格, 遇到forPath()才會真正觸發zk調用, 主要的鍊式調用有:

   client.create().forPath()

   client.delete().forPath()

   client.setData().forPath()

   client.getData().forPath()

   client.checkExists().forPath()

下面 main() 代碼中, 已經展現幾乎所有的Curator 基礎操作.

public static void main(String[] args) throws Exception {

        // 多個伺服器用逗号分隔
        String zkUrl = "localhost:2181";

        // 重連server機制, 最多重連3次, 每次重連間隔會加長, 初次重連的間隔為1000毫秒
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        /**
         * 重連機制有: ExponentialBackoffRetry: 重試指定的次數, 且每一次重試之間停頓的時間逐漸增加 RetryNtimes:
         * 指定最大重試次數的重試政策 RetryOneTimes: 僅重試一次 RetryUntilElapsed:一直重試直到達到規定的時間
         */

        // CuratorFramework 是一個線程安全的類, 可以完成zk所有操作
        CuratorFramework client = CuratorFrameworkFactory.newClient(zkUrl, retryPolicy);

        // 打開用戶端
        client.start();

        // 檢查節點是否存在
        if (client.checkExists().forPath("/head") != null) {

            // 删除節點
            client.delete().forPath("/head");
        }

        // 建立節點, 節點資料為空
        client.create().forPath("/head", new byte[0]);

        // 重新指派,需要将String轉成 byte 數組
        client.setData().forPath("/head", "ABC".getBytes());

        // 擷取節點的版本等資訊, 傳回一個 Stat執行個體
        Stat stat = client.checkExists().forPath("/head");
        System.out.println(stat.getCzxid());

        // 擷取節點值, getData().forPath()結果為 byte 數組, 可以轉成String類型
        String value = new String(client.getData().forPath("/head"));
        System.out.println(String.format("/head data is :%s", value));

        // 建立臨時序列節點
        client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/head/child");

        if (client.checkExists().forPath("/a") != null) {
            // 級聯删除節點
            client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/a");
        }

        // 級聯建立節點, 建立之前不需要檢查上級節點是否存在
        client.create().creatingParentsIfNeeded().forPath("/a/b/c");

        client.close();
    }