天天看点

Zookeeper干货

一、基本操作命令

进入

zkCli.sh

ZooKeeper -server host:port cmd args
  stat path [watch]
  set path data [version]
  ls path [watch]
  delquota [-n|-b] path
  ls2 path [watch]
  setAcl path acl
  setquota -n|-b val path
  history 
  redo cmdno
  printwatches on|off
  delete path [version]
  sync path
  listquota path
  rmr path
  get path [watch]
  create [-s] [-e] path data acl
  addauth scheme auth
  quit 
  getAcl path
  close 
  connect host:port      

二、项目整合zookeeper

依赖

<!-- zookeeper -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>${zookeeper.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>${curator.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>${curator.version}</version>
            </dependency>      

初始化实例

  • 配置applicationContext-zookeeper.xml
<description>zookeeper 放入spring容器,项目启动加载的时候就建立和zk的连接</description>


  <!-- Retry policy used by the client when reconnecting -->
  <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
    <!-- Wait time between retry attempts (presumably the base sleep time in milliseconds; confirm against the Curator API) -->
    <constructor-arg index="0" value="1000"></constructor-arg>
    <!-- Number of reconnect attempts -->
    <constructor-arg index="1" value="5"></constructor-arg>
  </bean>


  <!-- ZooKeeper client: built by the static factory method newClient and started through init-method="start" -->
  <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory"
      factory-method="newClient" init-method="start">
    <!-- ZooKeeper server address as host:port -->
    <constructor-arg index="0" value="47.107.63.171:2181"></constructor-arg>
    <!-- NOTE(review): presumably the session timeout in milliseconds; confirm against newClient(String, int, int, RetryPolicy) -->
    <constructor-arg index="1" value="10000"></constructor-arg>
    <!-- NOTE(review): presumably the connection timeout in milliseconds; confirm against the same signature -->
    <constructor-arg index="2" value="10000"></constructor-arg>
    <!-- The retry policy bean defined above -->
    <constructor-arg index="3" ref="retryPolicy"></constructor-arg>
  </bean>


  <!-- Application wrapper around the client; its init() method is invoked after construction -->
  <bean id="ZKCurator" class="com.tony.web.util.ZKCurator" init-method="init">
    <constructor-arg index="0" ref="client"></constructor-arg>
  </bean>      

创建zkCurator类

并自动初始化,启动监听

package com.tony.web.util;


import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZKCurator {
    //通过bean自动创建实例
    private CuratorFramework client = null;


    final static Logger log = LoggerFactory.getLogger(ZKCurator.class);


    public ZKCurator(CuratorFramework client){
        this.client = client;
    }


    public void init(){
        client = client.usingNamespace("admin");


        try {
            //判断在admin命名空间下是否有bgm节点,/admin/bgm
            if (client.checkExists().forPath("/bgm") == null) {
                /**
                 * 对于zk来讲,有两种类型的节点:
                 * 持久节点:当客户端断开连接时,znode 不会被自动删除,
                 * 临时节点:znode 将在客户端断开连接时被删除,
                 */
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT) //持久化节点
                        .withACL(Ids.OPEN_ACL_UNSAFE) //acl:匿名权限,完全开发
                        .forPath("/bgm");
                log.info("zookeeper初始化成功");
                log.info("zookeeper服务器状态:{}",client.isStarted());
            }


        } catch (Exception e) {
            log.error("zookeeper客户端连接、初始化错误...");
            e.printStackTrace();
        }

    }


    /**
     * @Descrption: 增加或刪除bgm,向zookeeper创建子节点,供小程序监听
     * @param bgmId
     * @param operaObj
     */
    public void sendBgmOperator(String bgmId, String operaObj){

        try {
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/bgm/" + bgmId, operaObj.getBytes("utf-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}      

三、定义配置类

实现自动依赖注入来自动实例化

/**
 * Spring configuration that exposes the ZooKeeper watcher client as a bean.
 */
@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {

    /**
     * Creates the {@link ZKCuratorClient} bean; {@code initMethod = "init"} asks
     * the container to call {@code init()} on it once it has been constructed.
     *
     * NOTE(review): ZKCuratorClient is also annotated with {@code @Component};
     * confirm that component scanning does not register a second definition.
     */
    @Bean(initMethod = "init")
    public ZKCuratorClient zkCuratorClient(){
        final ZKCuratorClient watcherClient = new ZKCuratorClient();
        return watcherClient;
    }
}

通过自动实例化自动开启监听

四、创建客户端监听

package com.tony;


import com.tony.config.ResourceConfig;
import com.tony.service.BgmService;
import com.tony.utils.BGMOperatorTypeEnum;
import com.tony.utils.JsonUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.aspectj.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;


/**
 * ZooKeeper client that watches the children of {@code /bgm} (inside the
 * {@code admin} namespace) and mirrors BGM files locally: an ADD operation
 * downloads the file from the BGM server, a DELETE operation removes the local
 * copy. A handled node is deleted from ZooKeeper afterwards.
 */
@Component
public class ZKCuratorClient {


   private CuratorFramework client = null;
   final static Logger log = LoggerFactory.getLogger(ZKCuratorClient.class);


   @Autowired
   private BgmService bgmService;


   // Supplies the ZooKeeper address, the local file space and the BGM server URL.
   @Autowired
   private ResourceConfig resourceConfig;


   /**
    * Builds and starts the Curator client, then registers the watch on
    * {@code /bgm}. Calling it again after the client exists is a no-op.
    * A failure to register the watch is logged and not rethrown.
    */
   public void init(){
      if(client != null){
         return;
      }

      // Retry policy: base sleep 1000, at most 5 retries.
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
      client = CuratorFrameworkFactory.builder().connectString(resourceConfig.getZookeeperServer())
         .sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();
      client.start();

      try {
         addChildWatch("/bgm");
      } catch (Exception e) {
         if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
         }
         log.error("Failed to register child watch on /bgm", e);
      }
   }


   /**
    * Registers a listener for child events below {@code nodePath} and starts
    * the underlying {@link PathChildrenCache}.
    *
    * @param nodePath node whose children are watched
    * @throws Exception if the cache cannot be started
    */
   public void addChildWatch(String nodePath) throws Exception{
      final PathChildrenCache cache = new PathChildrenCache(client, nodePath, true);

      // Register the listener BEFORE starting the cache so that events produced
      // while the cache loads the existing children are not missed.
      cache.getListenable().addListener(new PathChildrenCacheListener() {
         @Override
         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
               return;
            }
            log.info("监听到事件CHILD_ADDED");
            handleChildAdded(client, event);
         }
      });
      cache.start();
   }


   /**
    * Handles one added child node: parses its JSON payload and performs the
    * requested ADD or DELETE of the local BGM file.
    */
   private void handleChildAdded(CuratorFramework zk, PathChildrenCacheEvent event) throws Exception {
      String path = event.getData().getPath();
      byte[] payload = event.getData().getData();
      if (payload == null) {
         log.warn("Node {} carries no payload, ignoring", path);
         return;
      }

      // 1. Read the operation description stored in the node.
      String operatorObjStr = new String(payload, StandardCharsets.UTF_8);
      @SuppressWarnings("unchecked") // the JSON object is read generically as String -> Object
      Map<String, Object> map = JsonUtils.jsonToPojo(operatorObjStr, Map.class);
      log.debug("operatorObjStr: {}", operatorObjStr);
      log.debug("map: {}", map);
      if (map == null) {
         log.warn("Node {} payload could not be parsed: {}", path, operatorObjStr);
         return;
      }

      String operatorType = (String) map.get("operaType");
      String songPath = (String) map.get("path");
      if (operatorType == null || songPath == null) {
         log.warn("Node {} payload lacks operaType/path: {}", path, operatorObjStr);
         return;
      }

      // 2. Local location of the BGM file.
      // NOTE(review): songPath comes straight from the node data and is not
      // checked for ".." segments; confirm that only trusted writers can create
      // nodes under /bgm.
      String filePath = resourceConfig.getFileSpace() + songPath;
      // 3. Download (playback) URL of the BGM file.
      String bgmUrl = resourceConfig.getBgmServer() + toUrlPath(songPath);

      if (operatorType.equals(BGMOperatorTypeEnum.ADD.type)) {
         // Download the BGM to this server, then consume the node.
         FileUtils.copyURLToFile(new URL(bgmUrl), new File(filePath));
         zk.delete().forPath(path);
         log.info("文件 {} 已同步!",filePath);
      } else if (operatorType.equals(BGMOperatorTypeEnum.DELETE.type)) {
         File file = new File(filePath);
         if (file.exists()) {
            FileUtils.forceDelete(file);
            log.info("文件 {} 已删除!",filePath);
         } else {
            // Nothing to remove locally; still consume the node so the request
            // does not stay in ZooKeeper and fail again on the next start.
            log.warn("File {} does not exist, nothing to delete", filePath);
         }
         zk.delete().forPath(path);
      } else {
         log.warn("Unknown operaType '{}' on node {}", operatorType, path);
      }
   }


   /**
    * Turns a relative file path using {@code \} or {@code /} separators into a
    * URL path: blank segments are dropped, every segment is percent-encoded as
    * UTF-8 and prefixed with {@code /}.
    */
   private static String toUrlPath(String songPath) throws java.io.UnsupportedEncodingException {
      StringBuilder urlPath = new StringBuilder();
      for (String segment : songPath.split("[\\\\/]")) {
         if (StringUtils.isNotBlank(segment)) {
            // URLEncoder does form encoding and writes a space as '+', which is a
            // literal plus inside a URL path; a literal '+' in the input is
            // already emitted as %2B, so every remaining '+' stands for a space.
            String encoded = URLEncoder.encode(segment, StandardCharsets.UTF_8.toString())
               .replace("+", "%20");
            urlPath.append('/').append(encoded);
         }
      }
      return urlPath.toString();
   }
}

