上一篇釋出了近期的面試總結-Mysql篇,最近在讀《從Paxos到Zookeeper分布式一緻性原理與實踐》的電子書,從上面學到不少,是以将部分api示範從頭演練一遍形成demo代碼。
zookeeper初始化構造方法
package com.coderman.zookeeper.clusterdemo.version2;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: Fanchunshuai
* @time: 2020/2/10 15:51
*/
public class ZKConstructDemo implements Watcher {
private static StringBuffer buffer = new StringBuffer();
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static final int SESSION_TIMEOUT = 5000;
protected static ZooKeeper zooKeeper;
static {
buffer.append("192.168.1.224:2184,");
buffer.append("192.168.1.224:2181,");
buffer.append("192.168.1.224:2182,");
buffer.append("192.168.1.224:2183,");
buffer.append("192.168.1.224:2185");
}
public static void main(String[] args) {
try {
//第一種初始化接口
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
//第二種初始化接口增加了兩個參數,為了複用session會話和密碼
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(),zooKeeper.getSessionId(),zooKeeper.getSessionPasswd());
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
}
try {
countDownLatch.await();
System.out.println("zookeeper session established...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("received watch event = "+watchedEvent);
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
countDownLatch.countDown();
}
}
}
zookeeper建立API的接口方法
package com.coderman.zookeeper.clusterdemo.version2;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* @description:
* @author: Fanchunshuai
* @time: 2020/2/10 16:47
*/
/**
* 節點建立API
* zookeeper的節點建立api有兩個重載方法
* 一個是同步調用,一個是異步調用
* 建立節點的參數如下:
* path:需要建立節點的路徑 如/zookeeper/config
* data[]:節點建立後的初始内容
* acl:節點的ACL政策,是List類型,已經通過ZooDefs.Ids類初始化好了
* createMode:建立節點的類型或者模式,是枚舉類型,通過CreateMode枚舉類初始化好了
* cb:回調函數
* ctx:傳遞一個上下文對象,在回調方法執行的時候使用
* <p>
* 需要注意的是:無論是同步接口還是異步接口,zookeeper都不支援遞歸建立,即無法在父節點不存在的情況下建立子節點。
* 如果節點已經存在,那麼建立同名節點将會抛出NodeExitsException異常。
* <p>
* 同步調用示範
*/
public class ZKCreateAPIDemo implements Watcher {
private static StringBuffer buffer = new StringBuffer();
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static final int SESSION_TIMEOUT = 5000;
protected static ZooKeeper zooKeeper;
static {
buffer.append("192.168.1.224:2184,");
buffer.append("192.168.1.224:2181,");
buffer.append("192.168.1.224:2182,");
buffer.append("192.168.1.224:2183,");
buffer.append("192.168.1.224:2185");
}
public static void main(String[] args) {
//createSyncCall();
createAsyncCall();
}
@Override
public void process(WatchedEvent watchedEvent) {
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
countDownLatch.countDown();
}
}
/**
* 同步調用示範
*/
private static void createSyncCall() {
try {
//第一種初始化接口
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
//第二種初始化接口增加了兩個參數,為了複用session會話和密碼
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(), zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
String path = "/hadoop";
byte[] data = "hadooper".getBytes();
//1.建立一個臨時節點
String response = zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("response = " + response);
String path2 = "/hadoop2";
byte[] data2 = "hadooper2".getBytes();
//2.建立一個臨時順序節點
String response2 = zooKeeper.create(path2, data2, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("response2 = " + response2);
//3.建立一個持久節點
String path3 = "/hadoop3";
byte[] data3 = "hadooper3".getBytes();
String response3 = zooKeeper.create(path3, data3, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("response3 = " + response3);
//4.建立一個持久順序節點
String path4 = "/hadoop4";
byte[] data4 = "hadooper4".getBytes();
String response4 = zooKeeper.create(path4, data4, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
System.out.println("response4 = " + response3);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
try {
countDownLatch.await();
System.out.println("zookeeper session established...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 異步調用示範
*/
private static void createAsyncCall() {
String path = "/hadoopsync";
byte[] data = "hadoopersync".getBytes();
try {
//第一種初始化接口
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo());
//第二種初始化接口增加了兩個參數,為了複用session會話和密碼
zooKeeper = new ZooKeeper(buffer.toString(), SESSION_TIMEOUT, new ZKConstructDemo(), zooKeeper.getSessionId(), zooKeeper.getSessionPasswd());
} catch (IOException e) {
e.printStackTrace();
}
//1.建立一個臨時節點
zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
new IStringCallBack(), "the context");
//1.建立一個臨時節點
zooKeeper.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
new IStringCallBack(), "the context");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 字元串回調接口,除了這種類型的回調接口之外
* 如ACLCallback,DataCallback等。
*/
static class IStringCallBack implements AsyncCallback.StringCallback {
/**
* @param rc 響應碼 0:接口調用成功 -4:用戶端和服務端連結已經斷開
* -110:指定節點已經存在。-112:會話已過期
* @param path 接口調用時傳入API的資料節點的路徑參數值
* @param ctx 接口調用時傳入API的ctx參數值
* @param name 實際在服務端建立的節點名
*/
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("create path result:["
+ rc + ", " + path + ", " + ctx + ", real path name :" + name);
}
}
}