天天看點

使用zookeeper原生API連結ZK叢集進行基本API示範

上一篇釋出了近期的面試總結-Mysql篇,最近在讀《從Paxos到Zookeeper分布式一緻性原理與實踐》的電子書,從上面學到不少,是以将部分api示範從頭演練一遍形成demo代碼。

zookeeper初始化構造方法

package com.coderman.zookeeper.clusterdemo.version2;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @description:
 * @author: Fanchunshuai
 * @time: 2020/2/10 15:51
 */
public class ZKConstructDemo implements Watcher {
    private static StringBuffer buffer = new StringBuffer();
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final int SESSION_TIMEOUT = 5000;
    protected  static ZooKeeper zooKeeper;

    static {
        buffer.append("192.168.1.224:2184,");
        buffer.append("192.168.1.224:2181,");
        buffer.append("192.168.1.224:2182,");
        buffer.append("192.168.1.224:2183,");
        buffer.append("192.168.1.224:2185");
    }
    public static void main(String[] args) {
        try {
            //第一種初始化接口
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
            //第二種初始化接口增加了兩個參數,為了複用session會話和密碼
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(),zooKeeper.getSessionId(),zooKeeper.getSessionPasswd());

            System.out.println(zooKeeper.getState());
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            countDownLatch.await();
            System.out.println("zookeeper session established...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println("received watch event = "+watchedEvent);
        if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
            countDownLatch.countDown();
        }
    }
}

           

zookeeper建立API的接口方法

package com.coderman.zookeeper.clusterdemo.version2;

import org.apache.zookeeper.*;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @description:
 * @author: Fanchunshuai
 * @time: 2020/2/10 16:47
 */

/**
 * 節點建立API
 * zookeeper的節點建立api有兩個重載方法
 * 一個是同步調用,一個是異步調用
 * 建立節點的參數如下:
 * path:需要建立節點的路徑 如/zookeeper/config
 * data[]:節點建立後的初始内容
 * acl:節點的ACL政策,是List類型,已經通過ZooDefs.Ids類初始化好了
 * createMode:建立節點的類型或者模式,是枚舉類型,通過CreateMode枚舉類初始化好了
 * cb:回調函數
 * ctx:傳遞一個上下文對象,在回調方法執行的時候使用
 * <p>
 * 需要注意的是:無論是同步接口還是異步接口,zookeeper都不支援遞歸建立,即無法在父節點不存在的情況下建立子節點。
 * 如果節點已經存在,那麼建立同名節點将會抛出NodeExitsException異常。
 * <p>
 * 同步調用示範
 */
public class ZKCreateAPIDemo implements Watcher {
    private static StringBuffer buffer = new StringBuffer();
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static final int SESSION_TIMEOUT = 5000;
    protected static ZooKeeper zooKeeper;

    static {
        buffer.append("192.168.1.224:2184,");
        buffer.append("192.168.1.224:2181,");
        buffer.append("192.168.1.224:2182,");
        buffer.append("192.168.1.224:2183,");
        buffer.append("192.168.1.224:2185");
    }

    public static void main(String[] args) {
        //createSyncCall();
        createAsyncCall();
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
            countDownLatch.countDown();
        }
    }

    /**
     * 同步調用示範
     */
    private static void createSyncCall() {
        try {
            //第一種初始化接口
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
            //第二種初始化接口增加了兩個參數,為了複用session會話和密碼
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(), zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());

            String path = "/hadoop";
            byte[] data = "hadooper".getBytes();
            //1.建立一個臨時節點
            String response = zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("response = " + response);

            String path2 = "/hadoop2";
            byte[] data2 = "hadooper2".getBytes();
            //2.建立一個臨時順序節點
            String response2 = zooKeeper.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println("response2 = " + response2);

            //3.建立一個持久節點
            String path3 = "/hadoop3";
            byte[] data3 = "hadooper3".getBytes();
            String response3 = zooKeeper.create(path3, data3, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("response3 = " + response3);
            //4.建立一個持久順序節點
            String path4 = "/hadoop4";
            byte[] data4 = "hadooper4".getBytes();
            String response4 = zooKeeper.create(path4, data4, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println("response4 = " + response3);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        try {
            countDownLatch.await();
            System.out.println("zookeeper session established...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 異步調用示範
     */
    private static void createAsyncCall() {
        String path = "/hadoopsync";
        byte[] data = "hadoopersync".getBytes();
        try {
            //第一種初始化接口
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
            //第二種初始化接口增加了兩個參數,為了複用session會話和密碼
            zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(), zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
        } catch (IOException e) {
            e.printStackTrace();
        }

        //1.建立一個臨時節點
        zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
                new IStringCallBack(), "the context");
        //1.建立一個臨時節點
        zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
                new IStringCallBack(), "the context");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 字元串回調接口,除了這種類型的回調接口之外
     * 如ACLCallback,DataCallback等。
     */
    static class IStringCallBack implements AsyncCallback.StringCallback {

        /**
         * @param rc   響應碼 0:接口調用成功 -4:用戶端和服務端連結已經斷開
         *             -110:指定節點已經存在。-112:會話已過期
         * @param path 接口調用時傳入API的資料節點的路徑參數值
         * @param ctx  接口調用時傳入API的ctx參數值
         * @param name 實際在服務端建立的節點名
         */
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            System.out.println("create path result:["
                    + rc + ", " + path + ", " + ctx + ", real path name :" + name);
        }
    }
}