package com.test.com.test.zookeeper;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
/**
* @author Jane E-mail:[email protected]
* @version Zookeeper用戶端示例及源碼分析 建立時間:2018/10/3
*/
/**
*
* 本内容完全是關于zk的用戶端,其中例子的運作需要在本地啟動一個zkServer叢集。
*
* zk 建議全部通過異步機制進行回調處理來提高性能,進而無需捕獲異常(隻需根據狀态碼進行相應處理)。
* zk通過單線程(eventThread)處理所有的回調通知(異步回調,監聽回調),是以在回調處理過多的情況下,可能造成較高的延遲。
*
* Zookeeper類提供了一系列與zkServer互動的方法,并關聯了ClientCnxn用戶端,此用戶端打開一個與zkServer的會話,并關聯sendThread和eventThread用于對發送請求和回調事件進行處理,
* ClientCnxnSocketNIO通過NIO的方式對socket進行處理。
*
* eventThread負責從其維護的阻塞隊列中擷取event,如果是DeathEvent則關閉此線程,否則根據事件類型進行解析,調用回調結構的不同重載(事件監聽回調,異步通知回調)。各種回調都被封裝成實作Event接口的對象
* sendThread負責與zkServer的連接配接,并負責心跳和重連。然後調用doTransport()方法。異步的方式,用戶端将回調處理器包裝成一個Packet對象,并将該對象加入outgoing隊列中。
* doTransport()方法通過NIO select擷取一次觸發的所有事件。
* 如果channel可讀,則從讀buffer中讀取内容,然後調用sendThread的readResponse方法,然後根據擷取消息的響應頭執行不同處理
* Xid為-2:心跳響應
* Xid為-4:認證授權
* Xid為-1:監聽事件通知,讀取事件的response,并将其送出到eventThread的隊列中(将回調相應Watcher接口的實作處理器)。
* 為sasl:擷取token然後發送給服務端
* 為回調通知:用戶端認為除以上所有情況外,都屬于回調的通知。從pending隊列中取出packet進行以下處理
* 1. 如果取出的Xid與服務端響應的Xid不一緻,則認為發送連接配接丢失。
* 2. 将服務端傳回的消息,根據協定填回packet字段中
* 3. 如果沒有回調接口則notifyAll,如果有回調則将packet加入eventThread的阻塞隊列中。
*
* 如果channel可寫,則從outgoing隊列中取出一個Packet對象,将buffer内容寫入socket中,然後将此packet對象交與pending隊列中。
*
* <p>
* zk用戶端提供了重連機制,如果心跳檢測網絡斷開,會一直嘗試重新連接配接(連接配接原伺服器,或者叢集中的其他伺服器),直到用戶端程序被殺死。
* <p>
* zk用戶端重新連接配接,但是服務端已經将此會話置為無效,則必須重新new Zookeeper句柄,與服務端開啟一個新的會話。
* 優雅的關閉與服務端的會話,需要調用 Zookeeper.close();
* <p>
* 對于外部資源的通路,需要使用隔離符(czxid)使用者接受最新的會話的請求,如果Czxid落後則不允許通路。
*/
public class Master implements Watcher { // Watcher接口實作一個監聽,在用戶端啟動後回調
private ZooKeeper zk;
private String hostPort;
Master(String hostPort) {
this.hostPort = hostPort;
}
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("收到監聽通知:" + watchedEvent);
}
/**
* 同步調用建立znode
* 必須要自己解決受檢異常
* @param path
* @param value
*/
public void createZnode(String path, String value) {
try {
String str = this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(str);
} catch (KeeperException e) {
System.out.println("check一下,看那個程序建立了子節點");
System.out.println(String.valueOf(getData(path)));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 異步的方式建立Znode.
* CreateMode.EPHEMERAL隻會建立臨時節點 用戶端會話關閉自動删除
* CreateMode.PERSISTENT會建立持久化節點
* 在回調進行中,根據異常傳回碼進行相應處理
* @param path
* @param value
*/
public void createZnodeAsync(String path, String value) {
this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1) {
System.out.println(i + " " + s + " " + o + " " + s1);
}
}, new Object());
}
/**
* 擷取某個節點的資料,即建立時設定的值
*
* @param path
*
* @return
*/
public String getData(String path) {
byte[] result = null;
try {
result = zk.getData(path, false, new Stat());
} catch (KeeperException e) {
System.out.println("繼續check或者報警");
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.valueOf(result);
}
/**
* 查詢某個節點是否存在,如果存在則設定删除節點監聽。
* 帶Watcher監聽器 和 異步回調處理器
* @param path
*/
public void exist(String path) {
Stat s = new Stat();
zk.exists(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("設定監視節點删除事件回調");
}
}, new AsyncCallback.StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
System.out.println("exist回調處理器");
// 根據不同的傳回異常碼,做相應的處理,對于連接配接丢失事件需要重試。
switch (KeeperException.Code.get(i)) {
case OK:
System.out.println("處理成功");
break;
case NOTEMPTY:
System.out.println("已經存在");
break;
case CONNECTIONLOSS:
System.out.println("連接配接丢失,可能發送時丢失,可能接收時丢失,建議重試");
break;
}
}
}, s);
}
/**
* 擷取某個路徑下的子節點,并設定監聽,如果子節點變化,則回調處理器
*
* @param path
*/
public void getChild(String path) {
zk.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("getChild設定監聽,如果子節點變動,則回調此處理器");
}
}, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int i, String s, Object o, List<String> list) {
System.out.println("getChild傳回結果回調");
switch (KeeperException.Code.get(i)) {
case OK:
System.out.println("處理成功");
for (String str : list) {
System.out.println(str);
}
}
}
}, new Object());
}
public static void main(String args[]) throws Exception {
Master m = new Master("127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184");
// 建立句柄,并與zookeeper Server建立會話連接配接
// 這個建立會話也是異步執行的
m.startZK();
//m.createZnode("/master", "hello world");
//m.createZnodeAsync("/slave","hello world");
// 如果存在則在回調中處理,并設定監聽,當伺服器删除節點時候收到回調通知。
//m.exist("/slave");
System.out.println(m.getData("/slave"));
// 擷取某個路徑下所有子節點,并設定監聽,如果子節點更改,則回調監聽事件。
m.getChild("/slave");
Thread.sleep(60000);
m.closeZK();
}
public void closeZK() {
try {
this.zk.close();
} catch (InterruptedException e) {
System.out.println("關閉zk失敗");
}
}
}