天天看點

ZooKeeper Client Source Code Walkthrough (Overall Flow)

The bin directory of the ZooKeeper installation contains the scripts that start the client and the server.


Opening the client script (zkCli.sh) shows the following:

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
     org.apache.zookeeper.ZooKeeperMain "$@"
           

This is simply the command that launches the Java program ZooKeeperMain.

So everything starts from org.apache.zookeeper.ZooKeeperMain.main:

public static void main(String args[]) throws CliException, IOException, InterruptedException
{
    ZooKeeperMain main = new ZooKeeperMain(args);
    main.run();
}
           

This looks a bit like a Spring Boot launcher class: first construct an object, then call its run method.

Constructing the launcher class

1. First, the constructor:
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
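    // Parse the arguments passed in (server, timeout, readonly).
    // The results are stored in the static inner class MyCommandOptions.
    // For example, running
    //   $ bin/zkCli.sh -server 127.0.0.1:2181
    // specifies the server address explicitly, so "server" -> "127.0.0.1:2181" is stored.
    cl.parseOptions(args);
    // Print the configured server to the console (a logging framework might be a better fit here).
    System.out.println("Connecting to " + cl.getOption("server"));
    // Connect to the server. Doing network I/O inside a constructor deserves some caution.
    // This step parses the parameters, especially the server address list, picks NIO or Netty
    // for network communication according to the configuration, creates the connection object,
    // and starts two threads, SendThread and EventThread. SendThread sends requests (connect,
    // heartbeat, client commands); EventThread processes asynchronous events. Along the way a
    // server is selected and the network connection is attempted.
    connectToZK(cl.getOption("server"));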
}
           

Default configuration (MyCommandOptions parses the system configuration and the commands typed on the command line):

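// Options are stored in a HashMap
private Map<String,String> options = new HashMap<String,String>();
// Regular expressions used to split a command line into arguments (quoted strings stay together)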
public static final Pattern ARGS_PATTERN = Pattern.compile("\\s*([^\"\']\\S*|\"[^\"]*\"|'[^']*')\\s*");
public static final Pattern QUOTED_PATTERN = Pattern.compile("^([\'\"])(.*)(\\1)$");
public MyCommandOptions() {
  options.put("server", "localhost:2181");
  options.put("timeout", "30000");
}
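The two patterns split an interactive command into arguments while keeping quoted strings together, and the constructor seeds the defaults that command-line flags later override. Below is a minimal sketch of both steps, assuming a simplified flag set (only -server and -timeout). The class CommandTokenizerSketch and its methods are hypothetical, not ZooKeeper's actual parseOptions/parseCommand code; only the two regular expressions are copied from above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class CommandTokenizerSketch {
    // Same patterns as MyCommandOptions: a token is an unquoted word,
    // a double-quoted string, or a single-quoted string.
    static final Pattern ARGS_PATTERN =
        Pattern.compile("\\s*([^\"\']\\S*|\"[^\"]*\"|'[^']*')\\s*");
    static final Pattern QUOTED_PATTERN = Pattern.compile("^([\'\"])(.*)(\\1)$");

    // Split a command line into tokens, stripping the enclosing quotes.
    static List<String> tokenize(String cmdLine) {
        List<String> tokens = new ArrayList<>();
        Matcher m = ARGS_PATTERN.matcher(cmdLine);
        while (m.find()) {
            String value = m.group(1);
            Matcher q = QUOTED_PATTERN.matcher(value);
            if (q.matches()) {
                value = q.group(2); // the text between the quotes
            }
            tokens.add(value);
        }
        return tokens;
    }

    // Hypothetical simplified option parsing: start from the defaults,
    // then let "-server" / "-timeout" on the command line override them.
    static Map<String, String> parseOptions(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("server", "localhost:2181");
        options.put("timeout", "30000");
        for (int i = 0; i + 1 < args.length; i++) {
            if (args[i].equals("-server")) {
                options.put("server", args[++i]);
            } else if (args[i].equals("-timeout")) {
                options.put("timeout", args[++i]);
            }
        }
        return options;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("create /foo \"hello world\"")); // prints [create, /foo, hello world]
        System.out.println(parseOptions(new String[] {"-server", "127.0.0.1:2181"}).get("server"));
    }
}
```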
           
2. Connecting to the server
protected void connectToZK(String newHost) throws InterruptedException, IOException {	
    // If a client is already running, close it first, then reconnect below
    if (zk != null && zk.getState().isAlive()) {
        zk.close();
    }
    host = newHost;
    boolean readOnly = cl.getOption("readonly") != null;
    if (cl.getOption("secure") != null) {
        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
        System.out.println("Secure connection is enabled");
    }
    // Construct another object; judging by the class name, it is used to administer ZooKeeper.
    // A MyWatcher is also constructed and passed in here.
    zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}
           
// This class looks unimportant (it only prints notifications of asynchronous events), but ZooKeeper's Watcher mechanism as a whole is very important.
private class MyWatcher implements Watcher {
    public void process(WatchedEvent event) {
        // printWatches defaults to true
        if (getPrintWatches()) {
            ZooKeeperMain.printMessage("WATCHER::");
            ZooKeeperMain.printMessage(event.toString());
        }
    }
}
public static void printMessage(String msg) {
    System.out.println("\n"+msg);
}
           

Class hierarchy of ZooKeeperAdmin


The ZooKeeperAdmin constructor eventually calls the constructor of its parent class, org.apache.zookeeper.ZooKeeper.

The ZooKeeper class is the main class of the client; its Javadoc reads:

/**
 * This is the main class of ZooKeeper client library. To use a ZooKeeper
 * service, an application must first instantiate an object of ZooKeeper class.
 * All the iterations will be done by calling the methods of ZooKeeper class.
 * The methods of this class are thread-safe unless otherwise noted.
 * 
 * <p>
 * Once a connection to a server is established, a session ID is assigned to the
 * client. The client will send heart beats to the server periodically to keep
 * the session valid.
 * 
 * <p> 
 * The application can call ZooKeeper APIs through a client as long as the
 * session ID of the client remains valid.
 * 
 * <p>
 * If for some reason, the client fails to send heart beats to the server for a
 * prolonged period of time (exceeding the sessionTimeout value, for instance),
 * the server will expire the session, and the session ID will become invalid.
 * The client object will no longer be usable. To make ZooKeeper API calls, the
 * application must create a new client object.
 * 
 * <p>
 * If the ZooKeeper server the client currently connects to fails or otherwise
 * does not respond, the client will automatically try to connect to another
 * server before its session ID expires. If successful, the application can
 * continue to use the client.
 * 
 * <p>
 * The ZooKeeper API methods are either synchronous or asynchronous. Synchronous
 * methods blocks until the server has responded. Asynchronous methods just
 * queue the request for sending and return immediately. They take a callback
 * object that will be executed either on successful execution of the request or
 * on error with an appropriate return code (rc) indicating the error.
 * 
 * <p> 
 * Some successful ZooKeeper API calls can leave watches on the "data nodes" in
 * the ZooKeeper server. Other successful ZooKeeper API calls can trigger those
 * watches. Once a watch is triggered, an event will be delivered to the client
 * which left the watch at the first place. Each watch can be triggered only
 * once. Thus, up to one event will be delivered to a client for every watch it
 * leaves.
 * <p>
 * A client needs an object of a class implementing Watcher interface for
 * processing the events delivered to the client.
 *
 * When a client drops the current connection and re-connects to a server, all
 * the existing watches are considered as being triggered but the undelivered
 * events are lost. To emulate this, the client will generate a special event to
 * tell the event handler a connection has been dropped. This special event has
 * EventType None and KeeperState Disconnected.
 *
 */
/*
 * We suppress the "try" warning here because the close() method's signature
 * allows it to throw InterruptedException which is strongly advised against by
 * AutoCloseable (see:
 * http://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html#close()
 * ). close() will never throw an InterruptedException but the exception remains
 * in the signature for backwards compatibility purposes.
 */
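The Javadoc stresses that each watch is triggered at most once. A minimal sketch of that one-shot behavior, assuming a simple in-memory registry keyed by path. OneShotWatchRegistry is a hypothetical class for illustration only, not ZooKeeper's ZKWatchManager:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

final class OneShotWatchRegistry {
    // path -> watchers currently registered on that path
    private final Map<String, Set<Consumer<String>>> watches = new HashMap<>();

    // Leave a watch on a path (e.g. as a side effect of a read call).
    synchronized void addWatch(String path, Consumer<String> watcher) {
        watches.computeIfAbsent(path, p -> new HashSet<>()).add(watcher);
    }

    // A change on the path triggers its watches. They are removed before being
    // called, so every registered watch receives at most one event.
    int trigger(String path) {
        Set<Consumer<String>> fired;
        synchronized (this) {
            fired = watches.remove(path);
        }
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> w : fired) {
            w.accept(path);
        }
        return fired.size();
    }

    public static void main(String[] args) {
        OneShotWatchRegistry registry = new OneShotWatchRegistry();
        List<String> seen = new ArrayList<>();
        registry.addWatch("/app", seen::add);
        registry.trigger("/app"); // delivers the event
        registry.trigger("/app"); // nothing left to deliver
        System.out.println(seen); // prints [/app]
    }
}
```

Because trigger removes the watches before calling them, a client that wants continuous notifications has to register a new watch inside its callback.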
           

Constructing the ZooKeeper class

/**
 * To create a ZooKeeper client object, the application needs to pass a
 * connection string containing a comma separated list of host:port pairs, each
 * corresponding to a ZooKeeper server.
 * <p>
 * Session establishment is asynchronous. This constructor will initiate
 * connection to the server and return immediately - potentially (usually)
 * before the session is fully established. The watcher argument specifies the
 * watcher that will be notified of any changes in state. This notification can
 * come at any point before or after the constructor call has returned.
 * <p>
 * The instantiated ZooKeeper client object will pick an arbitrary server from
 * the connectString and attempt to connect to it. If establishment of the
 * connection fails, another server in the connect string will be tried (the
 * order is non-deterministic, as we random shuffle the list), until a
 * connection is established. The client will continue attempts until the
 * session is explicitly closed.
 * <p>
 * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
 * connection string. This will run the client commands while interpreting all
 * paths relative to this root (similar to the unix chroot command).
 * <p>
 * For backward compatibility, there is another version
 * {@link #ZooKeeper(String, int, Watcher, boolean)} which uses default
 * {@link StaticHostProvider}
 *
 * @param connectString  comma separated host:port pairs, each corresponding to
 *                       a zk server. e.g.
 *                       "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the
 *                       optional chroot suffix is used the example would look
 *                       like:
 *                       "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
 *                       where the client would be rooted at "/app/a" and all
 *                       paths would be relative to this root - ie
 *                       getting/setting/etc... "/foo/bar" would result in
 *                       operations being run on "/app/a/foo/bar" (from the
 *                       server perspective).
 * @param sessionTimeout session timeout in milliseconds
 * @param watcher        a watcher object which will be notified of state
 *                       changes, may also be notified for node events
 * @param canBeReadOnly  (added in 3.4) whether the created client is allowed to
 *                       go to read-only mode in case of partitioning. Read-only
 *                       mode basically means that if the client can't find any
 *                       majority servers but there's partitioned server it
 *                       could reach, it connects to one in read-only mode, i.e.
 *                       read requests are allowed while write requests are not.
 *                       It continues seeking for majority in the background.
 * @param aHostProvider  use this as HostProvider to enable custom behaviour.
 * @param clientConfig   (added in 3.5.2) passing this conf object gives each
 *                       client the flexibility of configuring properties
 *                       differently compared to other instances
 * @throws IOException              in cases of network failure
 * @throws IllegalArgumentException if an invalid chroot path is specified
 */
/**
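 * The core client connection object. It contains two threads, SendThread and EventThread:
 * SendThread is an I/O thread responsible for the network I/O between the ZooKeeper client
 * and the server; EventThread is an event thread responsible for processing server events.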
 */
protected final ClientCnxn cnxn;
private static final Logger LOG;

static {
	// Keep these two lines together to keep the initialization order explicit
	LOG = LoggerFactory.getLogger(ZooKeeper.class);
	Environment.logEnv("Client environment:", LOG);
}

/**
 * Manages the client's list of server addresses
 */
protected final HostProvider hostProvider;

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly,
		HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
	LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout="
			+ sessionTimeout + " watcher=" + watcher);

	if (clientConfig == null) {
		clientConfig = new ZKClientConfig();
	}
	this.clientConfig = clientConfig;
	// 1. Set up the default Watcher
	watchManager = defaultWatchManager();
	watchManager.defaultWatcher = watcher; // the watcher passed in becomes the default watcher
	// 2. Set up the ZooKeeper server address list
	ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
	hostProvider = aHostProvider;
	// 3. Obtain the network connection object (getClientCnxnSocket only instantiates it)
	// 4. Create the ClientCnxn
	cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this,
			watchManager, getClientCnxnSocket(), canBeReadOnly);
	// 5. Start sendThread and eventThread, which run their run methods; sendThread then issues the connection request
	cnxn.start();
}
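ConnectStringParser splits the connection string into the server list and the optional chroot suffix described in the Javadoc above; the chroot is then prepended to client paths on the way out and stripped from server paths on the way back (see the chroot handling in readResponse further down). A minimal sketch of that logic follows. ConnectStringSketch and its methods are hypothetical, and the real parser also validates the chroot path; IPv6 addresses are not handled here.

```java
import java.util.ArrayList;
import java.util.List;

final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed when an entry has no port

    final String chrootPath;                         // null when no chroot is given
    final List<String> servers = new ArrayList<>();  // "host:port" entries

    ConnectStringSketch(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String chroot = connectString.substring(off);
            // A bare "/" means "no chroot".
            chrootPath = chroot.length() == 1 ? null : chroot;
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String host : connectString.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            // Naive port check; IPv6 literals are not handled in this sketch.
            servers.add(host.contains(":") ? host : host + ":" + DEFAULT_PORT);
        }
    }

    // Client path -> path as seen by the server.
    String toServerPath(String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        return clientPath.equals("/") ? chrootPath : chrootPath + clientPath;
    }

    // Server path -> client path.
    String toClientPath(String serverPath) {
        if (chrootPath == null) {
            return serverPath;
        }
        if (serverPath.equals(chrootPath)) {
            return "/";
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());
        }
        return serverPath; // too short for the chroot: readResponse only logs a warning here
    }

    public static void main(String[] args) {
        ConnectStringSketch p =
            new ConnectStringSketch("127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a");
        System.out.println(p.servers);                  // prints [127.0.0.1:3000, 127.0.0.1:3001, 127.0.0.1:3002]
        System.out.println(p.toServerPath("/foo/bar")); // prints /app/a/foo/bar
    }
}
```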
           

Reading the system configuration

public ZKClientConfig() {
    super();
    initFromJavaSystemProperties();
}

/**
 * Initialize all the ZooKeeper client properties which are configurable as
 * java system property
 */
private void initFromJavaSystemProperties() {
    setProperty(ZOOKEEPER_REQUEST_TIMEOUT,
            System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
}
           
// org.apache.zookeeper.common.ZKConfig, the parent class of ZKClientConfig
/**
 * properties, which are common to both client and server, are initialized
 * from system properties
 */
public ZKConfig() {
    init();
}
private void init() {
    /**
     * backward compatibility for all currently available client properties
     */
    handleBackwardCompatibility();
}
           

Setting the default Watcher

/* Useful for testing watch handling behavior */
protected ZKWatchManager defaultWatchManager() {
// Manage watchers & handle events generated by the ClientCnxn object.
	return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
           

Obtaining the network connection object

private ClientCnxnSocket getClientCnxnSocket() throws IOException {
	String clientCnxnSocketName = getClientConfig()
			.getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
	if (clientCnxnSocketName == null) {
		// No client connector class configured: use the default NIO implementation (Netty is the alternative)
		clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
	}
	try {
		Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
				.getDeclaredConstructor(ZKClientConfig.class);
		ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor
				.newInstance(getClientConfig());
		return clientCxnSocket;
	} catch (Exception e) {
		IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
		ioe.initCause(e);
		throw ioe;
	}
}
           

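Besides native NIO, the client also supports Netty for network communication (Netty is built on NIO but wraps it in a better abstraction and works around some NIO bugs). The implementation is selected with the zookeeper.clientCnxnSocket property.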
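getClientCnxnSocket reads a class name from configuration, falls back to a default, and instantiates the class reflectively through a constructor that takes the config object. A minimal sketch of that pattern follows. The Transport interface, the NioTransport and NettyTransport classes, and the property name demo.transport are hypothetical stand-ins; for ZooKeeper itself the property is zookeeper.clientCnxnSocket, and the Netty implementation is org.apache.zookeeper.ClientCnxnSocketNetty.

```java
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

final class TransportSelector {
    // Hypothetical property name standing in for "zookeeper.clientCnxnSocket".
    static final String TRANSPORT_PROPERTY = "demo.transport";

    interface Transport {
        String name();
    }

    static final class NioTransport implements Transport {
        NioTransport(Map<String, String> config) { }
        public String name() { return "nio"; }
    }

    static final class NettyTransport implements Transport {
        NettyTransport(Map<String, String> config) { }
        public String name() { return "netty"; }
    }

    // Read the implementation class from config, default to NIO,
    // and instantiate it through its (Map) constructor.
    static Transport create(Map<String, String> config) throws IOException {
        String className = config.get(TRANSPORT_PROPERTY);
        if (className == null) {
            className = NioTransport.class.getName();
        }
        try {
            Constructor<?> ctor = Class.forName(className).getDeclaredConstructor(Map.class);
            return (Transport) ctor.newInstance(config);
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Same idea as the original: wrap any failure in an IOException.
            throw new IOException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> config = new HashMap<>();
        System.out.println(create(config).name()); // prints nio
        config.put(TRANSPORT_PROPERTY, NettyTransport.class.getName());
        System.out.println(create(config).name()); // prints netty
    }
}
```

For the real client, the property is usually passed as a JVM system property, for example -Dzookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty through CLIENT_JVMFLAGS in the zkCli.sh command shown earlier.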


The most important fields of ClientCnxnSocket are:

// Sends the requests
protected ClientCnxn.SendThread sendThread;
// Buffers outgoing requests and preserves their order
protected LinkedBlockingDeque<Packet> outgoingQueue;
           

The key methods of ClientCnxnSocketNIO are doTransport and doIO, which send requests and receive responses:

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
		throws IOException, InterruptedException {
	selector.select(waitTimeOut);
	Set<SelectionKey> selected;
	synchronized (this) {
		selected = selector.selectedKeys();
	}
	// Everything below and until we get back to the select is
	// non blocking, so time is effectively a constant. That is
	// Why we just have to do this once, here
	updateNow();
	for (SelectionKey k : selected) {
		SocketChannel sc = ((SocketChannel) k.channel());
		if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
			if (sc.finishConnect()) {
				updateLastSendAndHeard();
				updateSocketAddresses();
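				// Build the ConnectRequest
				/*
				 * The previous steps only completed the TCP-level socket connection between client and
				 * server; the ZooKeeper client session itself is far from established yet.
				 */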
				sendThread.primeConnection();
			}
		} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
			// Response handling phase
			/*
			 * Receive and handle read/write events
			 */
			doIO(pendingQueue, cnxn);
		}
	}
	if (sendThread.getZkState().isConnected()) {
		if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
			enableWrite();
		}
	}
	selected.clear();
}
           
/**
 * Core logic: sends requests and receives responses.
 * @return true if a packet was received
 * @throws InterruptedException
 * @throws IOException
 */
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
	SocketChannel sock = (SocketChannel) sockKey.channel();
	if (sock == null) {
		throw new IOException("Socket is null!");
	}
	if (sockKey.isReadable()) {
		int rc = sock.read(incomingBuffer);
		if (rc < 0) {
			throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
					+ Long.toHexString(sessionId) + ", likely server has closed socket");
		}
		if (!incomingBuffer.hasRemaining()) {
			incomingBuffer.flip();
			if (incomingBuffer == lenBuffer) {
				recvCount.getAndIncrement();
				readLength();
			} else if (!initialized) {
				// Not initialized yet, so this must be the response to the session-creation request; hand it directly to readConnectResult
				readConnectResult();
				enableRead();
				if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
					// Since SASL authentication has completed (if client is configured to do so),
					// outgoing packets waiting in the outgoingQueue can now be sent.
					enableWrite();
				}
				lenBuffer.clear();
				incomingBuffer = lenBuffer;
				updateLastHeard();
				initialized = true;
			} else {
				// Handle all other responses: heartbeats, authentication, and normal request responses
				sendThread.readResponse(incomingBuffer);
				lenBuffer.clear();
				incomingBuffer = lenBuffer;
				updateLastHeard();
			}
		}
	}
	if (sockKey.isWritable()) {
		// Take a sendable Packet from the outgoing queue; below, a client request sequence number (XID) is generated and set in its request header
		Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

		if (p != null) {
			updateLastSend();
			// If we already started writing p, p.bb will already exist
			if (p.bb == null) {
				if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping)
						&& (p.requestHeader.getType() != OpCode.auth)) {
					// Set the XID (request sequence number) in the request header
					p.requestHeader.setXid(cnxn.getXid());
				}
				// Serialize the packet
				p.createBB();
			}
			sock.write(p.bb);
			if (!p.bb.hasRemaining()) {
				sentCount.getAndIncrement();
				// Remove the packet that has been sent
				outgoingQueue.removeFirstOccurrence(p);
				if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping
						&& p.requestHeader.getType() != OpCode.auth) {
					synchronized (pendingQueue) {
						// Keep the packet in pendingQueue so it can be processed when the server's response arrives
						pendingQueue.add(p);
					}
				}
			}
		}
		if (outgoingQueue.isEmpty()) {
			// No more packets to send: turn off write interest flag.
			// Will be turned on later by a later call to enableWrite(),
			// from within ZooKeeperSaslClient (if client is configured
			// to attempt SASL authentication), or in either doIO() or
			// in doTransport() if not.
			disableWrite();
		} else if (!initialized && p != null && !p.bb.hasRemaining()) {
			// On initial connection, write the complete connect request
			// packet, but then disable further writes until after
			// receiving a successful connection response. If the
			// session is expired, then the server sends the expiration
			// response and immediately closes its end of the socket. If
			// the client is simultaneously writing on its end, then the
			// TCP stack may choose to abort with RST, in which case the
			// client would never receive the session expired event. See
			// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
			disableWrite();
		} else {
			// Just in case
			enableWrite();
		}
	}
}
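doIO reads every response in two steps: first the 4-byte length prefix into lenBuffer, then (after readLength allocates a buffer of that size) the body into incomingBuffer, after which it switches back to lenBuffer. A minimal single-threaded sketch of that length-prefixed framing, decoding frames from chunks that may be split anywhere. FrameDecoder and its MAX_FRAME limit are assumptions for illustration; ZooKeeper bounds the length with its own configured maximum.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class FrameDecoder {
    static final int MAX_FRAME = 1 << 20; // assumed upper bound for a frame body

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // start by reading a length

    // Feed bytes as they arrive; returns the frame bodies completed by this chunk.
    List<byte[]> feed(ByteBuffer chunk) throws IOException {
        List<byte[]> frames = new ArrayList<>();
        while (chunk.hasRemaining()) {
            // Copy only as many bytes as the current buffer still needs.
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);

            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    // Length prefix complete: allocate a buffer for the body.
                    int len = incomingBuffer.getInt();
                    // Unlike the original check, this sketch also rejects 0
                    // so an empty body cannot stall the loop.
                    if (len <= 0 || len >= MAX_FRAME) {
                        throw new IOException("Packet len " + len + " is out of range!");
                    }
                    incomingBuffer = ByteBuffer.allocate(len);
                } else {
                    // Body complete: hand it off and go back to reading a length.
                    byte[] body = new byte[incomingBuffer.remaining()];
                    incomingBuffer.get(body);
                    frames.add(body);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                }
            }
        }
        return frames;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer wire = ByteBuffer.allocate(4 + 3 + 4 + 2);
        wire.putInt(3).put(new byte[] {1, 2, 3}).putInt(2).put(new byte[] {9, 8});
        byte[] raw = wire.array();
        FrameDecoder decoder = new FrameDecoder();
        // Deliver the bytes in two chunks, split inside the first body.
        System.out.println(decoder.feed(ByteBuffer.wrap(raw, 0, 5)).size());              // prints 0
        System.out.println(decoder.feed(ByteBuffer.wrap(raw, 5, raw.length - 5)).size()); // prints 2
    }
}
```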
           

Creating the ClientCnxn

protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout,
		ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
		boolean canBeReadOnly) throws IOException {
	// Create and initialize the client network connector
	return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket,
			canBeReadOnly);
}
           
The class name org.apache.zookeeper.ClientCnxn is honestly not well chosen; it is hard to tell what it means at a glance (Cnxn is short for "connection").

Its class Javadoc reads: "This class manages the socket i/o for the client. ClientCnxn maintains a list of available servers to connect to and “transparently” switches servers it is connected to as needed."

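ClientCnxn is the network connector that manages all network interaction between the client and the server. It contains two core queues, outgoingQueue and pendingQueue, and two core threads, SendThread and EventThread: SendThread manages all network I/O between client and server, while EventThread processes client-side events. The client also hands the ClientCnxnSocket to SendThread as its underlying network I/O handler, and initializes EventThread's waitingEvents queue, which holds all events waiting to be processed by the client.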
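The hand-off between the two queues can be sketched in a few lines. A packet waits in outgoingQueue, receives an XID when it is written, and moves to pendingQueue. Each reply must then carry the XID of the oldest pending packet; otherwise the connection is treated as broken, as readResponse does further down. This is a simplified single-threaded sketch. PacketQueuesSketch, its Packet class, and the method names are hypothetical. It ignores pings, auth packets, and notifications, which ZooKeeper keeps out of pendingQueue.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

final class PacketQueuesSketch {
    static final class Packet {
        final String request;
        int xid = -1;     // assigned when the packet is actually written
        String response;  // filled in when the matching reply arrives
        Packet(String request) { this.request = request; }
    }

    private final Deque<Packet> outgoingQueue = new ArrayDeque<>();
    private final Deque<Packet> pendingQueue = new ArrayDeque<>();
    private int xid = 1;

    // API call: only queue the request (an async call would return here).
    Packet submit(String request) {
        Packet p = new Packet(request);
        outgoingQueue.addLast(p);
        return p;
    }

    // "Socket writable": send the first packet, stamp its XID, wait for the reply.
    Packet sendOne() {
        Packet p = outgoingQueue.pollFirst();
        if (p != null) {
            p.xid = xid++;
            pendingQueue.addLast(p);
        }
        return p;
    }

    // "Reply read": the reply must answer the oldest pending request.
    Packet onReply(int replyXid, String body) throws IOException {
        Packet p = pendingQueue.pollFirst();
        if (p == null) {
            throw new IOException("Nothing in the queue, but got " + replyXid);
        }
        if (p.xid != replyXid) {
            throw new IOException("Xid out of order. Got Xid " + replyXid + " expected Xid " + p.xid);
        }
        p.response = body;
        return p;
    }

    public static void main(String[] args) throws IOException {
        PacketQueuesSketch q = new PacketQueuesSketch();
        q.submit("getData /a");
        q.submit("exists /b");
        q.sendOne();
        q.sendOne();
        System.out.println(q.onReply(1, "data-a").request); // prints getData /a
    }
}
```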

/**
 * Creates a connection object. The actual network connect doesn't get
 * established until needed. The start() instance method must be called
 * subsequent to construction.
 *
 * @param chrootPath       - the chroot of this client. Should be removed from
 *                         this Class in ZOOKEEPER-838
 * @param hostProvider     the list of ZooKeeper servers to connect to
 * @param sessionTimeout   the timeout for connections.
 * @param zooKeeper        the zookeeper object that this connection is related
 *                         to.
 * @param watcher          watcher for this connection
 * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
 * @param sessionId        session id if re-establishing session
 * @param sessionPasswd    session passwd if re-establishing session
 * @param canBeReadOnly    whether the connection is allowed to go to read-only
 *                         mode in case of partitioning
 * @throws IOException
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
		ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId,
		byte[] sessionPasswd, boolean canBeReadOnly) {
	this.zooKeeper = zooKeeper;
	this.watcher = watcher;
	this.sessionId = sessionId;
	this.sessionPasswd = sessionPasswd;
	this.sessionTimeout = sessionTimeout;
	this.hostProvider = hostProvider;
	this.chrootPath = chrootPath;

	connectTimeout = sessionTimeout / hostProvider.size();
	readTimeout = sessionTimeout * 2 / 3;
	readOnly = canBeReadOnly;

	// Core network thread: manages all network I/O between client and server
	sendThread = new SendThread(clientCnxnSocket);
	// Core event thread: processes client-side events
	eventThread = new EventThread();
	this.clientConfig = zooKeeper.getClientConfig();
	initRequestTimeout();
}
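The constructor derives the connect and read timeouts from the session timeout, and SendThread.run (below) sends a PING once the client has not sent anything for roughly half the read timeout. Below is a small worked example of those formulas. It assumes the 30000 ms default from MyCommandOptions and a three-server connect string. TimeoutMath is an illustrative name; the expressions are copied from the code.

```java
final class TimeoutMath {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in SendThread.run

    static int connectTimeout(int sessionTimeout, int hostCount) {
        return sessionTimeout / hostCount;
    }

    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }

    // Same expression as in SendThread.run: a value <= 0 means "ping now".
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int session = 30000;
        int read = readTimeout(session);
        System.out.println(connectTimeout(session, 3));  // prints 10000
        System.out.println(read);                        // prints 20000
        System.out.println(timeToNextPing(read, 0));     // prints 10000
        System.out.println(timeToNextPing(read, 8000));  // prints 1000
        System.out.println(shouldPing(read, 9500));      // prints true
    }
}
```

With these settings the client pings after about 9 s without sending anything, well before the 20 s read timeout would declare the server unreachable. Once the server has negotiated the actual session timeout, SendThread recomputes readTimeout and connectTimeout from the negotiated value using the same formulas.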
           
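/**
 * This class services the outgoing request queue and generates the heart beats.
 * It also spawns the ReadThread.
 * The core I/O scheduling thread inside ClientCnxn, managing all network I/O
 * between client and server.
 * While the ZooKeeper client is running, SendThread on the one hand maintains the
 * session lifecycle between client and server: it sends a PING packet to the server
 * at a regular interval as a heartbeat, and if the TCP connection between client
 * and server drops during the session, it reconnects automatically and transparently.
 * On the other hand, SendThread manages all request sending and response receiving:
 * it converts upper-layer client API operations into the corresponding protocol
 * requests, sends them to the server, and completes the return of synchronous calls
 * and the callbacks of asynchronous calls.
 * SendThread is also responsible for passing events from the server to EventThread.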
 */
class SendThread extends ZooKeeperThread {
	private long lastPingSentNs;
	private final ClientCnxnSocket clientCnxnSocket;
	private Random r = new Random();
	private boolean isFirstConnect = true;

	@Override
	public void run() {
		// First do some cleanup and preparation so the client is ready to send the "create session" request
		clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
		clientCnxnSocket.updateNow();
		clientCnxnSocket.updateLastSendAndHeard();
		int to;
		long lastPingRwServer = Time.currentElapsedTime();
		final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
		InetSocketAddress serverAddress = null;
		while (state.isAlive()) {
			try {
				// Not connected yet: establish a connection
				if (!clientCnxnSocket.isConnected()) {
					// don't re-establish connection if we are closing
					if (closing) {
						break;
					}
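					// Pick a server address
					/*
					 * Before opening a TCP connection, SendThread first needs the address of a target
					 * ZooKeeper server, usually taken from the HostProvider, which hands out addresses
					 * from a randomly shuffled server list. It then asks the network connector to open
					 * the TCP connection to that ZooKeeper server.
					 */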
					if (rwServerAddress != null) {
						serverAddress = rwServerAddress;
						rwServerAddress = null;
					} else {
						serverAddress = hostProvider.next(1000);
					}
					// Open the TCP connection
					/*
					 * Once it has a server address, the network connector opens a long-lived TCP
					 * connection to that server and the state is set to CONNECTING.
					 */
					startConnect(serverAddress);
					clientCnxnSocket.updateLastSendAndHeard();
				}

				// 處于連接配接狀态 判斷是否已經認證
				if (state.isConnected()) {
					// determine whether we need to send an AuthFailed event.
					if (zooKeeperSaslClient != null) {
						boolean sendAuthEvent = false;
						if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
							try {
								zooKeeperSaslClient.initialize(ClientCnxn.this);
							} catch (SaslException e) {
								LOG.error(
										"SASL authentication with Zookeeper Quorum member failed: " + e);
								state = States.AUTH_FAILED;
								sendAuthEvent = true;
							}
						}
						KeeperState authState = zooKeeperSaslClient.getKeeperState();
						if (authState != null) {
							if (authState == KeeperState.AuthFailed) {
								// An authentication error occurred during authentication with the
								// Zookeeper Server.
								state = States.AUTH_FAILED;
								sendAuthEvent = true;
							} else {
								if (authState == KeeperState.SaslAuthenticated) {
									sendAuthEvent = true;
								}
							}
						}

						if (sendAuthEvent) {
							eventThread.queueEvent(
									new WatchedEvent(Watcher.Event.EventType.None, authState, null));
							if (state == States.AUTH_FAILED) {
								eventThread.queueEventOfDeath();
							}
						}
					}
					to = readTimeout - clientCnxnSocket.getIdleRecv();
				} else {
					to = connectTimeout - clientCnxnSocket.getIdleRecv();
				}

				if (to <= 0) {
					String warnInfo;
					warnInfo = "Client session timed out, have not heard from server in "
							+ clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x"
							+ Long.toHexString(sessionId);
					LOG.warn(warnInfo);
					throw new SessionTimeoutException(warnInfo);
				}
				if (state.isConnected()) {
					// 1000(1 second) is to prevent race condition missing to send the second ping
					// also make sure not to send too many pings when readTimeout is small
					int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
							- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
					// send a ping request either time is due or no packet sent out within
					// MAX_SEND_PING_INTERVAL
					if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
						sendPing();
						clientCnxnSocket.updateLastSend();
					} else {
						if (timeToNextPing < to) {
							to = timeToNextPing;
						}
					}
				}

				// If we are in read-only mode, seek for read/write server
				if (state == States.CONNECTEDREADONLY) {
					long now = Time.currentElapsedTime();
					int idlePingRwServer = (int) (now - lastPingRwServer);
					if (idlePingRwServer >= pingRwTimeout) {
						lastPingRwServer = now;
						idlePingRwServer = 0;
						pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
						pingRwServer();
					}
					to = Math.min(to, pingRwTimeout - idlePingRwServer);
				}

				// Perform network I/O: send queued requests and read responses
				clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
			} catch (Throwable e) {
				if (closing) {
					if (LOG.isDebugEnabled()) {
						// closing so this is expected
						LOG.debug("An exception was thrown while closing send thread for session 0x"
								+ Long.toHexString(getSessionId()) + " : " + e.getMessage());
					}
					break;
				} else {
					// this is ugly, you have a better way speak up
					if (e instanceof SessionExpiredException) {
						LOG.info(e.getMessage() + ", closing socket connection");
					} else if (e instanceof SessionTimeoutException) {
						LOG.info(e.getMessage() + RETRY_CONN_MSG);
					} else if (e instanceof EndOfStreamException) {
						LOG.info(e.getMessage() + RETRY_CONN_MSG);
					} else if (e instanceof RWServerFoundException) {
						LOG.info(e.getMessage());
					} else if (e instanceof SocketException) {
						LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
					} else {
						LOG.warn("Session 0x{} for server {}, unexpected error{}",
								Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e);
					}
					// At this point, there might still be new packets appended to outgoingQueue.
					// they will be handled in next connection or cleared up if closed.
					cleanAndNotifyState();
				}
			}
		}

		// End of the while loop

		synchronized (state) {
			// When it comes to this point, it guarantees that later queued
			// packet to outgoingQueue will be notified of death.
			cleanup();
		}
		clientCnxnSocket.close();
		if (state.isAlive()) {
			eventThread.queueEvent(
					new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
		}
		eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
		ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
				"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
	}

	/**
	 * After receiving a complete response from the server, the client handles it differently depending on the request type.
	 * 
	 * @param incomingBuffer
	 * @throws IOException
	 */
	void readResponse(ByteBuffer incomingBuffer) throws IOException {
		ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
		BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
		ReplyHeader replyHdr = new ReplyHeader();

		replyHdr.deserialize(bbia, "header");
		if (replyHdr.getXid() == -2) {
			// Heartbeat (ping) response
			// -2 is the xid for pings
			if (LOG.isDebugEnabled()) {
				LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after "
						+ ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
			}
			return;
		}
		if (replyHdr.getXid() == -4) {
			// Authentication response
			// -4 is the xid for AuthPacket
			if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
				state = States.AUTH_FAILED;
				eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
						Watcher.Event.KeeperState.AuthFailed, null));
				eventThread.queueEventOfDeath();
			}
			if (LOG.isDebugEnabled()) {
				LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
			}
			return;
		}
		if (replyHdr.getXid() == -1) {
			// Watcher event notification
			// -1 means notification
			if (LOG.isDebugEnabled()) {
				LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
			}
			// Deserialize into a WatcherEvent and queue it for EventThread
			WatcherEvent event = new WatcherEvent();
			event.deserialize(bbia, "response");

			// convert from a server path to a client path
			if (chrootPath != null) {
				String serverPath = event.getPath();
				if (serverPath.compareTo(chrootPath) == 0)
					event.setPath("/");
				else if (serverPath.length() > chrootPath.length())
					event.setPath(serverPath.substring(chrootPath.length()));
				else {
					LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path "
							+ chrootPath);
				}
			}

			WatchedEvent we = new WatchedEvent(event);
			if (LOG.isDebugEnabled()) {
				LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
			}

			eventThread.queueEvent(we);
			return;
		}

		// If SASL authentication is currently in progress, construct and
		// send a response packet immediately, rather than queuing a
		// response as with other packets.
		if (tunnelAuthInProgress()) {
			GetSASLRequest request = new GetSASLRequest();
			request.deserialize(bbia, "token");
			zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
			return;
		}

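		// For a normal request response (Create, GetData, Exists and similar operations),
		// take the corresponding Packet from pendingQueue and process it.
		// The client first checks the XID in the server response to guarantee that requests are
		// handled in order, then deserializes the received ByteBuffer (incomingBuffer) into the
		// corresponding Response object.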
		Packet packet;
		synchronized (pendingQueue) {
			if (pendingQueue.size() == 0) {
				throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
			}
			packet = pendingQueue.remove();
		}
		/*
		 * Since requests are processed in order, we better get a response to the first
		 * request!
		 */
		try {
			if (packet.requestHeader.getXid() != replyHdr.getXid()) {
				packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
				throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err "
						+ +replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid()
						+ " for a packet with details: " + packet);
			}

			packet.replyHeader.setXid(replyHdr.getXid());
			packet.replyHeader.setErr(replyHdr.getErr());
			packet.replyHeader.setZxid(replyHdr.getZxid());
			if (replyHdr.getZxid() > 0) {
				lastZxid = replyHdr.getZxid();
			}
			if (packet.response != null && replyHdr.getErr() == 0) {
				packet.response.deserialize(bbia, "response");
			}

			if (LOG.isDebugEnabled()) {
				LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: "
						+ packet);
			}
		} finally {
			// Handle watcher registration and related follow-up logic
			finishPacket(packet);
		}
	}

	SendThread(ClientCnxnSocket clientCnxnSocket) {
		super(makeThreadName("-SendThread()"));
		state = States.CONNECTING;
		this.clientCnxnSocket = clientCnxnSocket;
		setDaemon(true); // daemon thread
	}
}
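Before readResponse touches pendingQueue, it dispatches on a few reserved XIDs: -2 for ping replies, -4 for auth replies, and -1 for watch notifications. Every other XID belongs to a reply to a queued request. A minimal sketch of that dispatch follows. ReplyDispatch and the Kind enum are hypothetical names, and the SASL branch is left out.

```java
final class ReplyDispatch {
    enum Kind { PING, AUTH, NOTIFICATION, REQUEST_REPLY }

    // Reserved XIDs checked at the start of readResponse.
    static final int NOTIFICATION_XID = -1;
    static final int PING_XID = -2;
    static final int AUTH_XID = -4;

    static Kind classify(int xid) {
        switch (xid) {
            case PING_XID:
                return Kind.PING;          // heartbeat reply, only logged
            case AUTH_XID:
                return Kind.AUTH;          // may move the client to AUTH_FAILED
            case NOTIFICATION_XID:
                return Kind.NOTIFICATION;  // WatcherEvent handed to eventThread.queueEvent
            default:
                return Kind.REQUEST_REPLY; // matched against the head of pendingQueue
        }
    }

    public static void main(String[] args) {
        for (int xid : new int[] {-2, -4, -1, 7}) {
            System.out.println(xid + " -> " + classify(xid));
        }
    }
}
```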
           
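/**
 * Core thread responsible for client-side event handling; it triggers the Watchers registered by the client.
 * EventThread has a waitingEvents queue that temporarily holds the objects to be triggered, including the
 * Watchers registered by the client and the AsyncCallback callbacks registered through the asynchronous APIs.
 * EventThread keeps taking these objects out of waitingEvents, identifies their concrete type, and calls
 * process or processResult respectively to trigger the event or the callback.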
 * @author Administrator
 *
 */
class EventThread extends ZooKeeperThread {
	private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();

	/**
	 * This is really the queued session state until the event thread actually
	 * processes the event and hands it to the watcher. But for all intents and
	 * purposes this is the state.
	 */
	private volatile KeeperState sessionState = KeeperState.Disconnected;

	private volatile boolean wasKilled = false;
	private volatile boolean isRunning = false;

	EventThread() {
		super(makeThreadName("-EventThread"));
		setDaemon(true);
	}

	@Override
	@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
	public void run() {
		try {
			isRunning = true;
			while (true) {
				// EventThread keeps taking pending events (watchers or callbacks) from waitingEvents
				Object event = waitingEvents.take();
				if (event == eventOfDeath) {
					wasKilled = true;
				} else {
				// process the event, which fires the watcher or invokes the callback
					processEvent(event);
				}
				if (wasKilled)
					synchronized (waitingEvents) {
						if (waitingEvents.isEmpty()) {
							isRunning = false;
							break;
						}
					}
			}
		} catch (InterruptedException e) {
			LOG.error("Event thread exiting due to interruption", e);
		}

		LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
	}
}
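The shutdown logic in run() is a "poison pill" pattern: when the connection is torn down, the client enqueues the sentinel eventOfDeath, and the thread exits only after it has seen that marker and the queue is empty. The sketch below reproduces the pattern in isolation; MiniEventThread, queueEvent and queueEventOfDeath are hypothetical names, and plain strings stand in for watcher events and callbacks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the EventThread pattern: a daemon thread drains a
// blocking queue and stops once the poison pill has been seen and the queue is empty.
class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean wasKilled = false;

    MiniEventThread() {
        super("mini-EventThread");
        setDaemon(true);
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    // Ask the thread to stop after everything queued before this call is handled.
    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;
                } else {
                    processed.add((String) event); // stands in for processEvent(event)
                }
                if (wasKilled && waitingEvents.isEmpty()) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because events are handled on one thread in queue order, watchers and callbacks fire in the order they were queued, which is the ordering guarantee the real EventThread provides.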
           

Executing the main class's run method

void run() throws CliException, IOException, InterruptedException {
    if (cl.getCommand() == null) {
        System.out.println("Welcome to ZooKeeper!");

        boolean jlinemissing = false;
        // only use jline if it's in the classpath
        try {
            Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
            Class<?> completorC =
                Class.forName("org.apache.zookeeper.JLineZNodeCompleter");

            System.out.println("JLine support is enabled");

            Object console =
                consoleC.getConstructor().newInstance();

            Object completor =
                completorC.getConstructor(ZooKeeper.class).newInstance(zk);
            Method addCompletor = consoleC.getMethod("addCompleter",
                    Class.forName("jline.console.completer.Completer"));
            addCompletor.invoke(console, completor);

            String line;
            Method readLine = consoleC.getMethod("readLine", String.class);
            while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
                executeLine(line);
            }
        } catch (ClassNotFoundException e) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        } catch (NoSuchMethodException e) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        } catch (InvocationTargetException e) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        } catch (IllegalAccessException e) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        } catch (InstantiationException e) {
            LOG.debug("Unable to start jline", e);
            jlinemissing = true;
        }

        if (jlinemissing) {
            System.out.println("JLine support is disabled");
            BufferedReader br =
                new BufferedReader(new InputStreamReader(System.in));

            String line;
            while ((line = br.readLine()) != null) {
                executeLine(line);
            }
        }
    } else {
        // Command line args non-null.  Run what was passed.
        // Non-interactive mode: execute the command given on the command line
        processCmd(cl);
    }
    System.exit(exitCode);
}

protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }

    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }
    
    boolean watch = false;
    LOG.debug("Processing " + cmd);


    if (cmd.equals("quit")) {
        zk.close();
        System.exit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }
    
    // Below commands all need a live connection
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }
    
    // execute from commandMap
    CliCommand cliCmd = commandMapCli.get(cmd);
    if(cliCmd != null) {
        cliCmd.setZk(zk);
        // execute the remaining (ZooKeeper) commands
        watch = cliCmd.parse(args).exec();
    } else if (!commandMap.containsKey(cmd)) {
         usage();
    }
    return watch;
}
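run() treats JLine as an optional dependency: it loads the classes reflectively and, if any step fails, falls back to a plain BufferedReader loop. A minimal sketch of that detect-then-fall-back idea follows; OptionalConsole, tryCreate and readLoop are hypothetical helpers, not ZooKeeper code.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.function.Consumer;

// Hypothetical sketch of run(): use an optional library only if it is on the
// classpath (detected via reflection), otherwise fall back to BufferedReader.
class OptionalConsole {

    // True if the class can be loaded and instantiated through a public no-arg constructor.
    static boolean tryCreate(String className) {
        try {
            Class<?> c = Class.forName(className);
            c.getConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false; // same effect as setting jlinemissing = true
        }
    }

    // Fallback path: read lines until EOF and hand each one to executeLine.
    static void readLoop(BufferedReader br, Consumer<String> executeLine) throws IOException {
        String line;
        while ((line = br.readLine()) != null) {
            executeLine.accept(line);
        }
    }
}
```

All five exceptions caught separately in run() are subclasses of ReflectiveOperationException (Java 7+), so a single catch clause gives the same behavior.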
           

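The shell looks up a CliCommand object by the command name; for the set command, for example, it gets a SetCommand object.

It then calls parse(args).exec() on that CliCommand, a textbook use of the Command design pattern.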
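The dispatch can be sketched in a few classes, assuming an in-memory map in place of a live ZooKeeper handle. Command, GetCmd and Shell are hypothetical names; the real classes are CliCommand and its subclasses, which receive the handle through setZk().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the command pattern behind CliCommand: each command
// parses its own arguments and executes itself; the shell only looks it up by name.
abstract class Command {
    final String name;

    Command(String name) {
        this.name = name;
    }

    // Returns this so that parse(args).exec() can be chained, as in processZKCmd().
    abstract Command parse(String[] args);

    abstract String exec();
}

// Illustrative "get" command backed by an in-memory map instead of a ZooKeeper handle.
class GetCmd extends Command {
    private final Map<String, String> store;
    private String path;

    GetCmd(Map<String, String> store) {
        super("get");
        this.store = store;
    }

    @Override
    Command parse(String[] args) {
        if (args.length < 2) { // args[0] is the command name itself
            throw new IllegalArgumentException("get path");
        }
        path = args[1];
        return this;
    }

    @Override
    String exec() {
        String data = store.get(path);
        return data != null ? data : "Node does not exist: " + path;
    }
}

class Shell {
    private final Map<String, Command> commandMap = new HashMap<>();

    void register(Command c) {
        commandMap.put(c.name, c);
    }

    String executeLine(String line) {
        String[] args = line.trim().split("\\s+");
        Command cmd = commandMap.get(args[0]);
        if (cmd == null) {
            return "Command not found " + args[0];
        }
        return cmd.parse(args).exec();
    }
}
```

Adding a command means registering one more subclass; the shell's lookup-and-run code never changes. Since each registered instance keeps its parsed arguments in fields, the design assumes a single-threaded shell.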

public class SetCommand extends CliCommand {

    private static Options options = new Options();
    private String[] args;
    private CommandLine cl;

    static {
        options.addOption("s", false, "stats");
        options.addOption("v", true, "version");
    }

    public SetCommand() {
        super("set", "[-s] [-v version] path data");
    }

    @Override
    public CliCommand parse(String[] cmdArgs) throws CliParseException {
        Parser parser = new PosixParser();
        try {
            cl = parser.parse(options, cmdArgs);
        } catch (ParseException ex) {
            throw new CliParseException(ex);
        }
        args = cl.getArgs();
        if (args.length < 3) {
            throw new CliParseException(getUsageStr());
        }

        return this;
    }

    @Override
    public boolean exec() throws CliException {
        String path = args[1];
        byte[] data = args[2].getBytes();
        int version;
        if (cl.hasOption("v")) {
            version = Integer.parseInt(cl.getOptionValue("v"));
        } else {
            version = -1;
        }

        try {
            Stat stat = zk.setData(path, data, version);
            if (cl.hasOption("s")) {
                new StatPrinter(out).print(stat);
            }
        } catch (IllegalArgumentException ex) {
            throw new MalformedPathException(ex.getMessage());
        } catch (KeeperException|InterruptedException ex) {
            throw new CliWrapperException(ex);
        }
        return false;
    }
}
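SetCommand.parse() delegates option handling to Apache Commons CLI, and the non-option arguments it returns still include the command name, which is why exec() reads the path from args[1] and the data from args[2]. The hand-rolled sketch below shows that layout for "set [-s] [-v version] path data"; SetArgs is a hypothetical class, and it encodes data as UTF-8 where the original calls getBytes() with the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical hand-rolled equivalent of SetCommand.parse() for
// "set [-s] [-v version] path data"; the real class uses Apache Commons CLI.
class SetArgs {
    boolean printStat;
    int version = -1; // -1 means "match any version", as in SetCommand.exec()
    String path;
    byte[] data;

    static SetArgs parse(String[] cmdArgs) {
        SetArgs r = new SetArgs();
        List<String> rest = new ArrayList<>(); // non-option arguments, command name included
        for (int i = 0; i < cmdArgs.length; i++) {
            switch (cmdArgs[i]) {
                case "-s":
                    r.printStat = true;
                    break;
                case "-v":
                    if (i + 1 >= cmdArgs.length) {
                        throw new IllegalArgumentException("-v needs a version");
                    }
                    r.version = Integer.parseInt(cmdArgs[++i]);
                    break;
                default:
                    rest.add(cmdArgs[i]);
            }
        }
        // rest = [set, path, data]: the same layout exec() relies on
        if (rest.size() < 3) {
            throw new IllegalArgumentException("set [-s] [-v version] path data");
        }
        r.path = rest.get(1);
        r.data = rest.get(2).getBytes(StandardCharsets.UTF_8);
        return r;
    }
}
```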
           
/**
 * Set the data for the node of the given path if such a node exists and the
 * given version matches the version of the node (if the given version is -1, it
 * matches any node's versions). Return the stat of the node.
 * <p>
 * This operation, if successful, will trigger all the watches on the node of
 * the given path left by getData calls.
 * <p>
 * A KeeperException with error code KeeperException.NoNode will be thrown if no
 * node with the given path exists.
 * <p>
 * A KeeperException with error code KeeperException.BadVersion will be thrown
 * if the given version does not match the node's version.
 * <p>
 * The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
 * Arrays larger than this will cause a KeeperException to be thrown.
 *
 * @param path    the path of the node
 * @param data    the data to set
 * @param version the expected matching version
 * @return the state of the node
 * @throws InterruptedException     If the server transaction is interrupted.
 * @throws KeeperException          If the server signals an error with a
 *                                  non-zero error code.
 * @throws IllegalArgumentException if an invalid path is specified
 */
public Stat setData(final String path, byte data[], int version)
		throws KeeperException, InterruptedException {
	final String clientPath = path;
	PathUtils.validatePath(clientPath);

	final String serverPath = prependChroot(clientPath);
	// build the request header
	RequestHeader h = new RequestHeader();
	h.setType(ZooDefs.OpCode.setData);
	SetDataRequest request = new SetDataRequest();
	request.setPath(serverPath);
	request.setData(data);
	request.setVersion(version);
	SetDataResponse response = new SetDataResponse();
	// submit the request through the connection object (ClientCnxn) and wait for the reply
	ReplyHeader r = cnxn.submitRequest(h, request, response, null);
	if (r.getErr() != 0) {
		throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
	}
	return response.getStat();
}
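The version argument gives setData compare-and-set semantics, as the javadoc describes: -1 matches any version, any other value must equal the node's current version or the call fails with BadVersion. The in-memory model below illustrates that contract only; VersionedStore and its nested exception classes are hypothetical and do not talk to a server.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the setData contract in the javadoc:
// -1 matches any version, otherwise the expected version must equal the node's.
class VersionedStore {
    static final class Node {
        byte[] data;
        int version; // starts at 0, incremented on every successful set

        Node(byte[] data) {
            this.data = data;
        }
    }

    static class NoNodeException extends Exception {
        NoNodeException(String path) {
            super("NoNode for " + path);
        }
    }

    static class BadVersionException extends Exception {
        BadVersionException(String path) {
            super("BadVersion for " + path);
        }
    }

    private final Map<String, Node> nodes = new HashMap<>();

    synchronized void create(String path, byte[] data) {
        nodes.put(path, new Node(data));
    }

    // Returns the node's new version, standing in for the Stat returned by setData.
    synchronized int setData(String path, byte[] data, int expectedVersion)
            throws NoNodeException, BadVersionException {
        Node n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException(path);
        }
        if (expectedVersion != -1 && expectedVersion != n.version) {
            throw new BadVersionException(path);
        }
        n.data = data;
        n.version++;
        return n.version;
    }
}
```

A typical optimistic-update loop reads the data and version, computes the new value, and calls setData with that version, retrying from the read whenever BadVersion comes back.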
           
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
		WatchRegistration watchRegistration) throws InterruptedException {
	return submitRequest(h, request, response, watchRegistration, null);
}

public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
		WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)
		throws InterruptedException {
	ReplyHeader r = new ReplyHeader();
	Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration,
			watchDeregistration);
	synchronized (packet) {
		if (requestTimeout > 0) {
			// Wait for request completion with timeout
			waitForPacketFinish(r, packet);
		} else {
			// Wait for request completion infinitely
			while (!packet.finished) {
				packet.wait();
			}
		}
	}
	if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
		sendThread.cleanAndNotifyState();
	}
	return r;
}

public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response,
		AsyncCallback cb, String clientPath, String serverPath, Object ctx,
		WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
	Packet packet = null;

	// Note that we do not generate the Xid for the packet yet. It is
	// generated later at send-time, by an implementation of
	// ClientCnxnSocket::doIO(),
	// where the packet is actually sent.
	packet = new Packet(h, r, request, response, watchRegistration);
	packet.cb = cb;
	packet.ctx = ctx;
	packet.clientPath = clientPath;
	packet.serverPath = serverPath;
	packet.watchDeregistration = watchDeregistration;
	// The synchronized block here is for two purpose:
	// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
	// 2. synchronized against each packet. So if a closeSession packet is added,
	// later packet will be notified.
	synchronized (state) {
		if (!state.isAlive() || closing) {
			conLossPacket(packet);
		} else {
			// If the client is asking to close the session then
			// mark as closing
			if (h.getType() == OpCode.closeSession) {
				closing = true;
			}
			outgoingQueue.add(packet);
		}
	}
	sendThread.getClientCnxnSocket().packetAdded();
	return packet;
}
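submitRequest turns the asynchronous queue into a blocking call: queuePacket() hands the packet to outgoingQueue, and the caller then waits on the packet's own monitor until finishPacket() marks it finished and notifies. The sketch below shows that handshake with a single background thread that simply echoes requests; SyncPacket, SyncClient and the echo reply are hypothetical, and there is no network or timeout handling.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the synchronous path: the caller queues a packet and
// blocks on it until another thread marks it finished and calls notifyAll().
class SyncPacket {
    final String request;
    String reply;
    boolean finished; // guarded by this packet's monitor

    SyncPacket(String request) {
        this.request = request;
    }
}

class SyncClient {
    private final LinkedBlockingQueue<SyncPacket> outgoingQueue = new LinkedBlockingQueue<>();

    SyncClient() {
        // Stands in for SendThread plus the server: echoes each request back.
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    SyncPacket p = outgoingQueue.take();
                    synchronized (p) { // like finishPacket(): publish the result, wake the waiter
                        p.reply = "reply to " + p.request;
                        p.finished = true;
                        p.notifyAll();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-SendThread");
        sender.setDaemon(true);
        sender.start();
    }

    String submitRequest(String request) throws InterruptedException {
        SyncPacket p = new SyncPacket(request);
        outgoingQueue.add(p); // plays the role of queuePacket()
        synchronized (p) {
            while (!p.finished) { // loop guards against spurious wakeups
                p.wait();
            }
            return p.reply;
        }
    }
}
```

Checking the finished flag in a loop under the same lock that the notifier uses means the reply cannot be missed, even if the background thread finishes the packet before the caller starts waiting.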
           

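Summary

  1. While ZooKeeperMain is being constructed (via connectToZK), two threads are started: SendThread, which sends requests and reads responses, and EventThread, which processes asynchronous events (watcher notifications and async callbacks).
  2. Each console command is turned into a request packet and added to outgoingQueue; SendThread takes packets from that queue, sends them, and keeps them in pendingQueue until the server's reply arrives.
  3. Queues are used throughout the client to preserve ordering (FIFO); for example, replies are matched against pendingQueue strictly in the order the requests were sent.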

    The reference diagram is shown below:

    [Figure: ZooKeeper client source code walkthrough (overall flow)]