zookeeper安装
-
Watcher
zookeeper提供了分布式数据发布/订阅,zookeeper允许客户端向服务器注册一个watcher监听。当服务器端的节点触发指定事件的时候
会触发watcher。服务端会向客户端发送一个事件通知
watcher的通知是一次性,一旦触发一次通知后,该watcher就失效
-
ACL
zookeeper提供控制节点访问权限的功能,用于有效的保证zookeeper中数据的安全性。避免误操作而导致系统出现重大事故。
CREATE /READ/WRITE/DELETE/ADMIN
-
zookeeper的命令操作
1. create [-s] [-e] path data acl
-s 表示节点是否有序
-e 表示是否为临时节点
默认情况下,是持久化节点
2. get path [watch]
获得指定 path的信息
3.set path data [version]
修改节点 path对应的data
乐观锁的概念
数据库里面有一个 version 字段去控制数据行的版本号
4.delete path [version]
删除节点
stat信息
cversion = 0 子节点的版本号
aclVersion = 0 表示acl的版本号,修改节点权限
dataVersion = 1 表示的是当前节点数据的版本号
czxid 节点被创建时的事务ID
mzxid 节点最后一次被更新的事务ID
pzxid 当前节点下的子节点最后一次被修改时的事务ID
java API的使用(curator)zkClient略
-
权限控制模式
schema:授权对象
ip : 192.168.1.1
Digest : username:password
world : 开放式的权限控制模式,数据节点的访问权限对所有用户开放。 world:anyone
super :超级用户,可以对zookeeper上的数据节点进行操作
-
连接状态
KeeperStat.Expired 在一定时间内客户端没有收到服务器的通知, 则认为当前的会话已经过期了。
KeeperStat.Disconnected 断开连接的状态
KeeperStat.SyncConnected 客户端和服务器端在某一个节点上建立连接,并且完成一次version、zxid同步
KeeperStat.authFailed 授权失败
-
事件类型
NodeCreated 当节点被创建的时候,触发
NodeChildrenChanged 表示子节点被创建、被删除、子节点数据发生变化
NodeDataChanged 节点数据发生变化
NodeDeleted 节点被删除
None 客户端和服务器端连接状态发生变化的时候,事件类型就是None
-
curator api
Curator本身是Netflix公司开源的zookeeper客户端,提供了各种应用场景的实现封装
curator-framework 提供了fluent风格api
curator-replice 提供了实现封装
- curator连接的重试策略
ExponentialBackoffRetry() 衰减重试
RetryNTimes 指定最大重试次数
RetryOneTime 仅重试一次
RetryUnitilElapsed 一直重试知道规定的时间
代码--链接:https://pan.baidu.com/s/1_OzwNgaAK4FBv2c3dfLKUQ 密码:3bbc
- 导入jar包
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorClientUtils {
private static CuratorFramework curatorFramework;
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
public static CuratorFramework getInstance(){
curatorFramework= CuratorFrameworkFactory.
newClient(CONNECTSTRING,5000,5000,
new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
return curatorFramework;
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CuratorOperatorDemo {
public static void main(String[] args) throws InterruptedException {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
System.out.println("连接成功.........");
//fluent风格
/**
* 创建节点
*/
/* try {
String result=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
forPath("/curator/curator1/curator11","123".getBytes());
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 删除节点
*/
/*try {
//默认情况下,version为-1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/node11");
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 查询
*/
/*Stat stat=new Stat();
try {
byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/curator");
System.out.println(new String(bytes)+"-->stat:"+stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 更新
*/
/* try {
Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 异步操作
*/
/*ExecutorService service= Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch=new CountDownLatch(1);
try {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
+curatorEvent.getType());
countDownLatch.countDown();
}
},service).forPath("/mic","123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.await();
service.shutdown();*/
/**
* 事务操作(curator独有的)
*/
try {
Collection<CuratorTransactionResult> resultCollections=curatorFramework.inTransaction().create().forPath("/trans","111".getBytes()).and().
setData().forPath("/curator","111".getBytes()).and().commit();
for (CuratorTransactionResult result:resultCollections){
System.out.println(result.getForPath()+"->"+result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.TimeUnit;
public class CuratorEventDemo {
/**
* 三种watcher来做节点的监听
* pathcache 监视一个路径下子节点的创建、删除、节点数据更新
* NodeCache 监视一个节点的创建、更新、删除
* TreeCache pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
* 缓存路径下的所有子节点的数据
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
/**
* 节点变化NodeCache
*/
/* NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
cache.start(true);
cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
":"+new String(cache.getCurrentData().getData())));
curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/
/**
* PatchChildrenCache
*/
PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("增加子节点");
break;
case CHILD_REMOVED:
System.out.println("删除子节点");
break;
case CHILD_UPDATED:
System.out.println("更新子节点");
break;
default:break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("1");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("2");
curatorFramework.setData().forPath("/event/event1","222".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("3");
curatorFramework.delete().forPath("/event/event1");
System.out.println("4");
System.in.read();
}
}
zookeeper的实际应用场景
zookeeper能够实现哪些场景
订阅发布
watcher机制
统一配置管理(disconf)
分布式锁
redis
zookeeper
数据库
负载均衡
ID生成器
分布式队列
统一命名服务
master选举
分布式锁