五、遍历所有节点

/**
 * Recursively prints the full path of every node below {@code path},
 * each parent before its own children.
 *
 * @param zk   ZooKeeper handle used to list children (no watch is set)
 * @param path node whose descendants are printed
 * @throws Exception if listing the children of any node fails
 */
public static void listAll(ZooKeeper zk, String path) throws Exception {
        // The root "/" already ends with a slash; every other parent needs one appended.
        final String prefix = "/".equals(path) ? path : path + "/";
        for (String name : zk.getChildren(path, false)) {
            final String childPath = prefix + name;
            System.out.println(childPath);
            listAll(zk, childPath);
        }
    }

六、创建节点

使用 ZooDefs.Ids.CREATOR_ALL_ACL 时,需要先为客户端添加认证身份(addAuthInfo)才可以创建节点;否则身份为只读用户,不能创建和删除节点。

/**
         * Accesses a remote ZooKeeper from Java: opens a client, creates a
         * persistent node, an ephemeral child and a persistent-sequential child,
         * prints the path returned by each call, and finally releases the client.
         *
         * @throws IOException          if the client cannot be created
         * @throws KeeperException      if a create call is rejected by the server
         * @throws InterruptedException if the calling thread is interrupted
         */
        public static void create() throws IOException, KeeperException, InterruptedException {
            ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("watch中的方法执行");
                }
            });

            try {
                String result = zk.create("/parent",
                        "parent data".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT);
                System.out.println("创建的parent的结果:"  +  result);

                // An ephemeral node disappears once zk.close() ends the session.
                String tmpResult = zk.create("/parent/tmp", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL);
                System.out.println("创建的/parent/tmp的结果:"  +  tmpResult);

                String seqResult = zk.create("/parent/sequence", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);
                System.out.println("创建的/parent/sequence的结果:"  +  seqResult);
            } finally {
                // Close the client even when one of the create calls throws,
                // otherwise the session and its threads are leaked.
                zk.close();
            }
        }

