天天看点

深入浅出Zookeeper之二Session建立

上一篇,小编给大家介绍了ZooKeeper server端的启动。这一篇来说一下client端和server端是如何建立session的,并通过官网的DataMonitor例子来说明。通过Session建立这个例子,可以大概了解client端和server端是如何处理请求的,以及它们之间是如何通信的。

官网DataMonitor的代码:

Executor

Java代码

  1. public class Executor implements Watcher, Runnable,
  2. DataMonitor.DataMonitorListener {
  3. String znode;
  4. DataMonitor dm;
  5. ZooKeeper zk;
  6. String filename;
  7. String exec[];
  8. Process child;
  9. //Executor是一个watcher,不过其处理都代理给DataMonitor了
  10. public Executor(String hostPort, String znode, String filename,
  11. String exec[]) throws KeeperException, IOException {
  12. this.filename = filename;
  13. this.exec = exec;
  14. //初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的
  15. zk = new ZooKeeper(hostPort, 3000, this);
  16. //datamonitor是真实的处理类
  17. dm = new DataMonitor(zk, znode, null, this);
  18. }
public class Executor implements Watcher, Runnable,
		DataMonitor.DataMonitorListener {
	String znode;

	DataMonitor dm;

	ZooKeeper zk;

	String filename;

	String exec[];

	Process child;

        /**
         * Builds the executor: records the settings, opens the ZooKeeper client and
         * hands monitoring of {@code znode} to a {@link DataMonitor}.
         *
         * <p>This object is registered as the watcher of the ZooKeeper client and as the
         * listener of the DataMonitor; per the original note, the actual event handling
         * is delegated to the DataMonitor.
         *
         * @param hostPort connect string passed to the ZooKeeper client
         * @param znode    node to monitor; stored and passed to the DataMonitor
         * @param filename stored in the {@code filename} field
         * @param exec     stored in the {@code exec} field
         * @throws KeeperException declared in the signature for callers
         * @throws IOException     declared in the signature; presumably raised when the
         *                         ZooKeeper client cannot be created
         */
	public Executor(String hostPort, String znode, String filename,
			String exec[]) throws KeeperException, IOException {
		// The znode field is declared on this class, so initialise it from the
		// parameter of the same name just like filename and exec.
		this.znode = znode;
		this.filename = filename;
		this.exec = exec;
		// Create the ZooKeeper client. Per the original note this establishes the
		// connection, creates the session and starts the client-side SendThread,
		// all asynchronously.
		// NOTE(review): `this` is handed out as the watcher before `dm` is assigned
		// below; confirm no event can reach this object before construction ends.
		zk = new ZooKeeper(hostPort, 3000, this);
		// DataMonitor is the class that does the real processing.
		dm = new DataMonitor(zk, znode, null, this);
	}

DataMonitor

  1. public class DataMonitor implements Watcher, StatCallback {
  2. .......
  3. public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
  4. DataMonitorListener listener) {
  5. ......
  6. // Get things started by checking if the node exists. We are going
  7. // to be completely event driven,异步exist,注册watcher,设置回调
  8. zk.exists(znode, true, this, null);
  9. //处理watcher通知事件
  10. public void process(WatchedEvent event) {
  11. String path = event.getPath();
  12. //如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理
  13. } else {
  14. if (path != null && path.equals(znode)) {
  15. // Something has changed on the node, let's find out
  16. if (chainedWatcher != null) {
  17. chainedWatcher.process(event);
  18. //处理exist操作的回掉结果
  19. public void processResult(int rc, String path, Object ctx, Stat stat) {
  20. boolean exists;
  21. switch (rc) {
  22. case Code.Ok:
  23. exists = true;
  24. break;
  25. case Code.NoNode:
  26. exists = false;
  27. case Code.SessionExpired:
  28. case Code.NoAuth:
  29. dead = true;
  30. listener.closing(rc);
  31. return;
  32. default:
  33. // Retry errors
  34. //如果节点存在,则同步获取节点数据
  35. byte b[] = null;
  36. if (exists) {
  37. try {
  38. b = zk.getData(znode, false, null);
  39. } catch (KeeperException e) {
  40. // We don't need to worry about recovering now. The watch
  41. // callbacks will kick off any exception handling
  42. e.printStackTrace();
  43. } catch (InterruptedException e) {
  44. //如果数据有变化,则处理之
  45. if ((b == null && b != prevData)
  46. || (b != null && !Arrays.equals(prevData, b))) {
  47. listener.exists(b);
  48. prevData = b;
/**
 * Monitors a znode and reports changes of its data to a listener.
 *
 * <p>The class acts both as a {@code Watcher} (see {@link #process}) and as a
 * {@code StatCallback} (see {@link #processResult}).
 *
 * <p>NOTE(review): this listing is abridged. Lines consisting only of dots mark
 * code that was left out; the fields used below ({@code zk}, {@code znode},
 * {@code chainedWatcher}, {@code listener}, {@code dead}, {@code prevData}) are
 * presumably declared in the omitted parts.
 */
public class DataMonitor implements Watcher, StatCallback {

.......

	/**
	 * Creates the monitor and starts it by issuing an asynchronous exists() call.
	 *
	 * @param zk             ZooKeeper client used for the exists() call
	 * @param znode          path of the node to monitor
	 * @param chainedWatcher optional watcher; handling of this argument is in the
	 *                       omitted part of the constructor
	 * @param listener       listener; handling of this argument is in the omitted
	 *                       part of the constructor
	 */
	public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
			DataMonitorListener listener) {
......
		// Get things started by checking if the node exists. We are going
		// to be completely event driven: asynchronous exists() that registers a
		// watcher and sets this object as the callback.
		zk.exists(znode, true, this, null);
	}

......
	/**
	 * Handles watcher notification events.
	 *
	 * <p>NOTE(review): the {@code if} branch that belongs to the {@code else} below
	 * is missing from this abridged listing.
	 *
	 * @param event the notification; its path is compared with the monitored znode
	 */
	public void process(WatchedEvent event) {
		String path = event.getPath();
		// When an event for the watched exists() fires (create, delete, setData),
		// register the watcher again (watches are one-shot); the business handling
		// is done in the callback.
		} else {
			if (path != null && path.equals(znode)) {
				// Something has changed on the node, let's find out
				zk.exists(znode, true, this, null);
			}
		}
		// Forward the event to the chained watcher when one is set.
		if (chainedWatcher != null) {
			chainedWatcher.process(event);
		}
	}

	/**
	 * Handles the callback result of the asynchronous exists() operation.
	 *
	 * <p>Depending on {@code rc} the node is treated as existing or missing, the
	 * monitor is marked dead and the listener is told it is closing, or the exists()
	 * call is retried. When the node exists its data is read synchronously, and the
	 * listener is notified if the data differs from the previously seen data.
	 *
	 * @param rc   result code of the exists() call
	 * @param path not used by this method
	 * @param ctx  not used by this method
	 * @param stat not used by this method
	 */
	public void processResult(int rc, String path, Object ctx, Stat stat) {
		boolean exists;
		switch (rc) {
		case Code.Ok:
			exists = true;
			break;
		case Code.NoNode:
			exists = false;
			break;
		case Code.SessionExpired:
		case Code.NoAuth:
			// Unrecoverable for this monitor: flag it dead and notify the listener.
			dead = true;
			listener.closing(rc);
			return;
		default:
			// Retry errors
			zk.exists(znode, true, this, null);
			return;
		}
		// If the node exists, fetch its data synchronously.
		byte b[] = null;
		if (exists) {
			try {
				b = zk.getData(znode, false, null);
			} catch (KeeperException e) {
				// We don't need to worry about recovering now. The watch
				// callbacks will kick off any exception handling
				// NOTE(review): b stays null here, so the change check below still
				// runs and may report "no data" to the listener.
				e.printStackTrace();
			} catch (InterruptedException e) {
				// NOTE(review): returns without restoring the interrupt flag;
				// consider Thread.currentThread().interrupt() before returning.
				return;
			}
		}
		// If the data changed compared with the last seen value, handle it:
		// either the data disappeared (b is null, prevData is not) or the bytes differ.
		if ((b == null && b != prevData)
				|| (b != null && !Arrays.equals(prevData, b))) {
			listener.exists(b);
			prevData = b;
		}
	}
}      

从这个例子出发,我们来分析一下ZooKeeper的第一步——session是如何建立的,主要就是ZooKeeper类的构造过程。

  1. public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
  2. boolean canBeReadOnly)
  3. throws IOException
  4. {
  5. LOG.info("Initiating client connection, connectString=" + connectString
  6. + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
  7. //设置默认watcher
  8. watchManager.defaultWatcher = watcher;
  9. ConnectStringParser connectStringParser = new ConnectStringParser(
  10. connectString);
  11. //从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了
  12. HostProvider hostProvider = new StaticHostProvider(
  13. connectStringParser.getServerAddresses());
  14. //创建客户端连接,初始化SendThread和EventThread
  15. cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
  16. hostProvider, sessionTimeout, this, watchManager,
  17. getClientCnxnSocket(), canBeReadOnly);
  18. //启动SendThread和EventThread
  19. cnxn.start();
/**
 * Creates a ZooKeeper client handle and starts its connection machinery.
 *
 * <p>Steps, in order: log the connection parameters, install {@code watcher} as the
 * default watcher, parse the connect string into server addresses, build the
 * {@code ClientCnxn}, and start it.
 *
 * @param connectString  server list (and chroot path) as accepted by
 *                       {@code ConnectStringParser}
 * @param sessionTimeout session timeout handed to the client connection
 * @param watcher        becomes the default watcher of this client
 * @param canBeReadOnly  passed through to the client connection
 * @throws IOException declared in the signature for callers
 */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly) throws IOException {
    final String banner = "Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher;
    LOG.info(banner);

    // Install the default watcher.
    watchManager.defaultWatcher = watcher;

    // Turn the configured server list into server addresses. Per the original note,
    // the host provider shuffles them, so the server order is randomised.
    final ConnectStringParser parsed = new ConnectStringParser(connectString);
    final HostProvider servers = new StaticHostProvider(parsed.getServerAddresses());

    // Create the client connection; per the original note this initialises the
    // SendThread and the EventThread.
    cnxn = new ClientCnxn(parsed.getChrootPath(), servers, sessionTimeout, this,
            watchManager, getClientCnxnSocket(), canBeReadOnly);

    // Start the SendThread and the EventThread.
    cnxn.start();
}
as