天天看點

ZooKeeper源碼閱讀(三)---用戶端watch機制

上一章中我們讨論了zkcli的普通請求流程,這次我們使用 get -w / 指令,看看上一章被忽略的流程

 1、zkWatchManager

        首先我們回顧一下zkcli連接配接的建立,最終的io都是通過clientCnxn對象完成,下面是它的構造函數:

public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZKClientConfig clientConfig,
        Watcher defaultWatcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly
    ) throws IOException {
        this.chrootPath = chrootPath;
        this.hostProvider = hostProvider;
        this.sessionTimeout = sessionTimeout;
        this.clientConfig = clientConfig;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.readOnly = canBeReadOnly;

        this.watchManager = new ZKWatchManager(
                clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET),
                defaultWatcher);

        this.connectTimeout = sessionTimeout / hostProvider.size();
        this.readTimeout = sessionTimeout * 2 / 3;

        this.sendThread = new SendThread(clientCnxnSocket);
        this.eventThread = new EventThread();
        initRequestTimeout();
    }
           

其中會注冊一個WatchManager,這個類主要是管理不同監聽事件與處理邏輯,預設的Watcher為傳入的defaultWatcher,其是下面這個預設實作

private class MyWatcher implements Watcher {

        public void process(WatchedEvent event) {
            if (getPrintWatches()) {
                ZooKeeperMain.printMessage("WATCHER::");
                ZooKeeperMain.printMessage(event.toString());
            }
            if (connectLatch != null) {
                // connection success
                if (event.getType() == Event.EventType.None
                    && event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }
            }
        }

    }
           

2、發送流程注冊Watch

這裡采用 getData做測試,進入zookeeper對象發包流程

public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);

        // the watch contains the un-chroot path
        
        WatchRegistration wcb = null;
        if (watcher != null) {
            //若有-w标志則會建立一個watchRegistration對象--标志監聽類型、路徑與watcher
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        //組裝請求包
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
//組包的構造調用,可以看到最終,該監聽與packet進行了綁定
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null,
            null,
            null,
            null,
            watchRegistration,
            watchDeregistration);


           

 根據上一章發送流程,我們現在去異步讀包的地方:

protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            //若packet收包有wacther則将其添加進WatchManager
            p.watchRegistration.register(err);
        }
   ...
           

在這裡就完成了用戶端監聽事件的注冊流程。

3、接收流程回調Watcher

而消息讀取之後會生成對應的event來異步執行,對于watcher事件為以下邏輯

private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // each watcher will process the event
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                            //這裡回調了注冊的函數,預設的為上面MyWatch實作--列印消息
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher.", t);
                        }
                    }
        ...
           

4、流程總結

可以看到zkcli的watch邏輯還是比較簡單,總的來講就是觀察者模式的實作,為不同處理事件注冊對應的回調函數,當服務端觸發事件發送消息,用戶端接收到消息進行回調處理。

到這裡,zkcli的主要邏輯我們已經分析完全,下一章我們開始進行單機版zkserver的學習。