七、获取数据

/**
 * Reads the data of {@code /parent} and prints it decoded as UTF-8.
 * A node without data is printed as {@code null}.
 *
 * @throws Exception if the client cannot be created or the read fails
 */
public static void get() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Session events are not needed for this one-off read.
        }
    });
    try {
        byte[] data = zk.getData("/parent", false, null);
        // A node may have been created without data, in which case data is null.
        String s = data == null
                ? null
                : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("/parent获取的数据为:" + s);
    } finally {
        // Release the session and the client threads.
        zk.close();
    }
}

八、删除节点

/**
     * Deletes the node {@code /parent/sequence0000000001}.
     *
     * <p>The node's {@link Stat} is read first through {@code getData} and its
     * data version ({@code stat.getVersion()}) is passed to {@code delete}. The
     * server only deletes the node while that version is still current, which
     * guarantees that the node removed is the one that was just inspected even
     * when other clients modify it concurrently. {@code delete} cannot remove a
     * node that still has children.
     *
     * @throws Exception if the client cannot be created, the node does not exist,
     *                   or the version no longer matches
     */
public static void delete() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // Session events are not needed for this one-off delete.
        }
    });

    try {
        // The Stat is only populated by the read below; before that it holds no versions.
        Stat stat = new Stat();
        zk.getData("/parent/sequence0000000001", false, stat);

        // Stat carries three counters: getVersion() (data version), getCversion()
        // (children version) and getAversion() (ACL version). delete() is checked
        // against the data version, so that is the one to pass on.
        int dataVersion = stat.getVersion();
        System.out.println(dataVersion);

        zk.delete("/parent/sequence0000000001", dataVersion);
    } finally {
        // Release the session and the client threads.
        zk.close();
    }
}