package com.test.com.test.zookeeper;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.List;
/**
* @author Jane E-mail:[email protected]
* @version Zookeeper客户端示例及源码分析 创建时间:2018/10/3
*/
/**
*
* 本内容完全是关于zk的客户端,其中例子的运行需要在本地启动一个zkServer集群。
*
* zk 建议全部通过异步机制进行回调处理来提高性能,从而无需捕获异常(只需根据状态码进行相应处理)。
* zk通过单线程(eventThread)处理所有的回调通知(异步回调,监听回调),所以在回调处理过多的情况下,可能造成较高的延迟。
*
* Zookeeper类提供了一系列与zkServer交互的方法,并关联了ClientCnxn客户端,此客户端打开一个与zkServer的会话,并关联sendThread和eventThread用于对发送请求和回调事件进行处理,
* ClientCnxnSocketNIO通过NIO的方式对socket进行处理。
*
* eventThread负责从其维护的阻塞队列中获取event,如果是DeathEvent则关闭此线程,否则根据事件类型进行解析,调用回调结构的不同重载(事件监听回调,异步通知回调)。各种回调都被封装成实现Event接口的对象
* sendThread负责与zkServer的连接,并负责心跳和重连。然后调用doTransport()方法。异步的方式,客户端将回调处理器包装成一个Packet对象,并将该对象加入outgoing队列中。
* doTransport()方法通过NIO select获取一次触发的所有事件。
* 如果channel可读,则从读buffer中读取内容,然后调用sendThread的readResponse方法,然后根据获取消息的响应头执行不同处理
* Xid为-2:心跳响应
* Xid为-4:认证授权
* Xid为-1:监听事件通知,读取事件的response,并将其提交到eventThread的队列中(将回调相应Watcher接口的实现处理器)。
* 为sasl:获取token然后发送给服务端
* 为回调通知:客户端认为除以上所有情况外,都属于回调的通知。从pending队列中取出packet进行以下处理
* 1. 如果取出的Xid与服务端响应的Xid不一致,则认为发送连接丢失。
* 2. 将服务端返回的消息,根据协议填回packet字段中
* 3. 如果没有回调接口则notifyAll,如果有回调则将packet加入eventThread的阻塞队列中。
*
* 如果channel可写,则从outgoing队列中取出一个Packet对象,将buffer内容写入socket中,然后将此packet对象交与pending队列中。
*
* <p>
* zk客户端提供了重连机制,如果心跳检测网络断开,会一直尝试重新连接(连接原服务器,或者集群中的其他服务器),直到客户端进程被杀死。
* <p>
* zk客户端重新连接,但是服务端已经将此会话置为无效,则必须重新new Zookeeper句柄,与服务端开启一个新的会话。
* 优雅的关闭与服务端的会话,需要调用 Zookeeper.close();
* <p>
* 对于外部资源的访问,需要使用隔离符(czxid)用户接受最新的会话的请求,如果Czxid落后则不允许访问。
*/
public class Master implements Watcher { // Watcher接口实现一个监听,在客户端启动后回调
private ZooKeeper zk;
private String hostPort;
Master(String hostPort) {
this.hostPort = hostPort;
}
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this);
}
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("收到监听通知:" + watchedEvent);
}
/**
* 同步调用创建znode
* 必须要自己解决受检异常
* @param path
* @param value
*/
public void createZnode(String path, String value) {
try {
String str = this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println(str);
} catch (KeeperException e) {
System.out.println("check一下,看那个进程创建了子节点");
System.out.println(String.valueOf(getData(path)));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 异步的方式创建Znode.
* CreateMode.EPHEMERAL只会创建临时节点 客户端会话关闭自动删除
* CreateMode.PERSISTENT会创建持久化节点
* 在回调处理中,根据异常返回码进行相应处理
* @param path
* @param value
*/
public void createZnodeAsync(String path, String value) {
this.zk.create(path, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1) {
System.out.println(i + " " + s + " " + o + " " + s1);
}
}, new Object());
}
/**
* 获取某个节点的数据,即创建时设置的值
*
* @param path
*
* @return
*/
public String getData(String path) {
byte[] result = null;
try {
result = zk.getData(path, false, new Stat());
} catch (KeeperException e) {
System.out.println("继续check或者报警");
} catch (InterruptedException e) {
e.printStackTrace();
}
return String.valueOf(result);
}
/**
* 查询某个节点是否存在,如果存在则设置删除节点监听。
* 带Watcher监听器 和 异步回调处理器
* @param path
*/
public void exist(String path) {
Stat s = new Stat();
zk.exists(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("设置监视节点删除事件回调");
}
}, new AsyncCallback.StatCallback() {
@Override
public void processResult(int i, String s, Object o, Stat stat) {
System.out.println("exist回调处理器");
// 根据不同的返回异常码,做相应的处理,对于连接丢失事件需要重试。
switch (KeeperException.Code.get(i)) {
case OK:
System.out.println("处理成功");
break;
case NOTEMPTY:
System.out.println("已经存在");
break;
case CONNECTIONLOSS:
System.out.println("连接丢失,可能发送时丢失,可能接收时丢失,建议重试");
break;
}
}
}, s);
}
/**
* 获取某个路径下的子节点,并设置监听,如果子节点变化,则回调处理器
*
* @param path
*/
public void getChild(String path) {
zk.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("getChild设置监听,如果子节点变动,则回调此处理器");
}
}, new AsyncCallback.ChildrenCallback() {
@Override
public void processResult(int i, String s, Object o, List<String> list) {
System.out.println("getChild返回结果回调");
switch (KeeperException.Code.get(i)) {
case OK:
System.out.println("处理成功");
for (String str : list) {
System.out.println(str);
}
}
}
}, new Object());
}
public static void main(String args[]) throws Exception {
Master m = new Master("127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184");
// 创建句柄,并与zookeeper Server创建会话连接
// 这个创建会话也是异步执行的
m.startZK();
//m.createZnode("/master", "hello world");
//m.createZnodeAsync("/slave","hello world");
// 如果存在则在回调中处理,并设置监听,当服务器删除节点时候收到回调通知。
//m.exist("/slave");
System.out.println(m.getData("/slave"));
// 获取某个路径下所有子节点,并设置监听,如果子节点更改,则回调监听事件。
m.getChild("/slave");
Thread.sleep(60000);
m.closeZK();
}
public void closeZK() {
try {
this.zk.close();
} catch (InterruptedException e) {
System.out.println("关闭zk失败");
}
}
}