1.Curator Cache 與原生ZooKeeper Wacher差別
原生的ZooKeeper Wacher是一次性的:一個Wacher一旦觸發就會被移出,如果你想要反複使用Wacher,就要在Wacher被移除後重新注冊,使用起來很麻煩。使用Curator Cache 可以反複使用Wacher了。
2.Curator Cache 和Curator Wacher差別
2.相關類
Curator Cache主要提供了一下三組類,分别用于實作對節點的監聽,子節點的監聽和二者的混合:
1.NodeCache,NodeCacheListener,ChildData,節點建立,節點資料内容變更,不能監聽節點删除。
2.PathChildrenCache,PathChildrenCacheListener,PathChildrenCacheEvent監聽指定節點的子節點的變更包括添加,删除,子節點資料資料變更這三類。
3.TreeCache,TreeCacheListener,TreeCacheEvent,TreeCacheSelector
3.節點監聽
package cn.francis.maven.hello.ZooKeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
public class NodeCacheDemo {
public static void main(String[]args) throws Exception{
TestingServer server=null;
CuratorFramework client=null;
NodeCache nodeCache=null;
String path="/francis/nodecache/a";
try{
server=new TestingServer();
client= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000,3));
client.start();
path=client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());
nodeCache=new NodeCache(client,path,false);
nodeCache.start();
addListener(nodeCache);
client.setData().forPath(path,"hello".getBytes());
Thread.sleep(1000);
client.delete().deletingChildrenIfNeeded().forPath(path);
Thread.sleep(Integer.MAX_VALUE);
}catch(Exception e){
e.printStackTrace();
}finally{
//這裡因為是測試,沒有加他們。
//CloseableUtils.closeQuietly(nodeCache);
/// CloseableUtils.closeQuietly(client);
// CloseableUtils.closeQuietly(server);
}
}
public static void addListener(NodeCache nodeCache){
//監聽類型:節點是否存在,節點資料内容改變,不監聽節點删除。
nodeCache.getListenable().addListener(new NodeCacheListener(){
@Override
public void nodeChanged() throws Exception {
// TODO Auto-generated method stub
if(nodeCache.getCurrentData()!=null)
System.out.println("path:"+nodeCache.getCurrentData().getPath()+",data:"+new String(nodeCache.getCurrentData().getData()));
}});
}
}
在上面的代碼中首先建立了一個節點,然後建立用這個節點路徑來建立NodeCache,啟動NodeCache,添加NodeCacheListener。然後調用setData來修改節點資料。上面的代碼輸入如下:
path:/francis/nodecache/_c_d5be73ca-592c-4eda-b7c4-c8ec60ef39a8-a,data:hello
子節點監聽:
PathChildrenCache的構造函數如下:
public PathChildrenCache(CuratorFramework client,
String path,
boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat
其中最主要的是cacheData,如果為true,那麼當對子節點調用setData時,PathChildrenCache會受到這個CHILD_UPDATED事件。
下面看一下demo:
package cn.francis.maven.hello.ZooKeeper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;
public class NodeCacheDemo {
public static void main(String[]args) throws Exception{
TestingServer server=null;
CuratorFramework client=null;
NodeCache nodeCache=null;
String path="/francis/nodecache/b";
try{
server=new TestingServer();
client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
client.start();
//這裡将第三個參數cacheData設定為true,這樣當對子節點調用setData時,會受到CHILDREN_UPDATE通知。
PathChildrenCache childrenCache=new PathChildrenCache(client,path,true);
childrenCache.start(StartMode.POST_INITIALIZED_EVENT);
childrenCache.getListenable().addListener(new PathChildrenCacheListener(){
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
// TODO Auto-generated method stub
if(event.getType()==Type.INITIALIZED){
System.out.println("create"+event.getData().getPath());
}else if(event.getType()==Type.CHILD_ADDED){
System.out.println("create"+event.getData().getPath());
}else if(event.getType()==Type.CHILD_REMOVED){
System.out.println("remove:"+event.getData().getPath());
}else if(event.getType()==Type.CHILD_UPDATED){
//System.out.println("update:"+event.getData().getPath());
System.out.println("update:"+new String(event.getData().getData()));
}
}});
//建立父節點
System.out.println(client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes()));
Thread.sleep(1000);
//建立子節點
String childPath1=ZKPaths.makePath(path, "a");
childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes());
Thread.sleep(1000);
//對子節點指派
client.setData().forPath(childPath1,"aaa".getBytes());
Thread.sleep(1000);
//删除子節點
client.delete().forPath(childPath1);
client.delete().deletingChildrenIfNeeded().forPath("/francis");
Thread.sleep(2000);
}catch(Exception e){
e.printStackTrace();
}finally{
CloseableUtils.closeQuietly(nodeCache);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
3.TreeNodeCache
TreeNodeCache将NodeCache和PathChildrenCache功能結合到一起了。他不僅可以對子節點和父節點同時進行監聽。如下:
package cn.francis.maven.hello.ZooKeeper;
import java.util.Map;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;
public class NodeCacheDemo {
public static void main(String[]args) throws Exception{
TestingServer server=null;
CuratorFramework client=null;
NodeCache nodeCache=null;
String path="/francis/nodecache/b";
try{
server=new TestingServer();
client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
client.start();
TreeCache treeNodeCache=new TreeCache(client,path);
treeNodeCache.start();
treeNodeCache.getListenable().addListener(new TreeCacheListener(){
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
// TODO Auto-generated method stub
switch(event.getType()){
case NODE_ADDED:
System.out.println("added:"+event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("updated:"+event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("removed:"+event.getData().getPath());
break;
default:
System.out.println("other:"+event.getType());
}
}
});
//建立父節點
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
Thread.sleep(1000);
//建立子節點
String childPath1=ZKPaths.makePath(path, "a");
childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes());
Thread.sleep(1000);
//對子節點指派
client.setData().forPath(childPath1,"aaa".getBytes());
Thread.sleep(1000);
//對子節點指派
client.setData().forPath(path,"aaa".getBytes());
Thread.sleep(1000);
//删除子節點
client.delete().forPath(childPath1);
client.delete().deletingChildrenIfNeeded().forPath("/francis");
Thread.sleep(2000);
}catch(Exception e){
e.printStackTrace();
}finally{
//這裡因為是測試,沒有加他們。
CloseableUtils.closeQuietly(nodeCache);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
}
輸出如下:
other:INITIALIZED
added:/francis/nodecache/b
added:/francis/nodecache/b/a
updated:/francis/nodecache/b/a
updated:/francis/nodecache/b
removed:/francis/nodecache/b/a
removed:/francis/nodecache/b