天天看点

zookeeper java api_[ZooKeeper]基于Java API 实践

前提

建立maven项目中 要导入zookeeper的依赖

org.apache.zookeeper

zookeeper

3.4.8

我们同时可以打开linux中的zookeeper客户端来验证对比。输入 zkCli.sh 便可以进入zookeeper客户端 。

一、建立连接

直接建立连接后,不进行等待判断 运行结果为连接中(CONNECTING)。

清单1 连接中 zookeeper

// 一、没有连接成功

public static void main(String[] args) {

// zookeeper集群 ip :客户端口

String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";

try {

ZooKeeper zooKeeper=new ZooKeeper(kvm,4000,null);

// CONNECTING 连接中 ==》根据zookeeper的四个状态,可知、;没有连接成功

System.out.println(zooKeeper.getState());

zooKeeper.close(); // 关闭连接

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

清单2 连接zookeeper成功

// 二、连接成功 zookeeper

public static void main(String[] args) {

final CountDownLatch countDownLatch=new CountDownLatch(1);

String kvm="192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";

try {

ZooKeeper zooKeeper=new ZooKeeper(kvm, 4000, new Watcher() {

// process: 观察者队列

@Override

public void process(WatchedEvent watchedEvent) {

// SyncConnected :同步连接

if(Event.KeeperState.SyncConnected==watchedEvent.getState()){

// 如果收到服务端的响应时间,连接成功

countDownLatch.countDown();

System.out.println("建立连接成功");

}

}

});

countDownLatch.await();

System.out.println(zooKeeper.getState()); // CONNECTED :连接成功

zooKeeper.close();

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

二、数据的增删改查操作

基于代码的增删改查操作,我们可以去到zookeeper中的zkCli客户端窗口中去验证。

进入zkCli客户端窗口:在bin目录下 直接输入 zkCli.sh 回车,便可进入。

清单1 事务操作

public static void main(String[] args) {

try {

final CountDownLatch countDownLatch=new CountDownLatch(1);

java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";

java.lang.String kvm2="192.168.0.85:2181";

ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {

@Override

public void process(WatchedEvent watchedEvent) {

if (Event.KeeperState.SyncConnected==watchedEvent.getState()){

countDownLatch.countDown();//如果收到了服务端的响应时间,连接成功

System.out.println("zk 建立连接");

}

}

});

countDownLatch.await();

System.out.println(zooKeeper.getState());// connected 成功

// 1.新增节点

zooKeeper.create("/zk-wcl","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

Thread.sleep(1000);

Stat stat=new Stat();// 状态

// 2.查看新增后的节点状态

byte[] bytes=zooKeeper.getData("/zk-wcl",null,stat);

System.out.println(new String(bytes));

// 3.修改节点

zooKeeper.setData("/zk-wcl","1".getBytes(),stat.getVersion());

// 4.查看修改后的节点状态

byte[] byte2=zooKeeper.getData("/zk-wcl",null,stat);

System.out.println(new String(byte2));

// 5.删除节点

zooKeeper.delete("/zk-wcl",stat.getVersion());

zooKeeper.close();

System.in.read();// 当前进程阻塞

} catch (IOException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

} catch (KeeperException e) {

e.printStackTrace();

}

}

三、事件机制

介绍

Watcher监听机制是ZooKeeper中非常重要的特性,我们基于zookeeper上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件。通过这个事件机制,可以基于zookeeper实现分布式锁、集群管理功能。

Watcher特性:当数据发生变化的时候,zookeeper会产生一个Watcher事件,并且会发送到客户端,但是客户端只会接受一次通知。如果后续这个节点再次发生变化,那么之前的设置watcher的客户端不会再次受到消息。

watcher是一次性操作。可以通过循环监听去达到永久监听效果。

1、如何注册事件机制

分为两步:绑定事件、触发事件。

第一步:通过getDate、exists 、getChildren这三个操作来绑定事件。

第二步:凡是事务类型的操作,都会触发监听事件。

事务操作:create 、``delete````、 setData

清单1:创建删除节点的事件注册监听实例

public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

final CountDownLatch countDownLatch=new CountDownLatch(1);

java.lang.String kvm="192.168.0.11:2181,192.168.0.13:2181,192.168.0.12:2181";

java.lang.String kvm2="192.168.0.85:2181";

// TODO: 这儿是全局的Watcher

ZooKeeper zooKeeper =new ZooKeeper(kvm, 4000, new Watcher() {

@Override

public void process(WatchedEvent watchedEvent) {

// 默认事件

System.out.println("默认事件:"+watchedEvent.getType());

if (Event.KeeperState.SyncConnected==watchedEvent.getState()){

countDownLatch.countDown();//如果收到了服务端的响应时间,连接成功

System.out.println("zk 建立连接");

}

}

});

countDownLatch.await();

// 一、去创建一个临时节点

zooKeeper.create("/zk-test-wcl","1".getBytes(),

ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

Stat stat=zooKeeper.exists("/zk-test-wcl", new Watcher() {

@Override

public void process(WatchedEvent watchedEvent) {

System.out.println(watchedEvent.getType()+"-->"+watchedEvent.getPath());

// TODO:NOTE ==> 因为watcher是一次性操作,只能看到setData操作带来的变化,delete操作看不到变化。

// TODO:NOTE ==> 所以,要在绑定一次事件,来持续监听

try {

// TODO :注意true 默认用的是全局的watcher

zooKeeper.exists(watchedEvent.getPath(),true);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

// 通过修改的事务类型操作来触发监听事件

// TODO:1、setData

stat = zooKeeper.setData("/zk-test-wcl","2".getBytes(),

stat.getVersion());

Thread.sleep(1000);

// TODO: 2、delete

zooKeeper.delete("/zk-test-wcl",stat.getVersion());

System.in.read();

}

2、watcher事件类型

事件类型

含义

None(-1)

客户端连接状态发生改变时,会受到none事件

NodeCreated(1)

创建节点事件

NodeDeleted(2)

删除节点事件

NodeDataChanged(3)

节点数据发生变更

NodeChildrenChanged(4)

子节点被创建、被删除会发生事件触发

3、什么样的操作会产生什么类型的事件

待续。。。。。

4、事件的实现原理

以上内容是关于zookeeper 的zkCli客户端java API的实操。其实zookeeper还有一个客户端Curator。Curator是对zookeeper原生Java API的封装。下面来记录下,我贴下练习的代码。

四、zookeeper客户端:Curator 在java API中的实操

zookeeper java api_[ZooKeeper]基于Java API 实践

文件图.png

前提:

在pom.xml文件中导入依赖的Curator包

org.apache.curator

curator-framework

4.0.0

org.apache.curator

curator-recipes

4.0.0

1、获取,创建,修改信息

建立连接并获取curator节点,注意,这个节点必须是已经存在的。

清单1、客户端的链接及获取命名空间

String kvm = "192.168.0.11:2181,192.168.0.12:2181,192.168.0.13:2181";

CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().

connectString(kvm).sessionTimeoutMs(4000).

retryPolicy(new ExponentialBackoffRetry(100, 3)).

namespace("curator").build();

curatorFramework.start();

清单2、创建一个多级节点

在原生Java API中创建节点,必须逐层创建,即必须先存在父节点,子节点才能创建。像上面中已经介绍过的zookeeper zkCli客户端。

现在在进一步封装的Curator客户端,便可以直接创建一个多级节点。

curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).

forPath("/wcl/node1","1".getBytes());

清单3、获取信息并修改信息

Stat stat = new Stat();

curatorFramework.getData().storingStatIn(stat).forPath("/wcl/node1");

curatorFramework.setData().withVersion(stat.getVersion()).forPath("/wcl/node1", "xx".getBytes());

注意:在main()方法的最后 要关闭。添加这句代码:curatorFramework.close();