上一篇,小编给大家介绍了zookeeper server端的启动。这一篇来说一下client端和server端是如何建立session的,并通过官网的DataMonitor例子来说明。通过session建立这个例子,可以大致了解client端和server端是如何处理请求的,以及它们之间是如何通信的。
官网DataMonitor例子的代码:
Executor
Java代码
- public class Executor implements Watcher, Runnable,
- DataMonitor.DataMonitorListener {
- String znode;
- DataMonitor dm;
- ZooKeeper zk;
- String filename;
- String exec[];
- Process child;
- //Executor是一个watcher,不过其处理都代理给DataMonitor了
- public Executor(String hostPort, String znode, String filename,
- String exec[]) throws KeeperException, IOException {
- this.filename = filename;
- this.exec = exec;
- //初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的
- zk = new ZooKeeper(hostPort, 3000, this);
- //datamonitor是真实的处理类
- dm = new DataMonitor(zk, znode, null, this);
- }
public class Executor implements Watcher, Runnable,
DataMonitor.DataMonitorListener {
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
//Executor是一个watcher,不过其处理都代理给DataMonitor了
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
//初始化zookeeper的client,这一步会建立连接,创建session,启动client端的SendThread线程,当然都是异步的
zk = new ZooKeeper(hostPort, 3000, this);
//datamonitor是真实的处理类
dm = new DataMonitor(zk, znode, null, this);
}
DataMonitor
- public class DataMonitor implements Watcher, StatCallback {
- .......
- public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
- DataMonitorListener listener) {
- ......
- // Get things started by checking if the node exists. We are going
- // to be completely event driven,异步exist,注册watcher,设置回调
- zk.exists(znode, true, this, null);
- //处理watcher通知事件
- public void process(WatchedEvent event) {
- String path = event.getPath();
- //如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理
- } else {
- if (path != null && path.equals(znode)) {
- // Something has changed on the node, let's find out
- if (chainedWatcher != null) {
- chainedWatcher.process(event);
- //处理exist操作的回掉结果
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- boolean exists;
- switch (rc) {
- case Code.Ok:
- exists = true;
- break;
- case Code.NoNode:
- exists = false;
- case Code.SessionExpired:
- case Code.NoAuth:
- dead = true;
- listener.closing(rc);
- return;
- default:
- // Retry errors
- //如果节点存在,则同步获取节点数据
- byte b[] = null;
- if (exists) {
- try {
- b = zk.getData(znode, false, null);
- } catch (KeeperException e) {
- // We don't need to worry about recovering now. The watch
- // callbacks will kick off any exception handling
- e.printStackTrace();
- } catch (InterruptedException e) {
- //如果数据有变化,则处理之
- if ((b == null && b != prevData)
- || (b != null && !Arrays.equals(prevData, b))) {
- listener.exists(b);
- prevData = b;
public class DataMonitor implements Watcher, StatCallback {
.......
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
......
// Get things started by checking if the node exists. We are going
// to be completely event driven,异步exist,注册watcher,设置回调
zk.exists(znode, true, this, null);
}
......
//处理watcher通知事件
public void process(WatchedEvent event) {
String path = event.getPath();
//如果exist操作的对应的事件触发(create.delete,setdata),则再次注册watcher(watcher是单次的),业务处理都在回调里处理
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
//处理exist操作的回掉结果
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
//如果节点存在,则同步获取节点数据
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
//如果数据有变化,则处理之
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}
从这个例子出发,我们来分析一下zookeeper的第一步:session是如何建立的。这主要体现在ZooKeeper类的构造过程中。
- public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
- boolean canBeReadOnly)
- throws IOException
- {
- LOG.info("Initiating client connection, connectString=" + connectString
- + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
- //设置默认watcher
- watchManager.defaultWatcher = watcher;
- ConnectStringParser connectStringParser = new ConnectStringParser(
- connectString);
- //从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了
- HostProvider hostProvider = new StaticHostProvider(
- connectStringParser.getServerAddresses());
- //创建客户端连接,初始化SendThread和EventThread
- cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
- hostProvider, sessionTimeout, this, watchManager,
- getClientCnxnSocket(), canBeReadOnly);
- //启动SendThread和EventThread
- cnxn.start();
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
boolean canBeReadOnly)
throws IOException
{
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
//设置默认watcher
watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(
connectString);
//从配置的serverList,解析成serverAddresses,这里做了shuffle,server顺序被打乱了
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
//创建客户端连接,初始化SendThread和EventThread
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
//启动SendThread和EventThread
cnxn.start();
}