源代碼:
http://svn.apache.org/repos/asf/zookeeper/trunk/
導入eclipse:
在包含build.xml目錄下執行ant eclipse将産生.classpath檔案
目錄結構:
src/recipes:提供了各種Zookeeper應用例子
src/c:提供了c版用戶端。zookeeper_st,zookeeper_mt兩個library
src/contrib:别人貢獻的代碼?
src/generated:由jute生成的java實體類
用戶端入口:org.apache.zookeeper.ZooKeeperMain
//讀取指令行輸入,用MyCommandOptions解析。
//内部類MyCommandOptions包含成員指令名command、參數清單cmdArgs
-option value –option value command cmdArgs
//根據以上解析的ip、端口,連接配接到ZooKeeper
zk = newZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
newMyWatcher(), readOnly);
//執行指令,在ZooKeeperMain.run()
//ZooKeeperMain隻是一個外殼,使用jline實作了指令提示功能。
//commandMapCli将提供的指令指令名與執行體CliCommand關聯
//execute from commandMap
CliCommandcliCmd = commandMapCli.get(cmd);
if(cliCmd!=null) {
cliCmd.setZk(zk);
watch =cliCmd.parse(args).exec();
}
//最終轉到調用ZooKeeper方法
//提供的指令:
quit:Zk.close()關閉zk連接配接,調用cnxn.close()
history:列出曆史記錄
redo index:重新執行曆史記錄
printwatches [on]:檢視/設定watche開關狀态
connect:connectToZK(host)連接配接zk
//ZooKeeper内部連接配接
cnxn = newClientCnxn(connectStringParser.getChrootPath(),
hostProvider,sessionTimeout,this,watchManager,
getClientCnxnSocket(),canBeReadOnly);
cnxn.start();
ClientCnxn包含SendThread和EventThread兩個線程
SendThread将事件添加到waitEvents隊列中,EventThread線程消費該隊列。
//下面以ls指令為例
//調用zk.getChildren
public boolean exec() throwsKeeperException, InterruptedException {
String path= args[1];
boolean watch =cl.hasOption("w");
List<String> children = zk.getChildren(path, watch);
out.println(children);
return watch;
}
//getChildren生成request
RequestHeader h = newRequestHeader();
h.setType(ZooDefs.OpCode.getChildren);
GetChildrenRequest request = newGetChildrenRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetChildrenResponse response = newGetChildrenResponse();
ReplyHeader r = cnxn.submitRequest(h, request,response, wcb);
//submitRequest調用queuePacket,阻塞直到收到response。
publicReplyHeadersubmitRequest(RequestHeaderh, Record request,
Recordresponse, WatchRegistration watchRegistration)
throwsInterruptedException {
ReplyHeaderr = new ReplyHeader();
Packetpacket = queuePacket(h,r, request, response,null,null,null,
null, watchRegistration);
synchronized(packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
//queuePacket将Packet添加到outgoingQueue隊列中
packet= new Packet(h, r, request, response,watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath =clientPath;
packet.serverPath =serverPath;
outgoingQueue.add(packet);
//然後喚醒selector
sendThread.getClientCnxnSocket().wakeupCnxn();
//sendThread.run消費outgoingQueue
clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);
//selector判斷讀/寫事件
//doTransport調用doIO,doIO解析Response
//讀事件
int rc =sock.read(incomingBuffer);
sendThread.readResponse(incomingBuffer);
//寫事件
sock.write(p.bb);
//readResponse
//收到的package包括header,token, response字段
// 當header中xid是-1時,收到的notification,response字段解析得到WatchedEvent
// event将放入eventThread事件隊列watingEvents
WatcherEventevent = new WatcherEvent();
event.deserialize(bbia, "response");
…
eventThread.queueEvent(we );
//在finally塊中調用finishPacket
try {
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if(replyHdr.getZxid() > 0) {
lastZxid =replyHdr.getZxid();
}
if(packet.response != null&& replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
} finally {
finishPacket(packet);
}
//finishPacket
//如果有回調,finishPacket也将packet放到eventThread的隊列中
//否則設定packet.finish,此時submitRequest傳回response。
if (p.cb == null) {
synchronized (p) {
p.finished = true;
p.notifyAll();
}
} else {
p.finished = true;
eventThread.queuePacket(p);
}
//EventThread.run線程消費waintingEvents
Objectevent = waitingEvents.take();
processEvent(event);
以下圖檔轉自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/