天天看點

ZooKeeper源碼閱讀(二):用戶端

源代碼:

http://svn.apache.org/repos/asf/zookeeper/trunk/

導入eclipse:

在包含build.xml目錄下執行ant eclipse将産生.classpath檔案

目錄結構:

src/recipes:提供了各種Zookeeper應用例子

src/c:提供了c版用戶端。zookeeper_st,zookeeper_mt兩個library

src/contrib:别人貢獻的代碼?

src/generated:由jute生成的java實體類

用戶端入口:org.apache.zookeeper.ZooKeeperMain

//讀取指令行輸入,用MyCommandOptions解析。

//内部類MyCommandOptions包含成員指令名command、參數清單cmdArgs

-option value –option value command cmdArgs

//根據以上解析的ip、端口,連接配接到ZooKeeper

        zk = newZooKeeper(host,

                Integer.parseInt(cl.getOption("timeout")),

                 newMyWatcher(), readOnly);

//執行指令,在ZooKeeperMain.run()

//ZooKeeperMain隻是一個外殼,使用jline實作了指令提示功能。

//commandMapCli将提供的指令指令名與執行體CliCommand關聯

        //execute from commandMap

        CliCommandcliCmd = commandMapCli.get(cmd);

        if(cliCmd!=null) {

           cliCmd.setZk(zk);

            watch =cliCmd.parse(args).exec();

                   }

//最終轉到調用ZooKeeper方法

//提供的指令:

quit:Zk.close()關閉zk連接配接,調用cnxn.close()

history:列出曆史記錄

redo index:重新執行曆史記錄

printwatches [on]:檢視/設定watche開關狀态

connect:connectToZK(host)連接配接zk

//ZooKeeper内部連接配接

        cnxn = newClientCnxn(connectStringParser.getChrootPath(),

                hostProvider,sessionTimeout,this,watchManager,

                getClientCnxnSocket(),canBeReadOnly);

        cnxn.start();

ClientCnxn包含SendThread和EventThread兩個線程

SendThread将事件添加到waitEvents隊列中,EventThread線程消費該隊列。

//下面以ls指令為例

//調用zk.getChildren

    public boolean exec() throwsKeeperException, InterruptedException {

        String path= args[1];

        boolean watch =cl.hasOption("w");

       List<String> children = zk.getChildren(path, watch);

        out.println(children);

        return watch;

    }

//getChildren生成request

       RequestHeader h = newRequestHeader();

       h.setType(ZooDefs.OpCode.getChildren);

       GetChildrenRequest request = newGetChildrenRequest();

       request.setPath(serverPath);

       request.setWatch(watcher != null);

       GetChildrenResponse response = newGetChildrenResponse();

        ReplyHeader r = cnxn.submitRequest(h, request,response, wcb);

//submitRequest調用queuePacket,阻塞直到收到response。

    publicReplyHeadersubmitRequest(RequestHeaderh, Record request,

            Recordresponse, WatchRegistration watchRegistration)

            throwsInterruptedException {

        ReplyHeaderr = new ReplyHeader();

        Packetpacket = queuePacket(h,r, request, response,null,null,null,

                   null, watchRegistration);

        synchronized(packet) {

            while (!packet.finished) {

               packet.wait();

            }

        }

        return r;

    }

//queuePacket将Packet添加到outgoingQueue隊列中

            packet= new Packet(h, r, request, response,watchRegistration);

            packet.cb = cb;

            packet.ctx = ctx;

            packet.clientPath =clientPath;

            packet.serverPath =serverPath;

                outgoingQueue.add(packet);

         //然後喚醒selector

        sendThread.getClientCnxnSocket().wakeupCnxn();

//sendThread.run消費outgoingQueue

        clientCnxnSocket.doTransport(to,pendingQueue,outgoingQueue,ClientCnxn.this);

//selector判斷讀/寫事件

//doTransport調用doIO,doIO解析Response

         //讀事件

        int rc =sock.read(incomingBuffer);

        sendThread.readResponse(incomingBuffer);

         //寫事件

       sock.write(p.bb);

//readResponse

//收到的package包括header,token, response字段

// 當header中xid是-1時,收到的notification,response字段解析得到WatchedEvent

// event将放入eventThread事件隊列watingEvents

     WatcherEventevent = new WatcherEvent();

    event.deserialize(bbia, "response");

     …

     eventThread.queueEvent(we );

//在finally塊中調用finishPacket

    try {

        packet.replyHeader.setXid(replyHdr.getXid());

        packet.replyHeader.setErr(replyHdr.getErr());

        packet.replyHeader.setZxid(replyHdr.getZxid());

        if(replyHdr.getZxid() > 0) {

            lastZxid =replyHdr.getZxid();

        }

        if(packet.response != null&& replyHdr.getErr() == 0) {

            packet.response.deserialize(bbia, "response");

        }

    } finally {

        finishPacket(packet);

    }

//finishPacket

//如果有回調,finishPacket也将packet放到eventThread的隊列中

//否則設定packet.finish,此時submitRequest傳回response。

    if (p.cb == null) {

        synchronized (p) {

            p.finished = true;

           p.notifyAll();

        }

    } else {

        p.finished = true;

        eventThread.queuePacket(p);

    }

//EventThread.run線程消費waintingEvents

Objectevent = waitingEvents.take();

processEvent(event);

以下圖檔轉自:http://www.spnguru.com/2010/08/zookeeper%E5%85%A8%E8%A7%A3%E6%9E%90%E2%80%94%E2%80%94client%E7%AB%AF/

ZooKeeper源碼閱讀(二):用戶端
ZooKeeper源碼閱讀(二):用戶端