文章目录
- 1. zookeeper客户端命令操作
- 2. zookeeper内部原理
-
- 2.1 持久化节点和临时节点
- 2.2 Stat结构体
- 2.3 监听原理
- 2.4paxos算法
- 2.5选举机制
- 2.6写数据流程
- 3.API操作
-
- 3.1zk客户端操作
- 3.2动态上下线
- 3.3同步线程锁
1. zookeeper客户端命令操作
1.启动zookeeper客户端
zkCli.sh
2.创建普通节点
create /iweb "jianhau"
3.获取节点的值
get /iweb
4.创建短暂节点
create -e /iweb
6.监听节点的变化
ls /iweb watch
5.退出
quit
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v9afaE3X-1570004773361)(11886AE15C5E4EE284904D287986A1BD)]
2. zookeeper内部原理
2.1 持久化节点和临时节点
- 持久化节点是当客户端和zookeeper断开连接后,该节点依旧存在
- 临时节点是当客户端和zookeeper断开连接后,节点自动删除
2.2 Stat结构体
[zk: localhost:2181(CONNECTED) 5] get /sanguo
jinlian
cZxid = 0x100000003 ==创建节点的事务id==
ctime = Wed Aug 29 00:03:23 CST 2018 ==被创建的好毫秒数==
mZxid = 0x100000003 ==最后更新的事务==
mtime = Wed Aug 29 00:03:23 CST 2018 ==最后更改的毫秒数==
pZxid = 0x100000004 ==最后更新的子节点id==
cversion = 1 ==子节点修改次数==
dataVersion = 0 ==数据变化号==
aclVersion = 0 ==访问被控制列表的变化号==
ephemeralOwner = 0x0 ==如果是临时节点,这个是拥有者的session id 如果不是临时节点就是0==
dataLength = 7 ==数据长度==
numChildren = 1 ==子节点数量==
2.3 监听原理
- 首先创建一个main线程
- 在main线程中创建一个客户端,这是就会创建两个线程connect,Listener,一负责通信,一个负责监听
- 通过connect注册监听
- 在zookeeper的注册监听器列表中添加注册的监听事件
- 监听到有数据或者路径变化时通过,就会将消息发送给listener线程
- listener线程内部调用了process方法
常见的监听
1.监听数据
get path [watch]
2.监听子节点的增减变化
ls path [watch]
2.4paxos算法
- 当节点发生变化后,会汇报给zkserver,此时zkserver收到一个zxid
- 如果zxid大于自己当前的zxid,先记录下来,然后同步给其他的zkserver如果超过半数的zkserver同意后即生效,更新后同步给其他的zkserver,修改自己的zxid
2.5选举机制
- 半数机制:集群中半数以上的集群存活,集群可用,所以安装奇数台服务器
- 虽然在配置文件中没有指定的主从,但是会选举产生一个leader和其他的follower
- 如果有五台机器
- 第一台上线后先投自己一票
- 第二台上线后,先头自己一票,由于他的id比第一台的大,所以第一台改投第二台,此时第一台0票,第二台2票,但是少于3,仍然不能成为leader
- 第三台上线后,此时服务器都会改选票给服务器3,此时他又三票,成为leader,其他状态为follower
- 第四台启动,123不会交换选票信息,第四台只有一票少数服从多数
2.6写数据流程
- client向server1发送写数据请求,如果server1不是leader,那么server1会把请求转发给leader
- leader把写请求广播给follow
- follow返回信息并把请求放入待写队列中,并返回成功信息
- 当leader收到半数以上的成功信息后,说明该写操作可以执行
- leader向各个server发送提交信息,各个server收到后落实写请求,操作成功
- 返回给客户端操作成功
3.API操作
3.1zk客户端操作
public class Zkutils {
private static int Session_Time_Out = 300000;
private static ZooKeeper zk = null;
//创建节点
public static void CreateNodes() throws KeeperException, InterruptedException {
String path = "/test";
byte [] bytes = "hello zk ".getBytes();
String result = zk.create(path, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(result);
}
//判断节点是否存在
public static void NodeExist()throws Exception{
String path ="/test";
Stat stat = zk.exists(path, false);
System.out.println(stat);
}
//获取数据
public static void getData() throws KeeperException, InterruptedException {
String path = "/test";
Stat stat = new Stat();
byte[] data = zk.getData(path, false, stat);
System.out.println(new String(data, Charset.forName("UTF-8")));
}
//存储数据
public static void setData() throws KeeperException, InterruptedException {
String path ="/test";
Stat stat = zk.setData(path, "hellozk".getBytes(), -1);
System.out.println(stat);
}
//删除节点
public static void DeleteNode() throws KeeperException, InterruptedException {
String path ="/test";
zk.delete(path,-1);
}
//测试
public static void main(String[] args) {
try {
zk = new ZooKeeper("bigdata1:2181,bigdata2:2181,bigdata3:2181", Session_Time_Out, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.toString());
}
});
// CreateNodes();
// setData();
// getData();
DeleteNode();
NodeExist();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.2动态上下线
public class DynamicUpDown {
private ZooKeeper zk = null;
private static int SESSION_TIME_OUT = 300000;
private static String HOSTS = "bigdata1:2181,bigdata2:2181,bigdata3:2181";
private static List<ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private static String PRAENT_NODE = "/hosts";
private List<String> serverlist = new ArrayList<>();
public void Init() throws Exception{
zk = new ZooKeeper(HOSTS, SESSION_TIME_OUT, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
String path = watchedEvent.getPath();
Event.EventType type = watchedEvent.getType();
//打印出当前监听事件类型
System.out.println(type);
//判断子节点的变化
if (Event.EventType.NodeChildrenChanged==type&&path.equals(PRAENT_NODE)){
try {
//更新列表
UpdateServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
//创建父节点
Stat stat = zk.exists(PRAENT_NODE, false);
if (stat == null){
zk.create(PRAENT_NODE,"父节点".getBytes(),ACL, CreateMode.PERSISTENT);
}
UpdateServerList();
}
private void UpdateServerList() throws Exception{
List<String> newServerList = new ArrayList<>();
List<String> children = zk.getChildren(PRAENT_NODE,true);
children.forEach(child->{
try {
//获取当前子节点的目录
byte[] data = zk.getData(PRAENT_NODE + "/" + child, false, null);
newServerList.add(new String(data));
serverlist = newServerList;
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
serverlist.forEach(list->{
System.out.println(list);
});
}
//关闭资源
public void close() {
try {
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
DynamicUpDown server = new DynamicUpDown();
try {
server.Init();
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
e.printStackTrace();
}finally {
server.close();
}
}
}
3.3同步线程锁
//抽取任务接口
public interface CustomTask {
void doSomething();
}
//定义自己的任务
public class Mytask implements CustomTask{
private String name;
public Mytask(String name){
this.name =name;
}
@Override
public void doSomething() {
for (int i = 1;i<=5;i++){
System.out.println("做事情"+i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/*
指定个数的客户端访问服务器的资源
1.上线就向zookeeper客户端注册
2.判断是否只有一个客户端工作,若只有一个,便可以处理业务
3.获取父节点下注册的所有的锁,通过判断自己是否是号码最小的那把锁,如果是则可以处理业务
*/
public class DistributeLock {
private ZooKeeper zk= null;
private CustomTask task = null;
private final List<org.apache.zookeeper.data.ACL> ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE;
private final int sessionTimeOut = 5000;
private final String parent_node = "/locks";
private String connectstring = null;
private volatile String currentPath = null;
public DistributeLock(String connectstring){
this.connectstring = connectstring;
}
public DistributeLock(){
this("bigdata1:2181,bigdata2:2181,bigdata3:2181");
}
public void setTask(CustomTask task){
this.task = task;
}
//获取客户端
public void getClient() throws Exception{
zk =new ZooKeeper(connectstring,sessionTimeOut,event->{
//监听子节点的变化
if (event.getType()==Watcher.Event.EventType.NodeChildrenChanged&&
event.getPath().equals(parent_node)){
try {
//拿到所有的子节点
List<String> child = zk.getChildren(parent_node,true);
//判断自己是否是最小的节点
String currentNode = currentPath.substring(parent_node.length() + 1);
//排序
Collections.sort(child);
if (child.indexOf(currentNode)==0){
task.doSomething();
//释放锁
deleteLock();
//注册新锁
registerLock();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
//注册锁
public void registerLock()throws Exception{
//使用CreateMode.EPHEMERAL_SEQUENTIAL临时顺序型
currentPath = zk.create(parent_node+"/lock",null,ACL,CreateMode.EPHEMERAL_SEQUENTIAL);
}
//判断是否只有一个节点在线,若只有自己一个节点,则调用业务处理的方法
public void watchParent() throws Exception{
List<String> children = zk.getChildren(parent_node, false);
if (children!=null&&children.size()==1){
task.doSomething();
deleteLock();
}else {
Thread.sleep(Long.MAX_VALUE);
}
}
public void deleteLock() throws Exception{
zk.delete(currentPath,-1);
}
}
//测试
public class Test {
public static void main(String[] args) throws Exception{
//获取客户端连接
DistributeLock distributeLock = new DistributeLock();
CustomTask customTask = new Mytask(UUID.randomUUID().toString());
//设置任务
distributeLock.setTask(customTask);
distributeLock.getClient();
//注册锁
distributeLock.registerLock();
//监听父节点
distributeLock.watchParent();
}
}