上一章中我們讨論了zkcli的普通請求流程,這次我們使用 get -w / 指令,看看上一章被忽略的流程
1、zkWatchManager
首先我們回顧一下zkcli連接配接的建立,最終的io都是通過clientCnxn對象完成,下面是它的構造函數:
public ClientCnxn(
String chrootPath,
HostProvider hostProvider,
int sessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
) throws IOException {
this.chrootPath = chrootPath;
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
this.clientConfig = clientConfig;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.readOnly = canBeReadOnly;
this.watchManager = new ZKWatchManager(
clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
defaultWatcher);
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
initRequestTimeout();
}
其中會注冊一個WatchManager,這個類主要是管理不同監聽事件與處理邏輯,預設的Watcher為傳入的defaultWatcher,其是下面這個預設實作
private class MyWatcher implements Watcher {
public void process(WatchedEvent event) {
if (getPrintWatches()) {
ZooKeeperMain.printMessage("WATCHER::");
ZooKeeperMain.printMessage(event.toString());
}
if (connectLatch != null) {
// connection success
if (event.getType() == Event.EventType.None
&& event.getState() == Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
}
}
}
2、發送流程注冊Watch
這裡采用 getData做測試,進入zookeeper對象發包流程
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
//若有-w标志則會建立一個watchRegistration對象--标志監聽類型、路徑與watcher
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
//組裝請求包
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
//組包的構造調用,可以看到最終,該監聽與packet進行了綁定
Packet packet = queuePacket(
h,
r,
request,
response,
null,
null,
null,
null,
watchRegistration,
watchDeregistration);
根據上一章發送流程,我們現在去異步讀包的地方:
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
//若packet收包有wacther則将其添加進WatchManager
p.watchRegistration.register(err);
}
...
在這裡就完成了用戶端監聽事件的注冊流程。
3、接收流程回調Watcher
而消息讀取之後會生成對應的event來異步執行,對于watcher事件為以下邏輯
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
//這裡回調了注冊的函數,預設的為上面MyWatch實作--列印消息
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher.", t);
}
}
...
4、流程總結
可以看到zkcli的watch邏輯還是比較簡單,總的來講就是觀察者模式的實作,為不同處理事件注冊對應的回調函數,當服務端觸發事件發送消息,用戶端接收到消息進行回調處理。
到這裡,zkcli的主要邏輯我們已經分析完全,下一章我們開始進行單機版zkserver的學習。