天天看點

Zookeeper幹貨

一、基本操作指令

進入​

​zkClli.sh​

ZooKeeper -server host:port cmd args
  stat path [watch]
  set path data [version]
  ls path [watch]
  delquota [-n|-b] path
  ls2 path [watch]
  setAcl path acl
  setquota -n|-b val path
  history 
  redo cmdno
  printwatches on|off
  delete path [version]
  sync path
  listquota path
  rmr path
  get path [watch]
  create [-s] [-e] path data acl
  addauth scheme auth
  quit 
  getAcl path
  close 
  connect host:port      

二、項目整合zookeeper

依賴

<!-- zookeeper -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>${zookeeper.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>${curator.version}</version>
            </dependency>


            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>${curator.version}</version>
            </dependency>      

初始化執行個體

  • 配置applicationContext-zookeeper.xml
<description>zookeeper 放入spring容器,項目啟動加載的時候就建立和zk的連接配接</description>


  <!--建立重連政策-->
  <bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
    <!--每次重試連接配接的等待時間-->
    <constructor-arg index="0" value="1000"></constructor-arg>
    <!--設定的重連的次數-->
    <constructor-arg index="1" value="5"></constructor-arg>
  </bean>


  <!--建立zookeeper用戶端-->
  <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory"
      factory-method="newClient" init-method="start">
    <constructor-arg index="0" value="47.107.63.171:2181"></constructor-arg>
    <constructor-arg index="1" value="10000"></constructor-arg>
    <constructor-arg index="2" value="10000"></constructor-arg>
    <constructor-arg index="3" ref="retryPolicy"></constructor-arg>
  </bean>


  <!--用戶端配置-->
  <bean id="ZKCurator" class="com.tony.web.util.ZKCurator" init-method="init">
    <constructor-arg index="0" ref="client"></constructor-arg>
  </bean>      

建立zkCurator類

并自動初始化,啟動監聽

package com.tony.web.util;


import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ZKCurator {
    //通過bean自動建立執行個體
    private CuratorFramework client = null;


    final static Logger log = LoggerFactory.getLogger(ZKCurator.class);


    public ZKCurator(CuratorFramework client){
        this.client = client;
    }


    public void init(){
        client = client.usingNamespace("admin");


        try {
            //判斷在admin命名空間下是否有bgm節點,/admin/bgm
            if (client.checkExists().forPath("/bgm") == null) {
                /**
                 * 對于zk來講,有兩種類型的節點:
                 * 持久節點:當用戶端斷開連接配接時,znode 不會被自動删除,
                 * 臨時節點:znode 将在用戶端斷開連接配接時被删除,
                 */
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT) //持久化節點
                        .withACL(Ids.OPEN_ACL_UNSAFE) //acl:匿名權限,完全開發
                        .forPath("/bgm");
                log.info("zookeeper初始化成功");
                log.info("zookeeper伺服器狀态:{}",client.isStarted());
            }


        } catch (Exception e) {
            log.error("zookeeper用戶端連接配接、初始化錯誤...");
            e.printStackTrace();
        }

    }


    /**
     * @Descrption: 增加或刪除bgm,向zookeeper建立子節點,供小程式監聽
     * @param bgmId
     * @param operaObj
     */
    public void sendBgmOperator(String bgmId, String operaObj){

        try {
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/bgm/" + bgmId, operaObj.getBytes("utf-8"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}      

三、定義配置類

實作自動依賴注入來自動執行個體化

@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {
   
    @Bean(initMethod = "init")
    public ZKCuratorClient zkCuratorClient(){
        return new ZKCuratorClient();
    }
}      

通過自動執行個體化自動開啟監聽

四、建立用戶端監聽

package com.tony;


import com.tony.config.ResourceConfig;
import com.tony.service.BgmService;
import com.tony.utils.BGMOperatorTypeEnum;
import com.tony.utils.JsonUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.aspectj.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;


@Component
public class ZKCuratorClient {


   private CuratorFramework client = null;
   final static Logger log = LoggerFactory.getLogger(ZKCuratorClient.class);


   @Autowired
   private BgmService bgmService;


   @Autowired
   private ResourceConfig resourceConfig;


   //public static final String ZOOKEEPER_SERVER = "47.107.63.171:2181";
   //優化:使用resource.properties來依賴注入

   public void init(){
      if(client != null){
         return;
      }

      //重試政策
      RetryPolicy retryPolicy = new ExponentialBackoffRetry(  1000, 5);
      //建立zk用戶端
      client = CuratorFrameworkFactory.builder().connectString(resourceConfig.getZookeeperServer())
         .sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();
      //啟動用戶端
      client.start();

      try {
         //String testNodeData = new String(client.getData().forPath("/bgm/210816FNTPDX97HH"));
         //log.info("測試節點資料為:{}" + testNodeData);
         addChildWatch("/bgm");
      } catch (Exception e) {
         e.printStackTrace();
      }
   }


   public void addChildWatch(String nodePath) throws Exception{
      final PathChildrenCache cache = new PathChildrenCache(client, nodePath, true);


      cache.start();
      cache.getListenable().addListener(new PathChildrenCacheListener() {
         @Override
         public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
            if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
               log.info("監聽到事件CHILD_ADDED");


               //1. 從資料庫查詢bgm對象,擷取路徑path
               String path = event.getData().getPath();
               String operatotObjStr = new String(event.getData().getData(), "utf-8");
               Map<String, Object> map = JsonUtils.jsonToPojo(operatotObjStr, Map.class);


               System.out.println("operatorObjStr: " + operatotObjStr);
               System.out.println("map: " + map);


               //從zookeeper擷取bgmPath
               String operatorType = (String) map.get("operaType");
               String songPath = (String) map.get("path");


               //String[] arr = path.split("/");
               //String bgmId = arr[arr.length - 1];


               //從資料庫擷取bgmPath
               //Bgm bgm = bgmService.queryBgmById(bgmId);
               //if(bgm == null){
               //    return;
               //}
               //bgm所在的相對路徑
               //String songPath = bgm.getPath();


               //2. 定義儲存到本地的bgm路徑
               //                 String filePath = "D:/tony_videos_dev" + songPath;
               /*優化*/          String filePath = resourceConfig.getFileSpace() + songPath;
               //3. 定義下載下傳的路徑(播放url)
               String arrPath[] = songPath.split("\\\\");
               String finalPath = "";
               //3.1 處理url的斜杠以及編碼
               for(int i = 0; i < arrPath.length; i++){
                  if(StringUtils.isNotBlank(arrPath[i])){
                     finalPath += "/";
                     finalPath += URLEncoder.encode(arrPath[i], StandardCharsets.UTF_8.toString()) ;
                  }
               }
               //               String bgmUrl = "http://192.168.56.1:8080/mvc" + finalPath;
               /*優化*/          String bgmUrl = resourceConfig.getBgmServer() + finalPath;
               if(operatorType.equals(BGMOperatorTypeEnum.ADD.type)){
                  //下載下傳bgm到springboot伺服器
                  URL url = new URL(bgmUrl);
                  File file = new File(filePath);
                  FileUtils.copyURLToFile(url, file);
                  client.delete().forPath(path);
                  log.info("檔案 {} 已同步!",filePath);
               }else if (operatorType.equals(BGMOperatorTypeEnum.DELETE.type)){
                  File file = new File(filePath);
                  log.info("檔案 {} 已删除!",filePath);
                  FileUtils.forceDelete(file);
                  client.delete().forPath(path);
               }

            }
         }
      });


   }
}      

五、周遊所有節點

public static void listAll(ZooKeeper zk, String path) throws Exception {
        List<String> children = zk.getChildren(path, false);
        for(String child : children){
            String currentNodeName = "/".equals(path) ? path + child : path + "/" + child;
            System.out.println(currentNodeName);
            listAll(zk, currentNodeName);
        }
    }      

六、建立節點

​ZooDefs.Ids.CREATOR_ALL_ACL​

​是需要添加身份才可以建立,否則身份為隻讀使用者,不能建立和删除

/**
         * 使用java遠端通路zookeeper
         * 建立用戶端
         * 使用用戶端發送指令處理傳回結果
         * 回收資源
         */
        public static void create() throws IOException, KeeperException, InterruptedException {
            ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("watch中的方法執行");
                }
            });




            String result = zk.create("/parent", "parent data".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
            System.out.println("建立的parent的結果:"  +  result);

           //建立臨時節點後,在zk.close()後會失效
            String tmpResult = zk.create("/parent/tmp",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println("建立的/parent/tmp的結果:"  +  tmpResult);


            String seqResult = zk.create("/parent/sequence", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT_SEQUENTIAL);
            System.out.println("建立的/parent/sequence的結果:"  +  seqResult);


            //關閉用戶端
            zk.close();


        }      

七、擷取資料

public static void get() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {


        }
    });
    byte[] data = zk.getData("/parent", false, null);
    String s = new String(data);
    System.out.println("/parent擷取的資料為:" + s);
}      

八、删除節點

/**
     * 删除節點
     * 删除節點前,需要先查詢節點的狀态(cversion),通過getData來查詢這個版本
     * 設計是為了保證删除的節點是你想删除的那個
     * @throws Exception
     */
public static void delete() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {


        }
    });


    Stat stat = new Stat();
    System.out.println(stat);


    zk.getData("/parent/sequence0000000001",false, stat);
    //System.out.println(stat.getCversion());//cversion
    //System.out.println(stat.getVersion());//即dataVersion
    //System.out.println(stat.getAversion());//aclVersion
    System.out.println(stat.getCversion());


    /*delete不能删除非空節點,即存在子節點時
        stat.getCversion()是要在getDtata後才能擷取到它的版本,因為每次查詢後,
        stat才能擷取到版本号,這樣才能删除,如果擷取不到那說明沒有這個版本号
        也就是适合在高并發的時候使用,如果不用stat,而直接用版本号,那麼可能就會沖突
         */
    zk.delete("/parent/sequence0000000001", stat.getCversion());
}