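The bin directory of the ZooKeeper installation contains the startup scripts for both the client and the server.
Opening the client script (zkCli.sh) shows the following: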
"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
-cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS \
org.apache.zookeeper.ZooKeeperMain "$@"
This is simply the entry point that launches the Java program (ZooKeeperMain).
So everything starts from org.apache.zookeeper.ZooKeeperMain.main:
public static void main(String args[]) throws CliException, IOException, InterruptedException
{
ZooKeeperMain main = new ZooKeeperMain(args);
main.run();
}
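This looks a little like a Spring Boot launcher class: first construct an object, then call its run method.
Constructing the launcher class
- First, look at the constructor: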
public ZooKeeperMain(String args[]) throws IOException, InterruptedException {
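// Parse the arguments passed in from outside: server, timeout, readonly
// The parsed result is stored in the static nested class MyCommandOptions
// $ bin/zkCli.sh -server 127.0.0.1:2181
// Running the command above specifies the server address explicitly, so the option server=127.0.0.1:2181 is stored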
cl.parseOptions(args);
// Print the configured server to the console --> would a logging framework be a better choice here?
System.out.println("Connecting to " + cl.getOption("server"));
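// Connect to the server. This performs I/O inside a constructor, which is something to be wary of.
// During this step the arguments (the server address in particular) are parsed, NIO or Netty is chosen
// for network communication according to the configuration, the network connection object is created,
// and two threads are started: SendThread and EventThread.
// SendThread sends requests (connect, heartbeat, client commands); EventThread handles asynchronous events.
// As part of this, a server is selected and the network operations are carried out.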
connectToZK(cl.getOption("server"));
}
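Default configuration (MyCommandOptions parses the system configuration and the commands typed on the command line):
// Options are stored in a HashMap
private Map<String,String> options = new HashMap<String,String>();
// Arguments are matched with regular expressions (Pattern)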
public static final Pattern ARGS_PATTERN = Pattern.compile("\\s*([^\"\']\\S*|\"[^\"]*\"|'[^']*')\\s*");
public static final Pattern QUOTED_PATTERN = Pattern.compile("^([\'\"])(.*)(\\1)$");
public MyCommandOptions() {
options.put("server", "localhost:2181");
options.put("timeout", "30000");
}
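The defaults and the two patterns above can be tried out in isolation. The following is a minimal sketch, not the real MyCommandOptions: the class name CommandOptionsSketch and the simplified "-name value" pair parsing are assumptions made for illustration (the real parser treats some flags specially). It shows how the defaults are overridden and how a quoted argument survives tokenization.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical stand-in for MyCommandOptions; not the ZooKeeper class itself. */
final class CommandOptionsSketch {
    // The same two patterns as in the listing above.
    static final Pattern ARGS_PATTERN =
            Pattern.compile("\\s*([^\"\']\\S*|\"[^\"]*\"|'[^']*')\\s*");
    static final Pattern QUOTED_PATTERN = Pattern.compile("^([\'\"])(.*)(\\1)$");

    /** Starts from the defaults, then lets "-name value" pairs override them (simplified). */
    static Map<String, String> parseOptions(String... args) {
        Map<String, String> options = new HashMap<>();
        options.put("server", "localhost:2181");
        options.put("timeout", "30000");
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("-")) {
                options.put(args[i].substring(1), args[i + 1]);
            }
        }
        return options;
    }

    /** Splits an interactive command line into tokens and strips surrounding quotes. */
    static List<String> tokenize(String line) {
        List<String> tokens = new ArrayList<>();
        Matcher m = ARGS_PATTERN.matcher(line);
        while (m.find()) {
            String value = m.group(1);
            Matcher quoted = QUOTED_PATTERN.matcher(value);
            if (quoted.matches()) {
                value = quoted.group(2); // the text between the quotes
            }
            tokens.add(value);
        }
        return tokens;
    }
}
```

With this sketch, an explicit -server argument replaces the localhost:2181 default while timeout keeps its default of 30000, and a quoted value containing a space stays a single token.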
- Connecting to the server
protected void connectToZK(String newHost) throws InterruptedException, IOException {
// Already started: close the existing connection first, then reconnect
if (zk != null && zk.getState().isAlive()) {
zk.close();
}
host = newHost;
boolean readOnly = cl.getOption("readonly") != null;
if (cl.getOption("secure") != null) {
System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
System.out.println("Secure connection is enabled");
}
// Construct another object; judging by its name, this class administers ZooKeeper
// A MyWatcher is also constructed here
zk = new ZooKeeperAdmin(host, Integer.parseInt(cl.getOption("timeout")), new MyWatcher(), readOnly);
}
// This class looks unimportant (it only prints notifications for asynchronous events), but ZooKeeper's Watcher mechanism itself plays a major role
private class MyWatcher implements Watcher {
public void process(WatchedEvent event) {
// printWatches defaults to true
if (getPrintWatches()) {
ZooKeeperMain.printMessage("WATCHER::");
ZooKeeperMain.printMessage(event.toString());
}
}
}
public static void printMessage(String msg) {
System.out.println("\n"+msg);
}
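[Figure: class hierarchy of ZooKeeperAdmin]
The ZooKeeperAdmin constructor ultimately calls the constructor of its parent class, org.apache.zookeeper.ZooKeeper.
ZooKeeper is the main class of the client; its Javadoc reads:
/**
* Note: methods are thread-safe unless stated otherwise.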
* This is the main class of ZooKeeper client library. To use a ZooKeeper
* service, an application must first instantiate an object of ZooKeeper class.
* All the iterations will be done by calling the methods of ZooKeeper class.
* The methods of this class are thread-safe unless otherwise noted.
*
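* <p> Note: once a connection to a server is established, a session ID is assigned; the client then keeps the session valid by sending heartbeats periodically.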
* Once a connection to a server is established, a session ID is assigned to the
* client. The client will send heart beats to the server periodically to keep
* the session valid.
*
* <p>
* The application can call ZooKeeper APIs through a client as long as the
* session ID of the client remains valid.
*
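* <p> Note: if no heartbeat reaches the server for a prolonged period (longer than sessionTimeout), the server expires the session;
* the client object can then no longer be used through the API, and a new one must be created.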
* If for some reason, the client fails to send heart beats to the server for a
* prolonged period of time (exceeding the sessionTimeout value, for instance),
* the server will expire the session, and the session ID will become invalid.
* The client object will no longer be usable. To make ZooKeeper API calls, the
* application must create a new client object.
*
* <p> Note: if the server the client is connected to fails or stops responding, the client automatically tries another server before its session ID expires.
* If the ZooKeeper server the client currently connects to fails or otherwise
* does not respond, the client will automatically try to connect to another
* server before its session ID expires. If successful, the application can
* continue to use the client.
*
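* <p> Note: methods are either synchronous or asynchronous. Synchronous methods block until the server has responded.
* Asynchronous methods only queue the request and return immediately; once the server responds, the result is handled through a callback.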
* The ZooKeeper API methods are either synchronous or asynchronous. Synchronous
* methods blocks until the server has responded. Asynchronous methods just
* queue the request for sending and return immediately. They take a callback
* object that will be executed either on successful execution of the request or
* on error with an appropriate return code (rc) indicating the error.
*
* <p>
* Some successful ZooKeeper API calls can leave watches on the "data nodes" in
* the ZooKeeper server. Other successful ZooKeeper API calls can trigger those
* watches. Once a watch is triggered, an event will be delivered to the client
* which left the watch at the first place. Each watch can be triggered only
* once. Thus, up to one event will be delivered to a client for every watch it
* leaves.
* <p>
* A client needs an object of a class implementing Watcher interface for
* processing the events delivered to the client.
*
* When a client drops the current connection and re-connects to a server, all
* the existing watches are considered as being triggered but the undelivered
* events are lost. To emulate this, the client will generate a special event to
* tell the event handler a connection has been dropped. This special event has
* EventType None and KeeperState Disconnected.
*
*/
/*
* We suppress the "try" warning here because the close() method's signature
* allows it to throw InterruptedException which is strongly advised against by
* AutoCloseable (see:
* http://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html#close()
* ). close() will never throw an InterruptedException but the exception remains
* in the signature for backwards compatibility purposes.
*/
Constructing the ZooKeeper class
/**
* Note: creating a ZooKeeper client object requires a connection string of host:port pairs; several may be given.
* To create a ZooKeeper client object, the application needs to pass a
* connection string containing a comma separated list of host:port pairs, each
* corresponding to a ZooKeeper server.
* <p> Note: the session is established asynchronously; the watcher is notified of the result.
* Session establishment is asynchronous. This constructor will initiate
* connection to the server and return immediately - potentially (usually)
* before the session is fully established. The watcher argument specifies the
* watcher that will be notified of any changes in state. This notification can
* come at any point before or after the constructor call has returned.
* <p> Note: when several servers are listed, one is picked at random and others are tried until a connection succeeds.
* The instantiated ZooKeeper client object will pick an arbitrary server from
* the connectString and attempt to connect to it. If establishment of the
* connection fails, another server in the connect string will be tried (the
* order is non-deterministic, as we random shuffle the list), until a
* connection is established. The client will continue attempts until the
* session is explicitly closed.
* <p> Note: the optional chroot suffix sets a root path under which all client paths are resolved; see connectString below.
* Added in 3.2.0: An optional "chroot" suffix may also be appended to the
* connection string. This will run the client commands while interpreting all
* paths relative to this root (similar to the unix chroot command).
* <p>
* For backward compatibility, there is another version
* {@link #ZooKeeper(String, int, Watcher, boolean)} which uses default
* {@link StaticHostProvider}
*
* @param connectString comma separated host:port pairs, each corresponding to
* a zk server. e.g.
* "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the
* optional chroot suffix is used the example would look
* like:
* "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
* where the client would be rooted at "/app/a" and all
* paths would be relative to this root - ie
* getting/setting/etc... "/foo/bar" would result in
* operations being run on "/app/a/foo/bar" (from the
* server perspective).
* @param sessionTimeout session timeout in milliseconds
* @param watcher a watcher object which will be notified of state
* changes, may also be notified for node events
* @param canBeReadOnly (added in 3.4) whether the created client is allowed to
* go to read-only mode in case of partitioning. Read-only
* mode basically means that if the client can't find any
* majority servers but there's partitioned server it
* could reach, it connects to one in read-only mode, i.e.
* read requests are allowed while write requests are not.
* It continues seeking for majority in the background.
* @param aHostProvider use this as HostProvider to enable custom behaviour.
* @param clientConfig (added in 3.5.2) passing this conf object gives each
* client the flexibility of configuring properties
* differently compared to other instances
* @throws IOException in cases of network failure
* @throws IllegalArgumentException if an invalid chroot path is specified
*/
/**
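* The client's core connection object; it contains two threads: SendThread and EventThread.
* The former is an I/O thread responsible for network I/O between the ZooKeeper client and server; the latter is an event thread that handles events sent by the server.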
*/
protected final ClientCnxn cnxn;
private static final Logger LOG;
static {
// Keep these two lines together to keep the initialization order explicit
LOG = LoggerFactory.getLogger(ZooKeeper.class);
Environment.logEnv("Client environment:", LOG);
}
/**
* Manager for the list of server addresses the client can connect to
*/
protected final HostProvider hostProvider;
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly,
HostProvider aHostProvider, ZKClientConfig clientConfig) throws IOException {
LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout="
+ sessionTimeout + " watcher=" + watcher);
if (clientConfig == null) {
clientConfig = new ZKClientConfig();
}
this.clientConfig = clientConfig;
// 1. Set the default watcher
watchManager = defaultWatchManager();
watchManager.defaultWatcher = watcher; // use the watcher passed in as the default watcher
// 2. Set the ZooKeeper server address list
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
hostProvider = aHostProvider;
// 3. Obtain the network connection object via getClientCnxnSocket (it is only instantiated here)
// 4. Create the ClientCnxn
cnxn = createConnection(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this,
watchManager, getClientCnxnSocket(), canBeReadOnly);
// 5. Start sendThread and eventThread, which run their run methods; sendThread then issues the connection request
cnxn.start();
}
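Step 2 of the constructor relies on the connect string format described in the Javadoc: comma-separated host:port pairs with an optional chroot suffix. The sketch below illustrates that format only; ConnectStringSketch is a hypothetical class, not the real ConnectStringParser, and it assumes a default port of 2181 and ignores IPv6 literals.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the connect string format; not ConnectStringParser. */
final class ConnectStringSketch {
    static final int DEFAULT_PORT = 2181; // assumed default when a host has no port

    final String chrootPath;              // null when no chroot suffix is present
    final List<String> servers = new ArrayList<>();

    ConnectStringSketch(String connectString) {
        String hostList = connectString;
        String chroot = null;
        int slash = connectString.indexOf('/');
        if (slash >= 0) {
            String suffix = connectString.substring(slash);
            if (suffix.length() > 1) {
                chroot = suffix;          // a bare "/" means "no chroot"
            }
            hostList = connectString.substring(0, slash);
        }
        this.chrootPath = chroot;
        for (String host : hostList.split(",")) {
            String trimmed = host.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            servers.add(trimmed.contains(":") ? trimmed : trimmed + ":" + DEFAULT_PORT);
        }
    }

    /** Maps a client-side path to the path the server sees. */
    String toServerPath(String clientPath) {
        if (chrootPath == null) {
            return clientPath;
        }
        return "/".equals(clientPath) ? chrootPath : chrootPath + clientPath;
    }
}
```

For the Javadoc example "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a", the sketch yields three servers and the chroot "/app/a", so the client path "/foo/bar" corresponds to "/app/a/foo/bar" on the server.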
Reading the system configuration
public ZKClientConfig() {
super();
initFromJavaSystemProperties();
}
/**
* Initialize all the ZooKeeper client properties which are configurable as
* java system property
*/
private void initFromJavaSystemProperties() {
setProperty(ZOOKEEPER_REQUEST_TIMEOUT,
System.getProperty(ZOOKEEPER_REQUEST_TIMEOUT));
}
// org.apache.zookeeper.common.ZKConfig, the parent class of ZKClientConfig
/**
* properties, which are common to both client and server, are initialized
* from system properties
*/
public ZKConfig() {
init();
}
private void init() {
/**
* backward compatibility for all currently available client properties
*/
handleBackwardCompatibility();
}
Setting the default watcher
/* Useful for testing watch handling behavior */
protected ZKWatchManager defaultWatchManager() {
// Manage watchers & handle events generated by the ClientCnxn object.
return new ZKWatchManager(getClientConfig().getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET));
}
Obtaining the network connection
private ClientCnxnSocket getClientCnxnSocket() throws IOException {
String clientCnxnSocketName = getClientConfig()
.getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
// No client connection socket configured: use the default NIO implementation; Netty is the alternative
clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}
try {
Constructor<?> clientCxnConstructor = Class.forName(clientCnxnSocketName)
.getDeclaredConstructor(ZKClientConfig.class);
ClientCnxnSocket clientCxnSocket = (ClientCnxnSocket) clientCxnConstructor
.newInstance(getClientConfig());
return clientCxnSocket;
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate " + clientCnxnSocketName);
ioe.initCause(e);
throw ioe;
}
}
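Besides the native NIO implementation, network communication over Netty is also supported (Netty is built on NIO, wraps it in a friendlier API, and works around known JDK NIO problems). The implementation is selected with the zookeeper.clientCnxnSocket property.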
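The selection mechanism in getClientCnxnSocket is plain reflection: read a class name from configuration, fall back to a default, and instantiate it. Below is a minimal sketch of that idea with hypothetical names (TransportSelectorSketch, Transport, NioTransport, NettyLikeTransport); unlike the original, it wraps failures in an unchecked exception to keep the example short.

```java
/** Hypothetical sketch of "pick an implementation by class name, default to NIO". */
final class TransportSelectorSketch {
    interface Transport {
        String name();
    }

    static final class NioTransport implements Transport {
        NioTransport() { }
        public String name() { return "nio"; }
    }

    static final class NettyLikeTransport implements Transport {
        NettyLikeTransport() { }
        public String name() { return "netty"; }
    }

    /**
     * Instantiates the configured class, or the NIO default when nothing is configured.
     * Any reflection failure is wrapped so callers see a single exception type.
     */
    static Transport create(String configuredClassName) {
        String className = configuredClassName != null
                ? configuredClassName
                : NioTransport.class.getName();
        try {
            return (Transport) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }
}
```

The design keeps the caller independent of any concrete transport: swapping implementations needs only a configuration change, at the cost of discovering a misspelled class name at run time rather than at compile time.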
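The most important fields of the client connection socket (ClientCnxnSocket) are:
// Responsible for sending requests
protected ClientCnxn.SendThread sendThread;
// Buffers outgoing requests and preserves their order
protected LinkedBlockingDeque<Packet> outgoingQueue;
The key methods of the NIO implementation are doTransport and doIO, which send requests and receive responses.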
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn)
throws IOException, InterruptedException {
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
// Build the ConnectRequest
/*
* The previous steps only completed the socket connection at the TCP level; the ZooKeeper client session is far from established at this point
*/
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
// Response handling phase
/*
* Receive read/write events and handle them
*/
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
selected.clear();
}
/**
* Core logic: sends requests and receives responses
* @return true if a packet was received
* @throws InterruptedException
* @throws IOException
*/
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId) + ", likely server has closed socket");
}
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
} else if (!initialized) {
// If not yet initialized, the response must be the reply to the session-establishment request, so hand it straight to readConnectResult
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
// Handle the different kinds of incoming responses: heartbeat, authentication, ordinary request replies
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
if (sockKey.isWritable()) {
// Take a sendable Packet from the outgoing queue; a client request sequence number (XID) is generated and set on its request header below
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null) && (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
// Set the request header sequence number (XID)
p.requestHeader.setXid(cnxn.getXid());
}
// Serialize the packet
p.createBB();
}
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
// Remove the Packet that has been sent
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
// Keep the packet in pendingQueue so it can be matched to the server's response when it arrives
pendingQueue.add(p);
}
}
}
}
if (outgoingQueue.isEmpty()) {
// No more packets to send: turn off write interest flag.
// Will be turned on later by a later call to enableWrite(),
// from within ZooKeeperSaslClient (if client is configured
// to attempt SASL authentication), or in either doIO() or
// in doTransport() if not.
disableWrite();
} else if (!initialized && p != null && !p.bb.hasRemaining()) {
// On initial connection, write the complete connect request
// packet, but then disable further writes until after
// receiving a successful connection response. If the
// session is expired, then the server sends the expiration
// response and immediately closes its end of the socket. If
// the client is simultaneously writing on its end, then the
// TCP stack may choose to abort with RST, in which case the
// client would never receive the session expired event. See
// http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
disableWrite();
} else {
// Just in case
enableWrite();
}
}
}
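The read half of doIO alternates between two buffers: a 4-byte lenBuffer that receives the length prefix, and a body buffer sized from that length. The following is a minimal sketch of the same technique without sockets; LengthPrefixedReaderSketch and its feed method are hypothetical names, and feed stands in for the bytes that sock.read would deliver.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of length-prefixed framing with a lenBuffer/incomingBuffer pair. */
final class LengthPrefixedReaderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer; // start by reading a length
    private final List<byte[]> frames = new ArrayList<>();

    /** Consumes whatever bytes have arrived; frames may span several calls. */
    void feed(ByteBuffer chunk) {
        while (true) {
            while (incomingBuffer.hasRemaining() && chunk.hasRemaining()) {
                incomingBuffer.put(chunk.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return; // current buffer not full yet: wait for more bytes
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete: allocate a buffer for the body.
                int length = lenBuffer.getInt();
                if (length < 0) {
                    throw new IllegalArgumentException("Negative frame length " + length);
                }
                incomingBuffer = ByteBuffer.allocate(length);
            } else {
                // Body complete: hand it over and go back to reading a length.
                frames.add(incomingBuffer.array());
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    List<byte[]> frames() {
        return frames;
    }
}
```

Because the state lives in the two buffers, it does not matter where the network splits the byte stream: a frame cut in the middle of its length prefix is completed by the next call.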
Creating the ClientCnxn
protected ClientCnxn createConnection(String chrootPath, HostProvider hostProvider, int sessionTimeout,
ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
// Create and initialize the client network connection handler
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this, watchManager, clientCnxnSocket,
canBeReadOnly);
}
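The name org.apache.zookeeper.ClientCnxn is not a good one: its meaning is hard to guess from the name alone.
Its Javadoc says: This class manages the socket i/o for the client. ClientCnxn maintains a list of available servers to connect to and "transparently" switches servers it is connected to as needed.
ClientCnxn is the network connection handler: it manages all network interaction between client and server. It holds two core queues, outgoingQueue and pendingQueue, and two core threads, SendThread and EventThread. SendThread manages all network I/O between client and server; EventThread handles client-side events. The client also hands the ClientCnxnSocket to SendThread as its low-level network I/O handler, and initializes EventThread's waitingEvents queue, which holds every event waiting to be processed by the client.
/**
* Note: creates a network connection object.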
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath - the chroot of this client. Should be removed from
* this Class in ZOOKEEPER-838
* @param hostProvider the list of ZooKeeper servers to connect to
* @param sessionTimeout the timeout for connections.
* @param zooKeeper the zookeeper object that this connection is related
* to.
* @param watcher watcher for this connection
* @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly whether the connection is allowed to go to read-only
* mode in case of partitioning
* @throws IOException
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId,
byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
// Core thread: manages all network I/O between client and server
sendThread = new SendThread(clientCnxnSocket);
// Core thread: handles client-side events
eventThread = new EventThread();
this.clientConfig = zooKeeper.getClientConfig();
initRequestTimeout();
}
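The two timeouts derived in this constructor drive the heartbeat schedule used by SendThread.run in the same class. As a worked example of the arithmetic, the sketch below reproduces the formulas for connectTimeout, readTimeout and timeToNextPing; the class SessionTimingSketch and its method names are hypothetical, and only the formulas and the 10-second maximum ping interval come from the source listings.

```java
/** Hypothetical sketch of the timeout arithmetic used by ClientCnxn and SendThread. */
final class SessionTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds, as in SendThread.run

    final int connectTimeout;
    final int readTimeout;

    SessionTimingSketch(int sessionTimeoutMs, int serverCount) {
        // Each server gets an equal share of the session timeout for connecting.
        connectTimeout = sessionTimeoutMs / serverCount;
        // Silence from the server is tolerated for two thirds of the session timeout.
        readTimeout = sessionTimeoutMs * 2 / 3;
    }

    /** Milliseconds until the next ping is due, given how long nothing has been sent. */
    int timeToNextPing(int idleSendMs) {
        return readTimeout / 2 - idleSendMs - (idleSendMs > 1000 ? 1000 : 0);
    }

    /** True when a ping should be sent now. */
    boolean shouldPing(int idleSendMs) {
        return timeToNextPing(idleSendMs) <= 0 || idleSendMs > MAX_SEND_PING_INTERVAL;
    }
}
```

For the CLI default of 30000 ms and three servers, this gives connectTimeout = 10000 and readTimeout = 20000, so an idle client sends a ping roughly every 9 to 10 seconds.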
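/**
* This class services the outgoing request queue and generates the heart beats.
* It also spawns the ReadThread.
* Note: SendThread is the core I/O scheduling thread inside ClientCnxn.
* It manages all network I/O between client and server.
* While the ZooKeeper client is running,
* SendThread on the one hand maintains the session lifecycle between client and server: it sends a PING packet to the server
* at regular intervals as a heartbeat, and if the TCP connection drops during the session it reconnects automatically and transparently.
* On the other hand, SendThread handles the sending of all client requests and the receiving of all responses: it converts upper-level client API calls into protocol requests, sends them to the server, and completes the return of synchronous calls and the callbacks of asynchronous calls.
* SendThread also passes events from the server on to EventThread for processing.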
*/
class SendThread extends ZooKeeperThread {
private long lastPingSentNs;
private final ClientCnxnSocket clientCnxnSocket;
private Random r = new Random();
private boolean isFirstConnect = true;
@Override
public void run() {
// First check the client's state and do some cleanup so the client is ready to send the session-establishment request
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
long lastPingRwServer = Time.currentElapsedTime();
final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds
InetSocketAddress serverAddress = null;
while (state.isAlive()) {
try {
// Not connected yet: establish a connection
if (!clientCnxnSocket.isConnected()) {
// don't re-establish connection if we are closing
if (closing) {
break;
}
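// Obtain a server address
/*
* Before opening the TCP connection, SendThread needs a target ZooKeeper server address, usually picked at random from the HostProvider,
* and then delegates to the connection socket to open the TCP connection to that server
*/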
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
// Open the TCP connection
/*
* With a server address in hand, the connection socket opens a long-lived TCP connection to the server and sets the state to CONNECTING
*/
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
// Connected: check whether authentication has completed
if (state.isConnected()) {
// determine whether we need to send an AuthFailed event.
if (zooKeeperSaslClient != null) {
boolean sendAuthEvent = false;
if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
try {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error(
"SASL authentication with Zookeeper Quorum member failed: " + e);
state = States.AUTH_FAILED;
sendAuthEvent = true;
}
}
KeeperState authState = zooKeeperSaslClient.getKeeperState();
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the
// Zookeeper Server.
state = States.AUTH_FAILED;
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
sendAuthEvent = true;
}
}
}
if (sendAuthEvent) {
eventThread.queueEvent(
new WatchedEvent(Watcher.Event.EventType.None, authState, null));
if (state == States.AUTH_FAILED) {
eventThread.queueEventOfDeath();
}
}
}
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
}
if (to <= 0) {
String warnInfo;
warnInfo = "Client session timed out, have not heard from server in "
+ clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x"
+ Long.toHexString(sessionId);
LOG.warn(warnInfo);
throw new SessionTimeoutException(warnInfo);
}
if (state.isConnected()) {
// 1000(1 second) is to prevent race condition missing to send the second ping
// also make sure not to send too many pings when readTimeout is small
int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
// send a ping request either time is due or no packet sent out within
// MAX_SEND_PING_INTERVAL
if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
sendPing();
clientCnxnSocket.updateLastSend();
} else {
if (timeToNextPing < to) {
to = timeToNextPing;
}
}
}
// If we are in read-only mode, seek for read/write server
if (state == States.CONNECTEDREADONLY) {
long now = Time.currentElapsedTime();
int idlePingRwServer = (int) (now - lastPingRwServer);
if (idlePingRwServer >= pingRwTimeout) {
lastPingRwServer = now;
idlePingRwServer = 0;
pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
pingRwServer();
}
to = Math.min(to, pingRwTimeout - idlePingRwServer);
}
// Send requests (and read any responses that are ready)
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
} catch (Throwable e) {
if (closing) {
if (LOG.isDebugEnabled()) {
// closing so this is expected
LOG.debug("An exception was thrown while closing send thread for session 0x"
+ Long.toHexString(getSessionId()) + " : " + e.getMessage());
}
break;
} else {
// this is ugly, you have a better way speak up
if (e instanceof SessionExpiredException) {
LOG.info(e.getMessage() + ", closing socket connection");
} else if (e instanceof SessionTimeoutException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof RWServerFoundException) {
LOG.info(e.getMessage());
} else if (e instanceof SocketException) {
LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
} else {
LOG.warn("Session 0x{} for server {}, unexpected error{}",
Long.toHexString(getSessionId()), serverAddress, RETRY_CONN_MSG, e);
}
// At this point, there might still be new packets appended to outgoingQueue.
// they will be handled in next connection or cleared up if closed.
cleanAndNotifyState();
}
}
}
// End of the while loop
synchronized (state) {
// When it comes to this point, it guarantees that later queued
// packet to outgoingQueue will be notified of death.
cleanup();
}
clientCnxnSocket.close();
if (state.isAlive()) {
eventThread.queueEvent(
new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
}
eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
/**
* Once the client has the complete response data from the server, it is handled according to the type of the client request
*
* @param incomingBuffer
* @throws IOException
*/
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
if (replyHdr.getXid() == -2) {
// Heartbeat
// -2 is the xid for pings
if (LOG.isDebugEnabled()) {
LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after "
+ ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms");
}
return;
}
if (replyHdr.getXid() == -4) {
// Authentication
// -4 is the xid for AuthPacket
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId));
}
return;
}
if (replyHdr.getXid() == -1) {
// Event notification
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId));
}
// Deserialize into a WatcherEvent and put it on the pending-event queue
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId));
}
eventThread.queueEvent(we);
return;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
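// For an ordinary request response (Create, GetData, Exists and similar operations), a Packet is taken from the pendingQueue
// and the response is processed.
// The client first checks the XID in the server's response to make sure requests are handled in order, then deserializes the received
// ByteBuffer (incomingBuffer) into the corresponding Response object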
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response to the first
* request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err "
+ +replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
packet.response.deserialize(bbia, "response");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: "
+ packet);
}
} finally {
// Handle watcher registration and related logic
finishPacket(packet);
}
}
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true); // daemon thread
}
}
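The dispatch in readResponse can be summarized as: reserved negative XIDs identify pings, auth replies and notifications, and every other reply must match the request at the head of pendingQueue. The sketch below models only that ordering rule; ResponseDispatchSketch is a hypothetical class that tracks bare XIDs instead of Packet objects and throws an unchecked exception where the original throws IOException.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of XID-based reply dispatch with an in-order pending queue. */
final class ResponseDispatchSketch {
    static final int NOTIFICATION_XID = -1; // watch event pushed by the server
    static final int PING_XID = -2;         // heartbeat reply
    static final int AUTH_XID = -4;         // auth packet reply

    private final Queue<Integer> pendingXids = new ArrayDeque<>();

    /** Records that a request with this XID was written to the socket. */
    void sent(int xid) {
        pendingXids.add(xid);
    }

    /** Classifies a reply; ordinary replies must arrive in the order requests were sent. */
    String onReply(int xid) {
        if (xid == PING_XID) {
            return "ping";
        }
        if (xid == AUTH_XID) {
            return "auth";
        }
        if (xid == NOTIFICATION_XID) {
            return "notification";
        }
        Integer expected = pendingXids.poll();
        if (expected == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + xid);
        }
        if (expected != xid) {
            throw new IllegalStateException(
                    "Xid out of order. Got Xid " + xid + " expected Xid " + expected);
        }
        return "reply:" + xid;
    }
}
```

Pings and notifications never touch the pending queue, which is why they can be interleaved freely with ordinary replies without disturbing the ordering check.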
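/**
* Core thread: handles client-side events and triggers the Watchers the client has registered.
* EventThread has a waitingEvents queue that temporarily holds the objects to be triggered, including Watchers registered by the client and the AsyncCallback callbacks registered through the asynchronous API.
* EventThread keeps taking objects from waitingEvents, identifies their concrete type, and calls process or processResult respectively to fire the event or run the callback.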
* @author Administrator
*
*/
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
/**
* This is really the queued session state until the event thread actually
* processes the event and hands it to the watcher. But for all intents and
* purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
EventThread() {
super(makeThreadName("-EventThread"));
setDaemon(true);
}
@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
try {
isRunning = true;
while (true) {
// EventThread keeps taking pending Watcher objects from the waitingEvents queue
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// Process the object, which triggers the watcher
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}", Long.toHexString(getSessionId()));
}
}
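EventThread is a classic single-consumer loop over a blocking queue, shut down with a sentinel object (eventOfDeath) rather than by interruption. The sketch below shows that pattern on its own; EventLoopSketch and its method names are hypothetical, events are plain strings, and "processing" just records them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of a queue-draining worker that stops on a sentinel object. */
final class EventLoopSketch {
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private final Thread worker = new Thread(this::drain, "event-loop-sketch");

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    /** Waits for the worker to exit and returns the events it handled, in order. */
    List<String> awaitShutdown() {
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for shutdown", e);
        }
        return new ArrayList<>(handled);
    }

    private void drain() {
        boolean killed = false;
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until something is queued
                if (event == EVENT_OF_DEATH) {
                    killed = true;
                } else {
                    handled.add((String) event);
                }
                if (killed && waitingEvents.isEmpty()) {
                    break; // sentinel seen and nothing left to process
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A sentinel in the queue guarantees that everything queued before it is still delivered, which an interrupt-based shutdown would not.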
Executing the main class's run method
void run() throws CliException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
boolean jlinemissing = false;
// only use jline if it's in the classpath
try {
Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
Class<?> completorC =
Class.forName("org.apache.zookeeper.JLineZNodeCompleter");
System.out.println("JLine support is enabled");
Object console =
consoleC.getConstructor().newInstance();
Object completor =
completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompleter",
Class.forName("jline.console.completer.Completer"));
addCompletor.invoke(console, completor);
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
executeLine(line);
}
} catch (ClassNotFoundException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (NoSuchMethodException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InvocationTargetException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (IllegalAccessException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
} catch (InstantiationException e) {
LOG.debug("Unable to start jline", e);
jlinemissing = true;
}
if (jlinemissing) {
System.out.println("JLine support is disabled");
BufferedReader br =
new BufferedReader(new InputStreamReader(System.in));
String line;
while ((line = br.readLine()) != null) {
executeLine(line);
}
}
} else {
// Command line args non-null. Run what was passed.
// Main logic: read the client's command and execute it
processCmd(cl);
}
System.exit(exitCode);
}
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
String[] args = co.getArgArray();
String cmd = co.getCommand();
if (args.length < 1) {
usage();
throw new MalformedCommandException("No command entered");
}
if (!commandMap.containsKey(cmd)) {
usage();
throw new CommandNotFoundException("Command not found " + cmd);
}
boolean watch = false;
LOG.debug("Processing " + cmd);
if (cmd.equals("quit")) {
zk.close();
System.exit(exitCode);
} else if (cmd.equals("redo") && args.length >= 2) {
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0) { // don't allow redoing this redo
throw new MalformedCommandException("Command index out of range");
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals("redo")) {
throw new MalformedCommandException("No redoing redos");
}
history.put(commandCount, history.get(i));
processCmd(cl);
} else if (cmd.equals("history")) {
for (int i = commandCount - 10; i <= commandCount; ++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >= 2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}
// Below commands all need a live connection
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
// execute from commandMap
CliCommand cliCmd = commandMapCli.get(cmd);
if(cliCmd != null) {
cliCmd.setZk(zk);
// Execute the remaining commands (those registered in commandMapCli)
watch = cliCmd.parse(args).exec();
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}
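A CliCommand object is looked up by command name; for the set command, for example, the object returned is a SetCommand.
The exec method of that CliCommand is then called (a textbook use of the Command design pattern).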
public class SetCommand extends CliCommand {
private static Options options = new Options();
private String[] args;
private CommandLine cl;
static {
options.addOption("s", false, "stats");
options.addOption("v", true, "version");
}
public SetCommand() {
super("set", "[-s] [-v version] path data");
}
@Override
public CliCommand parse(String[] cmdArgs) throws CliParseException {
Parser parser = new PosixParser();
try {
cl = parser.parse(options, cmdArgs);
} catch (ParseException ex) {
throw new CliParseException(ex);
}
args = cl.getArgs();
if (args.length < 3) {
throw new CliParseException(getUsageStr());
}
return this;
}
@Override
public boolean exec() throws CliException {
String path = args[1];
byte[] data = args[2].getBytes();
int version;
if (cl.hasOption("v")) {
version = Integer.parseInt(cl.getOptionValue("v"));
} else {
version = -1;
}
try {
Stat stat = zk.setData(path, data, version);
if (cl.hasOption("s")) {
new StatPrinter(out).print(stat);
}
} catch (IllegalArgumentException ex) {
throw new MalformedPathException(ex.getMessage());
} catch (KeeperException|InterruptedException ex) {
throw new CliWrapperException(ex);
}
return false;
}
}
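The lookup-then-exec flow used by processZKCmd and SetCommand can be reduced to a small sketch of the Command pattern. Everything here is a simplified assumption: Command, EchoCommand, UpperCommand and dispatch are hypothetical stand-ins for CliCommand, its subclasses and commandMapCli, and no ZooKeeper connection is involved.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CommandDispatchSketch {

    /** Hypothetical stand-in for CliCommand: parse the arguments, then execute. */
    interface Command {
        Command parse(String[] cmdArgs);
        String exec();
    }

    static class EchoCommand implements Command {
        private String[] args;

        @Override
        public Command parse(String[] cmdArgs) {
            if (cmdArgs.length < 2) {
                throw new IllegalArgumentException("usage: echo text");
            }
            args = cmdArgs;
            return this; // returning this allows parse(args).exec() chaining
        }

        @Override
        public String exec() {
            return args[1];
        }
    }

    static class UpperCommand extends EchoCommand {
        @Override
        public String exec() {
            return super.exec().toUpperCase(Locale.ROOT);
        }
    }

    // Plays the role of commandMapCli: command name -> command object
    static final Map<String, Command> COMMANDS = new HashMap<>();
    static {
        COMMANDS.put("echo", new EchoCommand());
        COMMANDS.put("upper", new UpperCommand());
    }

    static String dispatch(String line) {
        String[] args = line.trim().split("\\s+");
        Command cmd = COMMANDS.get(args[0]);
        if (cmd == null) {
            return "Command not found " + args[0];
        }
        return cmd.parse(args).exec();
    }

    public static void main(String[] unused) {
        System.out.println(dispatch("echo hello"));
        System.out.println(dispatch("upper hello"));
    }
}
```

Adding a new command then only requires a new class and one map entry, and the dispatcher itself stays unchanged.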
/**
* Set the data for the node of the given path if such a node exists and the
* given version matches the version of the node (if the given version is -1, it
* matches any node's versions). Return the stat of the node.
* <p>
* This operation, if successful, will trigger all the watches on the node of
* the given path left by getData calls.
* <p>
* A KeeperException with error code KeeperException.NoNode will be thrown if no
* node with the given path exists.
* <p>
* A KeeperException with error code KeeperException.BadVersion will be thrown
* if the given version does not match the node's version.
* <p>
* The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
* Arrays larger than this will cause a KeeperException to be thrown.
*
* @param path the path of the node
* @param data the data to set
* @param version the expected matching version
* @return the state of the node
* @throws InterruptedException If the server transaction is interrupted.
* @throws KeeperException If the server signals an error with a
* non-zero error code.
* @throws IllegalArgumentException if an invalid path is specified
*/
public Stat setData(final String path, byte data[], int version)
throws KeeperException, InterruptedException {
final String clientPath = path;
PathUtils.validatePath(clientPath);
final String serverPath = prependChroot(clientPath);
// Build the request header
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.setData);
SetDataRequest request = new SetDataRequest();
request.setPath(serverPath);
request.setData(data);
request.setVersion(version);
SetDataResponse response = new SetDataResponse();
// Submit the request through the network connection object (ClientCnxn)
ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
return response.getStat();
}
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
WatchRegistration watchRegistration) throws InterruptedException {
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response,
WatchRegistration watchRegistration, WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration,
watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response,
AsyncCallback cb, String clientPath, String serverPath, Object ctx,
WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of
// ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
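The handoff in submitRequest and queuePacket (enqueue a packet, then block on it until the sending thread marks it finished) can be sketched as below. This is a simplified illustration under stated assumptions, not the ZooKeeper implementation: the Packet class here is a hypothetical stand-in, the "server reply" is faked locally, and timeouts, session state and the closing flag are left out.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class SubmitQueueSketch {

    /** Hypothetical stand-in for ClientCnxn.Packet. */
    static class Packet {
        final String request;
        String reply;
        boolean finished;

        Packet(String request) {
            this.request = request;
        }
    }

    // FIFO queue shared by the caller thread and the sending thread
    private final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();
    private final Thread sendThread;

    SubmitQueueSketch() {
        sendThread = new Thread(() -> {
            try {
                while (true) {
                    Packet p = outgoingQueue.take(); // oldest packet first
                    synchronized (p) {
                        p.reply = "ack:" + p.request; // fake reply, no network I/O
                        p.finished = true;
                        p.notifyAll(); // wake the thread blocked in submitRequest
                    }
                }
            } catch (InterruptedException e) {
                // interrupted by close(): let the thread end
            }
        }, "send-thread-sketch");
        sendThread.setDaemon(true);
        sendThread.start();
    }

    /** Enqueues the request and blocks until the sending thread finishes it. */
    String submitRequest(String request) {
        Packet packet = new Packet(request);
        outgoingQueue.add(packet);
        synchronized (packet) {
            try {
                // Loop guards against spurious wakeups, as in the original code
                while (!packet.finished) {
                    packet.wait();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return packet.reply;
    }

    void close() {
        sendThread.interrupt();
    }

    public static void main(String[] args) {
        SubmitQueueSketch client = new SubmitQueueSketch();
        System.out.println(client.submitRequest("set /a 1"));
        client.close();
    }
}
```

The finished flag is read and written only while holding the packet's monitor, so the caller cannot miss the notification even if the sending thread completes the packet before the caller starts waiting.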
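Summary
- The constructor starts two threads, sendThread and eventThread: sendThread handles sending requests and receiving responses, and eventThread handles asynchronous events.
- Commands read from the console are wrapped in a Packet and placed on outgoingQueue; sendThread reads packets from the queue and sends them.
Queues are used heavily throughout this flow in order to preserve ordering (FIFO).
See the diagram below: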