Internally we use ZooKeeper's watcher feature as a distributed listener for data change notifications, but we ran into data getting out of sync. Reading the relevant ZooKeeper source, I found that a watcher is removed from the watch table after it fires once; in other words, watchers are one-shot.
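For context, here is a minimal sketch of the classic one-shot pattern we had been relying on (the path "/config" and the handler class are made up). The window between the event firing and the re-registration inside process() is exactly where updates can be missed:
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class OneShotWatchExample implements Watcher {

    private final ZooKeeper zk;

    public OneShotWatchExample(ZooKeeper zk) throws Exception {
        this.zk = zk;
        // register the one-shot watch while reading the current value
        zk.getData("/config", this, null);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                // re-read AND re-register in one call; any update that happens
                // before this line runs is never delivered as a separate event
                byte[] latest = zk.getData("/config", this, null);
                System.out.println("changed: " + new String(latest));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}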
To stop losing change notifications, my first thought was to implement a simple watcher mechanism ourselves, so I went to dig through the ZooKeeper source, planning to tweak it a little into a stripped-down watcher that would cover our needs. I cloned the latest code from GitHub and found the watcher-triggering code. Emmmm?!?! Let's take a look together.
PS: While reading the related Curator code, I came across the associated ZooKeeper ticket: ZooKeeper added persistent, recursive watches.
https://issues.apache.org/jira/browse/ZOOKEEPER-1416
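For a quick taste of what that ticket adds (a sketch assuming a ZooKeeper 3.6+ client; the connect string and path below are placeholders), a persistent recursive watch can be registered directly on the plain client:
import org.apache.zookeeper.AddWatchMode;
import org.apache.zookeeper.ZooKeeper;

public class PersistentRecursiveWatchExample {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });
        // fires for "/test" and every descendant, and is NOT removed after it triggers
        zk.addWatch("/test", event -> System.out.println("event: " + event), AddWatchMode.PERSISTENT_RECURSIVE);
        Thread.currentThread().join(); // keep the demo alive
    }
}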
First I switched to the 3.6 branch, because the ZooKeeper I have deployed on the server is 3.6.2, so branch_3.6 should match my server version. I won't go through the request-receiving flow in detail; there are plenty of write-ups online (and honestly I didn't trace it all myself either, I just searched for the key code / facepalm). Every request eventually lands in the class
org.apache.zookeeper.server.FinalRequestProcessor
and goes through the method
org.apache.zookeeper.server.FinalRequestProcessor#processRequest
:
public void processRequest(Request request) {
LOG.debug("Processing request:: {}", request);
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = zks.processTxn(request);
// tons of code below omitted
}
Let's zoom in on this call:
zks.processTxn(request);
Stepping into it:
public ProcessTxnResult processTxn(Request request) {
TxnHeader hdr = request.getHdr();
processTxnForSessionEvents(request, hdr, request.getTxn());
final boolean writeRequest = (hdr != null);
final boolean quorumRequest = request.isQuorum();
// return fast w/o synchronization when we get a read
if (!writeRequest && !quorumRequest) {
return new ProcessTxnResult();
}
synchronized (outstandingChanges) {
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
// request.hdr is set for write requests, which are the only ones
// that add to outstandingChanges.
if (writeRequest) {
long zxid = hdr.getZxid();
while (!outstandingChanges.isEmpty()
&& outstandingChanges.peek().zxid <= zxid) {
ChangeRecord cr = outstandingChanges.remove();
ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
if (cr.zxid < zxid) {
LOG.warn(
"Zxid outstanding 0x{} is less than current 0x{}",
Long.toHexString(cr.zxid),
Long.toHexString(zxid));
}
if (outstandingChangesForPath.get(cr.path) == cr) {
outstandingChangesForPath.remove(cr.path);
}
}
}
// do not add non quorum packets to the queue.
if (quorumRequest) {
getZKDatabase().addCommittedProposal(request);
}
return rc;
}
}
Inside the synchronized block, the interesting line is
ProcessTxnResult rc = processTxnInDB(hdr, request.getTxn(), request.getTxnDigest());
The return type is called XXXResult, the method is called processXXXInDB, and the arguments are pieces taken off the request, so this should be where the request is actually processed and the result produced. Stepping in, each layer just passes the parameters down and calls the next method, until we reach
org.apache.zookeeper.server.DataTree#processTxn(TxnHeader, org.apache.jute.Record, boolean)
public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) {
ProcessTxnResult rc = new ProcessTxnResult();
try {
rc.clientId = header.getClientId();
rc.cxid = header.getCxid();
rc.zxid = header.getZxid();
rc.type = header.getType();
rc.err = 0;
rc.multiResult = null;
switch (header.getType()) {
case OpCode.create:
CreateTxn createTxn = (CreateTxn) txn;
rc.path = createTxn.getPath();
createNode(
createTxn.getPath(),
createTxn.getData(),
createTxn.getAcl(),
createTxn.getEphemeral() ? header.getClientId() : 0,
createTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
null);
break;
case OpCode.create2:
CreateTxn create2Txn = (CreateTxn) txn;
rc.path = create2Txn.getPath();
Stat stat = new Stat();
createNode(
create2Txn.getPath(),
create2Txn.getData(),
create2Txn.getAcl(),
create2Txn.getEphemeral() ? header.getClientId() : 0,
create2Txn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.createTTL:
CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn;
rc.path = createTtlTxn.getPath();
stat = new Stat();
createNode(
createTtlTxn.getPath(),
createTtlTxn.getData(),
createTtlTxn.getAcl(),
EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()),
createTtlTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.createContainer:
CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn;
rc.path = createContainerTxn.getPath();
stat = new Stat();
createNode(
createContainerTxn.getPath(),
createContainerTxn.getData(),
createContainerTxn.getAcl(),
EphemeralType.CONTAINER_EPHEMERAL_OWNER,
createContainerTxn.getParentCVersion(),
header.getZxid(),
header.getTime(),
stat);
rc.stat = stat;
break;
case OpCode.delete:
case OpCode.deleteContainer:
DeleteTxn deleteTxn = (DeleteTxn) txn;
rc.path = deleteTxn.getPath();
deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.reconfig:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(
setDataTxn.getPath(),
setDataTxn.getData(),
setDataTxn.getVersion(),
header.getZxid(),
header.getTime());
break;
case OpCode.setACL:
SetACLTxn setACLTxn = (SetACLTxn) txn;
rc.path = setACLTxn.getPath();
rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion());
break;
case OpCode.closeSession:
long sessionId = header.getClientId();
if (txn != null) {
killSession(sessionId, header.getZxid(),
ephemerals.remove(sessionId),
((CloseSessionTxn) txn).getPaths2Delete());
} else {
killSession(sessionId, header.getZxid());
}
break;
case OpCode.error:
ErrorTxn errTxn = (ErrorTxn) txn;
rc.err = errTxn.getErr();
break;
case OpCode.check:
CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
rc.path = checkTxn.getPath();
break;
case OpCode.multi:
MultiTxn multiTxn = (MultiTxn) txn;
List<Txn> txns = multiTxn.getTxns();
rc.multiResult = new ArrayList<ProcessTxnResult>();
boolean failed = false;
for (Txn subtxn : txns) {
if (subtxn.getType() == OpCode.error) {
failed = true;
break;
}
}
boolean post_failed = false;
for (Txn subtxn : txns) {
ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
Record record = null;
switch (subtxn.getType()) {
case OpCode.create:
record = new CreateTxn();
break;
case OpCode.createTTL:
record = new CreateTTLTxn();
break;
case OpCode.createContainer:
record = new CreateContainerTxn();
break;
case OpCode.delete:
case OpCode.deleteContainer:
record = new DeleteTxn();
break;
case OpCode.setData:
record = new SetDataTxn();
break;
case OpCode.error:
record = new ErrorTxn();
post_failed = true;
break;
case OpCode.check:
record = new CheckVersionTxn();
break;
default:
throw new IOException("Invalid type of op: " + subtxn.getType());
}
assert (record != null);
ByteBufferInputStream.byteBuffer2Record(bb, record);
if (failed && subtxn.getType() != OpCode.error) {
int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue();
subtxn.setType(OpCode.error);
record = new ErrorTxn(ec);
}
assert !failed || (subtxn.getType() == OpCode.error);
TxnHeader subHdr = new TxnHeader(
header.getClientId(),
header.getCxid(),
header.getZxid(),
header.getTime(),
subtxn.getType());
ProcessTxnResult subRc = processTxn(subHdr, record, true);
rc.multiResult.add(subRc);
if (subRc.err != 0 && rc.err == 0) {
rc.err = subRc.err;
}
}
break;
}
} catch (KeeperException e) {
LOG.debug("Failed: {}:{}", header, txn, e);
rc.err = e.code().intValue();
} catch (IOException e) {
LOG.debug("Failed: {}:{}", header, txn, e);
}
/*
* Snapshots are taken lazily. When serializing a node, it's data
* and children copied in a synchronization block on that node,
* which means newly created node won't be in the snapshot, so
* we won't have mismatched cversion and pzxid when replaying the
* createNode txn.
*
* But there is a tricky scenario that if the child is deleted due
* to session close and re-created in a different global session
* after that the parent is serialized, then when replay the txn
* because the node is belonging to a different session, replay the
* closeSession txn won't delete it anymore, and we'll get NODEEXISTS
* error when replay the createNode txn. In this case, we need to
* update the cversion and pzxid to the new value.
*
* Note, such failures on DT should be seen only during
* restore.
*/
if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) {
LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err);
int lastSlash = rc.path.lastIndexOf('/');
String parentName = rc.path.substring(0, lastSlash);
CreateTxn cTxn = (CreateTxn) txn;
try {
setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid());
} catch (KeeperException.NoNodeException e) {
LOG.error("Failed to set parent cversion for: {}", parentName, e);
rc.err = e.code().intValue();
}
} else if (rc.err != Code.OK.intValue()) {
LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err);
}
/*
* Things we can only update after the whole txn is applied to data
* tree.
*
* If we update the lastProcessedZxid with the first sub txn in multi
* and there is a snapshot in progress, it's possible that the zxid
* associated with the snapshot only include partial of the multi op.
*
* When loading snapshot, it will only load the txns after the zxid
* associated with snapshot file, which could cause data inconsistency
* due to missing sub txns.
*
* To avoid this, we only update the lastProcessedZxid when the whole
* multi-op txn is applied to DataTree.
*/
if (!isSubTxn) {
/*
* A snapshot might be in progress while we are modifying the data
* tree. If we set lastProcessedZxid prior to making corresponding
* change to the tree, then the zxid associated with the snapshot
* file will be ahead of its contents. Thus, while restoring from
* the snapshot, the restore method will not apply the transaction
* for zxid associated with the snapshot file, since the restore
* method assumes that transaction to be present in the snapshot.
*
* To avoid this, we first apply the transaction and then modify
* lastProcessedZxid. During restore, we correctly handle the
* case where the snapshot contains data ahead of the zxid associated
* with the file.
*/
if (rc.zxid > lastProcessedZxid) {
lastProcessedZxid = rc.zxid;
}
if (digestFromLoadedSnapshot != null) {
compareSnapshotDigests(rc.zxid);
} else {
// only start recording digest when we're not in fuzzy state
logZxidDigest(rc.zxid, getTreeDigest());
}
}
return rc;
}
We skip another ton of code here and go straight to the switch statement. The case labels are opcodes, an enum whose names indicate which data operation the client asked the server to perform, such as create or delete. Since our requirement is to be notified of data changes, we look for the case that corresponds to setData:
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
rc.path = setDataTxn.getPath();
rc.stat = setData(
setDataTxn.getPath(),
setDataTxn.getData(),
setDataTxn.getVersion(),
header.getZxid(),
header.getTime());
break;
There is only one setData call in that case, so let's step into it:
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte[] lastdata = null;
synchronized (n) {
lastdata = n.data;
nodes.preChange(path, n);
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
nodes.postChange(path, n);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
long dataBytes = data == null ? 0 : data.length;
if (lastPrefix != null) {
this.updateCountBytes(lastPrefix, dataBytes - (lastdata == null ? 0 : lastdata.length), 0);
}
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastdata));
updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
At the very end we finally spot the watcher-related code:
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
So once the data has been written, the watchers get triggered. Stepping in, we land directly here:
@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
for (String localPath : pathParentIterator.asIterable()) {
Set<Watcher> thisWatchers = watchTable.get(localPath);
if (thisWatchers == null || thisWatchers.isEmpty()) {
continue;
}
Iterator<Watcher> iterator = thisWatchers.iterator();
while (iterator.hasNext()) {
Watcher watcher = iterator.next();
WatcherMode watcherMode = watcherModeManager.getWatcherMode(watcher, localPath);
if (watcherMode.isRecursive()) {
if (type != EventType.NodeChildrenChanged) {
watchers.add(watcher);
}
} else if (!pathParentIterator.atParentPath()) {
watchers.add(watcher);
if (!watcherMode.isPersistent()) {
iterator.remove();
Set<String> paths = watch2Paths.get(watcher);
if (paths != null) {
paths.remove(localPath);
}
}
}
}
if (thisWatchers.isEmpty()) {
watchTable.remove(localPath);
}
}
}
if (watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
switch (type) {
case NodeCreated:
ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size());
break;
case NodeDeleted:
ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size());
break;
case NodeDataChanged:
ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size());
break;
case NodeChildrenChanged:
ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size());
break;
default:
// Other types not logged.
break;
}
return new WatcherOrBitSet(watchers);
}
This for loop walks the changed path and all of its parent paths, collects the matching watchers into a newly created watcher set, and then iterates over that set to trigger each one. Note that the watchers are fetched from watchTable but are no longer removed unconditionally the way older versions did it: a recursive watcher is never removed here, a persistent watcher is kept as well, and only a plain non-persistent watcher is dropped after it fires. The path entry itself is removed from watchTable only once its watcher set has become empty:
if (thisWatchers.isEmpty()) {
watchTable.remove(localPath);
}
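Restated outside the ZooKeeper source, the removal rule boils down to something like this (a sketch, not actual server code):
// A watcher survives triggerWatch unless it is the classic one-shot watch,
// i.e. neither persistent nor recursive (recursive watches are always persistent).
static boolean keptAfterTrigger(boolean isPersistent, boolean isRecursive) {
    return isPersistent || isRecursive;
}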
So at the code level, a watcher is no longer necessarily one-shot. Next, let's test the new ZooKeeper watches with Curator 5.x.
PS: Curator 4.x targets ZooKeeper 3.5.x, and Curator 5.x targets ZooKeeper 3.6.x.
public static void main(String[] args) throws Exception {
String address = "zkAddress"; // placeholder for the real ZooKeeper connect string
CuratorFramework client = CuratorFrameworkFactory.newClient(address, new ExponentialBackoffRetry(1000, 3));
client.start();
client.watchers().add()
// .withMode(AddWatchMode.PERSISTENT_RECURSIVE)
// .usingWatcher(new Watcher() {
// @Override
// public void process(WatchedEvent watchedEvent) {
// System.out.println("receive event:"+JSON.toJSONString(watchedEvent));
// }
// })
.usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
System.out.println("receive event:" + JSON.toJSONString(event));
}
})
.forPath("/test");
This is a simple client-side test of the watch. Register the watch first, then have another service modify the watched node's value from several threads in parallel; the number of events received matched the number of modifications, which fixes the message loss caused by the old one-shot watchers. One gap remains: inside this process callback I could not get the node's latest value. There may be another way that I just have not found yet; I will follow up later.
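In the meantime, the usual workaround is to read the node again inside the callback, since the WatchedEvent itself only carries the event type, state, and path. A sketch, assuming the started CuratorFramework client from the snippet above is in scope; note that the read can race with further writes, so the value fetched may already be newer than the change that fired:
// hypothetical helper, reusing the "client" from the test above
private static void watchWithLatestValue(CuratorFramework client, String path) throws Exception {
    client.watchers().add()
          .usingWatcher((CuratorWatcher) event -> {
              if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
                  // the event only says which node changed; fetch its current value ourselves
                  byte[] latest = client.getData().forPath(event.getPath());
                  System.out.println("path " + event.getPath() + " is now: " + new String(latest));
              }
          })
          .forPath(path);
}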