
ZooKeeper 3.6.x supports persistent watchers

Internally, we use ZooKeeper's watcher feature as a distributed listener for data-change notifications, but we ran into data getting out of sync. Digging into the ZooKeeper source showed that a watcher is removed from the watch list once it fires — in other words, watchers are one-shot.
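As a side note, the classic workaround for one-shot watchers with the plain client is to re-register inside the callback. A minimal sketch (the connection string, session timeout, and path below are placeholders of mine):

import org.apache.zookeeper.ZooKeeper;

public class OneShotRewatch {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        watchData(zk, "/config");
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
    }

    static void watchData(ZooKeeper zk, String path) throws Exception {
        // getData registers a one-shot watcher, so we must re-arm it on every event.
        zk.getData(path, event -> {
            System.out.println("receive event:" + event);
            try {
                watchData(zk, path); // re-register; updates in between are missed
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, null);
    }
}

The gap between the event firing and the re-registration is exactly where updates can slip through unnoticed — which matches the out-of-sync data we were seeing.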

To stop change notifications from getting lost, I had planned to implement a simple watcher mechanism myself: dig through the ZooKeeper source, tweak it a little, and build a stripped-down watcher that met our needs. So I cloned the latest code from GitHub and tracked down the watcher-triggering logic. Emmmm?!?! Let's take a look together.

PS: while reading the related Curator code, I came across the linked ZooKeeper ticket — ZooKeeper has added persistent recursive watches:

https://issues.apache.org/jira/browse/ZOOKEEPER-1416
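ZOOKEEPER-1416 shipped in ZooKeeper 3.6.0 as the addWatch API on the plain client. A quick sketch (connection string and path are placeholders of mine):

import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.ZooKeeper;

public class PersistentWatchDemo {

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        // PERSISTENT keeps firing for one node; PERSISTENT_RECURSIVE also
        // fires for descendants at any depth (but not NodeChildrenChanged).
        zk.addWatch("/test", event -> System.out.println("receive event:" + event),
                AddWatchMode.PERSISTENT_RECURSIVE);
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
    }
}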

First, I checked out the 3.6 branch, since the ZooKeeper I have deployed is 3.6.2, so branch_3.6 should match my server's code. I won't go through the request-receiving flow in detail — there are plenty of analyses online (truth be told, I didn't read it closely either; I just searched my way straight to the key code /doge). Every request eventually ends up in this class:

org.apache.zookeeper.server.FinalRequestProcessor

which then executes this method:

org.apache.zookeeper.server.FinalRequestProcessor#processRequest

public void processRequest(Request request) {
        LOG.debug("Processing request:: {}", request);

        // request.addRQRec(">final");
        long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
        if (request.type == OpCode.ping) {
            traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
        }

        ProcessTxnResult rc = zks.processTxn(request);
        // tons of code omitted below
}
           

Let's zoom in on this call:

zks.processTxn(request);

Step into it:

public ProcessTxnResult processTxn(Request request) {
        TxnHeader hdr = request.getHdr();
        processTxnForSessionEvents(request, hdr, request.getTxn());

        final boolean writeRequest = (hdr != null);
        final boolean quorumRequest = request.isQuorum();

        // return fast w/o synchronization when we get a read
        if (!writeRequest && !quorumRequest) {
            return new ProcessTxnResult();
        }
        synchronized (outstandingChanges) {
            ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

            // request.hdr is set for write requests, which are the only ones
            // that add to outstandingChanges.
            if (writeRequest) {
                long zxid = hdr.getZxid();
                while (!outstandingChanges.isEmpty()
                        && outstandingChanges.peek().zxid <= zxid) {
                    ChangeRecord cr = outstandingChanges.remove();
                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                    if (cr.zxid < zxid) {
                        LOG.warn(
                            "Zxid outstanding 0x{} is less than current 0x{}",
                            Long.toHexString(cr.zxid),
                            Long.toHexString(zxid));
                    }
                    if (outstandingChangesForPath.get(cr.path) == cr) {
                        outstandingChangesForPath.remove(cr.path);
                    }
                }
            }

            // do not add non quorum packets to the queue.
            if (quorumRequest) {
                getZKDatabase().addCommittedProposal(request);
            }
            return rc;
        }
    }
           

Inside the synchronized block:

ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());

The return type is an XXXResult, the method is named processXXXInDB, and it takes pieces of the request as arguments — so this should be where the request is actually processed and a result produced. Stepping in, it's parameter passing all the way down, each layer delegating to the next (ZooKeeperServer → ZKDatabase → DataTree), until we reach

org.apache.zookeeper.server.DataTree#processTxn(TxnHeader, org.apache.jute.Record, boolean)

public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
        ProcessTxnResult rc = new ProcessTxnResult();

        try {
            rc.clientId = header.getClientId();
            rc.cxid = header.getCxid();
            rc.zxid = header.getZxid();
            rc.type = header.getType();
            rc.err = 0;
            rc.multiResult = null;
            switch (header.getType()) {
            case OpCode.create:
                CreateTxn createTxn = (CreateTxn) txn;
                rc.path = createTxn.getPath();
                createNode(
                    createTxn.getPath(),
                    createTxn.getData(),
                    createTxn.getAcl(),
                    createTxn.getEphemeral() ? header.getClientId() : 0,
                    createTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    null);
                break;
            case OpCode.create2:
                CreateTxn create2Txn = (CreateTxn) txn;
                rc.path = create2Txn.getPath();
                Stat stat = new Stat();
                createNode(
                    create2Txn.getPath(),
                    create2Txn.getData(),
                    create2Txn.getAcl(),
                    create2Txn.getEphemeral() ? header.getClientId() : 0,
                    create2Txn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            case OpCode.createTTL:
                CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
                rc.path = createTtlTxn.getPath();
                stat = new Stat();
                createNode(
                    createTtlTxn.getPath(),
                    createTtlTxn.getData(),
                    createTtlTxn.getAcl(),
                    EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
                    createTtlTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            case OpCode.createContainer:
                CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
                rc.path = createContainerTxn.getPath();
                stat = new Stat();
                createNode(
                    createContainerTxn.getPath(),
                    createContainerTxn.getData(),
                    createContainerTxn.getAcl(),
                    EphemeralType.CONTAINER_EPHEMERAL_OWNER,
                    createContainerTxn.getParentCVersion(),
                    header.getZxid(),
                    header.getTime(),
                    stat);
                rc.stat = stat;
                break;
            case OpCode.delete:
            case OpCode.deleteContainer:
                DeleteTxn deleteTxn = (DeleteTxn) txn;
                rc.path = deleteTxn.getPath();
                deleteNode(deleteTxn.getPath(), header.getZxid());
                break;
            case OpCode.reconfig:
            case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                rc.stat = setData(
                    setDataTxn.getPath(),
                    setDataTxn.getData(),
                    setDataTxn.getVersion(),
                    header.getZxid(),
                    header.getTime());
                break;
            case OpCode.setACL:
                SetACLTxn setACLTxn = (SetACLTxn) txn;
                rc.path = setACLTxn.getPath();
                rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
                break;
            case OpCode.closeSession:
                long sessionId = header.getClientId();
                if (txn != null) {
                    killSession(sessionId, header.getZxid(),
                            ephemerals.remove(sessionId),
                            ((CloseSessionTxn) txn).getPaths2Delete());
                } else {
                    killSession(sessionId, header.getZxid());
                }
                break;
            case OpCode.error:
                ErrorTxn errTxn = (ErrorTxn) txn;
                rc.err = errTxn.getErr();
                break;
            case OpCode.check:
                CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
                rc.path = checkTxn.getPath();
                break;
            case OpCode.multi:
                MultiTxn multiTxn = (MultiTxn) txn;
                List<Txn> txns = multiTxn.getTxns();
                rc.multiResult = new ArrayList<ProcessTxnResult>();
                boolean failed = false;
                for (Txn subtxn : txns) {
                    if (subtxn.getType() == OpCode.error) {
                        failed = true;
                        break;
                    }
                }

                boolean post_failed = false;
                for (Txn subtxn : txns) {
                    ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
                    Record record = null;
                    switch (subtxn.getType()) {
                    case OpCode.create:
                        record = new CreateTxn();
                        break;
                    case OpCode.createTTL:
                        record = new CreateTTLTxn();
                        break;
                    case OpCode.createContainer:
                        record = new CreateContainerTxn();
                        break;
                    case OpCode.delete:
                    case OpCode.deleteContainer:
                        record = new DeleteTxn();
                        break;
                    case OpCode.setData:
                        record = new SetDataTxn();
                        break;
                    case OpCode.error:
                        record = new ErrorTxn();
                        post_failed = true;
                        break;
                    case OpCode.check:
                        record = new CheckVersionTxn();
                        break;
                    default:
                        throw new IOException("Invalid type of op: " + subtxn.getType());
                    }
                    assert (record != null);

                    ByteBufferInputStream.byteBuffer2Record(bb, record);

                    if (failed && subtxn.getType() != OpCode.error) {
                        int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();

                        subtxn.setType(OpCode.error);
                        record = new ErrorTxn(ec);
                    }

                    assert !failed || (subtxn.getType() == OpCode.error);

                    TxnHeader subHdr = new TxnHeader(
                        header.getClientId(),
                        header.getCxid(),
                        header.getZxid(),
                        header.getTime(),
                        subtxn.getType());
                    ProcessTxnResult subRc = processTxn(subHdr, record, true);
                    rc.multiResult.add(subRc);
                    if (subRc.err != 0 && rc.err == 0) {
                        rc.err = subRc.err;
                    }
                }
                break;
            }
        } catch (KeeperException e) {
            LOG.debug("Failed: {}:{}", header, txn, e);
            rc.err = e.code().intValue();
        } catch (IOException e) {
            LOG.debug("Failed: {}:{}", header, txn, e);
        }

        /*
         * Snapshots are taken lazily. When serializing a node, it's data
         * and children copied in a synchronization block on that node,
         * which means newly created node won't be in the snapshot, so
         * we won't have mismatched cversion and pzxid when replaying the
         * createNode txn.
         *
         * But there is a tricky scenario that if the child is deleted due
         * to session close and re-created in a different global session
         * after that the parent is serialized, then when replay the txn
         * because the node is belonging to a different session, replay the
         * closeSession txn won't delete it anymore, and we'll get NODEEXISTS
         * error when replay the createNode txn. In this case, we need to
         * update the cversion and pzxid to the new value.
         *
         * Note, such failures on DT should be seen only during
         * restore.
         */
        if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
            LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
            int lastSlash = rc.path.lastIndexOf('/');
            String parentName = rc.path.substring(0, lastSlash);
            CreateTxn cTxn = (CreateTxn) txn;
            try {
                setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
            } catch (KeeperException.NoNodeException e) {
                LOG.error("Failed to set parent cversion for: {}", parentName, e);
                rc.err = e.code().intValue();
            }
        } else if (rc.err != Code.OK.intValue()) {
            LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
        }

        /*
         * Things we can only update after the whole txn is applied to data
         * tree.
         *
         * If we update the lastProcessedZxid with the first sub txn in multi
         * and there is a snapshot in progress, it's possible that the zxid
         * associated with the snapshot only include partial of the multi op.
         *
         * When loading snapshot, it will only load the txns after the zxid
         * associated with snapshot file, which could cause data inconsistency
         * due to missing sub txns.
         *
         * To avoid this, we only update the lastProcessedZxid when the whole
         * multi-op txn is applied to DataTree.
         */
        if (!isSubTxn) {
            /*
             * A snapshot might be in progress while we are modifying the data
             * tree. If we set lastProcessedZxid prior to making corresponding
             * change to the tree, then the zxid associated with the snapshot
             * file will be ahead of its contents. Thus, while restoring from
             * the snapshot, the restore method will not apply the transaction
             * for zxid associated with the snapshot file, since the restore
             * method assumes that transaction to be present in the snapshot.
             *
             * To avoid this, we first apply the transaction and then modify
             * lastProcessedZxid.  During restore, we correctly handle the
             * case where the snapshot contains data ahead of the zxid associated
             * with the file.
             */
            if (rc.zxid > lastProcessedZxid) {
                lastProcessedZxid = rc.zxid;
            }

            if (digestFromLoadedSnapshot != null) {
                compareSnapshotDigests(rc.zxid);
            } else {
                // only start recording digest when we're not in fuzzy state
                logZxidDigest(rc.zxid, getTreeDigest());
            }
        }

        return rc;
    }
           

Ignoring tons of code here, we go straight to the switch statement. Its cases are OpCode enum values, and from the names these are clearly the data operations a client asks the server to perform — create, delete, and so on. Since our requirement is listening for data changes, we look at the setData case:

case OpCode.setData:
                SetDataTxn setDataTxn = (SetDataTxn) txn;
                rc.path = setDataTxn.getPath();
                rc.stat = setData(
                    setDataTxn.getPath(),
                    setDataTxn.getData(),
                    setDataTxn.getVersion(),
                    header.getZxid(),
                    header.getTime());
                break;
           

There is only one setData call here; step into it:

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte[] lastdata = null;
        synchronized (n) {
            lastdata = n.data;
            nodes.preChange(path, n);
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
            nodes.postChange(path, n);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix = getMaxPrefixWithQuota(path);
        long dataBytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
        }
        nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));

        updateWriteStat(path, dataBytes);
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
           

At the very end we finally find the watcher-related code:

dataWatches.triggerWatch(path, EventType.NodeDataChanged);

So once the data update is done, it's time to fire the watchers. Step in and we land here:

@Override
    public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        Set<Watcher> watchers = new HashSet<>();
        PathParentIterator pathParentIterator = getPathParentIterator(path);
        synchronized (this) {
            for (String localPath : pathParentIterator.asIterable()) {
                Set<Watcher> thisWatchers = watchTable.get(localPath);
                if (thisWatchers == null || thisWatchers.isEmpty()) {
                    continue;
                }
                Iterator<Watcher> iterator = thisWatchers.iterator();
                while (iterator.hasNext()) {
                    Watcher watcher = iterator.next();
                    WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
                    if (watcherMode.isRecursive()) {
                        if (type != EventType.NodeChildrenChanged) {
                            watchers.add(watcher);
                        }
                    } else if (!pathParentIterator.atParentPath()) {
                        watchers.add(watcher);
                        if (!watcherMode.isPersistent()) {
                            iterator.remove();
                            Set<String> paths = watch2Paths.get(watcher);
                            if (paths != null) {
                                paths.remove(localPath);
                            }
                        }
                    }
                }
                if (thisWatchers.isEmpty()) {
                    watchTable.remove(localPath);
                }
            }
        }
        if (watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
            }
            return null;
        }

        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }

        switch (type) {
            case NodeCreated:
                ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
                break;

            case NodeDeleted:
                ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
                break;

            case NodeDataChanged:
                ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
                break;

            case NodeChildrenChanged:
                ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
                break;
            default:
                // Other types not logged.
                break;
        }

        return new WatcherOrBitSet(watchers);
    }
           

The for loop here walks the path and all of its parent paths, collecting the matching watchers into a newly created set, and then iterates that set to fire each watcher. Notice how the watcher set fetched from watchTable is handled: unlike the old implementation, a watcher is no longer removed unconditionally — iterator.remove() only runs when the watcher's mode is not persistent, so persistent (and persistent-recursive) watchers survive the trigger; recursive ones merely skip NodeChildrenChanged events. After the loop, a path is dropped from watchTable only if its watcher set has become empty:

if (thisWatchers.isEmpty()) {
   watchTable.remove(localPath);
}
           

So at the source level, watchers are no longer one-shot. Next, let's test the new watch behavior with Curator 5.x.

PS: Curator 4.x targets ZooKeeper 3.5.x, and Curator 5.x targets ZooKeeper 3.6.x.

public static void main(String[] args) throws Exception {
    String address = "zkAddress"; // placeholder for the real connect string
    CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
    client.start();
    client.watchers().add()
//            .withMode(AddWatchMode.PERSISTENT_RECURSIVE)
//            .usingWatcher(new Watcher() {
//                @Override
//                public void process(WatchedEvent watchedEvent) {
//                    System.out.println("receive event:" + JSON.toJSONString(watchedEvent));
//                }
//            })
            .usingWatcher(new CuratorWatcher() {
                @Override
                public void process(WatchedEvent event) throws Exception {
                    System.out.println("receive event:" + JSON.toJSONString(event));
                }
            })
            .forPath("/test");
    Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
}

This is a simple client-side test of the watch: register the watch first, then have another service modify the watched node's value from multiple threads in parallel. The number of events received here matched the number of modifications, which solves the lost-message problem the one-shot watchers used to cause. One gap remains: process in this method cannot obtain the node's latest value. There may be an API for that which I haven't found yet; I'll follow up later.
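In the meantime, one workaround (my own sketch, not something verified beyond this test) is to read the node inside the callback; be aware that under concurrent updates two events may read back the same, newest value:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class FetchLatestOnEvent {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient("zkAddress", new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.watchers().add()
                .usingWatcher((CuratorWatcher) event -> {
                    if (event.getPath() == null) {
                        return; // session/connection events carry no path
                    }
                    // The WatchedEvent carries only the type and path, so fetch the data explicitly.
                    byte[] latest = client.getData().forPath(event.getPath());
                    System.out.println("receive event:" + event + ", data:" + new String(latest));
                })
                .forPath("/test");
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to receive events
    }
}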