一、基本操作指令
進入
zkCli.sh
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
二、項目整合zookeeper
依賴
<!-- ZooKeeper client plus the Curator framework and recipes modules.
     Versions are taken from the zookeeper.version and curator.version properties. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
初始化執行個體
- 配置applicationContext-zookeeper.xml
<description>zookeeper 放入spring容器,項目啟動加載的時候就建立和zk的連接配接</description>
<!-- Retry policy used when the client has to reconnect -->
<bean id="retryPolicy" class="org.apache.curator.retry.ExponentialBackoffRetry">
<!-- arg 0: base wait time between retries, in milliseconds -->
<constructor-arg index="0" value="1000"></constructor-arg>
<!-- arg 1: maximum number of retries -->
<constructor-arg index="1" value="5"></constructor-arg>
</bean>
<!-- ZooKeeper client: built through the static factory method newClient and started via init-method.
     NOTE(review): the args presumably map to connect string, session timeout (ms),
     connection timeout (ms) and retry policy; confirm against the Curator version in use. -->
<bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory"
factory-method="newClient" init-method="start">
<constructor-arg index="0" value="47.107.63.171:2181"></constructor-arg>
<constructor-arg index="1" value="10000"></constructor-arg>
<constructor-arg index="2" value="10000"></constructor-arg>
<constructor-arg index="3" ref="retryPolicy"></constructor-arg>
</bean>
<!-- Application wrapper around the client; its init() method is declared as init-method -->
<bean id="ZKCurator" class="com.tony.web.util.ZKCurator" init-method="init">
<constructor-arg index="0" ref="client"></constructor-arg>
</bean>
建立zkCurator類
并自動初始化,啟動監聽
package com.tony.web.util;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wrapper around a Curator client used to publish BGM operations as znodes
 * inside the {@code admin} namespace.
 *
 * <p>The client is handed in through the constructor; {@link #init()} switches
 * it to the {@code admin} namespace and makes sure the {@code /bgm} parent
 * node exists.
 */
public class ZKCurator {
    // Supplied through the constructor; replaced by a namespaced view in init().
    private CuratorFramework client = null;
    final static Logger log = LoggerFactory.getLogger(ZKCurator.class);

    /**
     * @param client the Curator client this wrapper operates on
     */
    public ZKCurator(CuratorFramework client){
        this.client = client;
    }

    /**
     * Switches to the {@code admin} namespace and creates the persistent
     * {@code /bgm} node (i.e. {@code /admin/bgm}) when it does not exist yet.
     * Failures are logged together with their cause and not rethrown.
     */
    public void init(){
        client = client.usingNamespace("admin");
        try {
            // Does /admin/bgm already exist?
            if (client.checkExists().forPath("/bgm") == null) {
                // Persistent znodes survive a client disconnect; ephemeral
                // znodes are removed when the client disconnects.
                client.create().creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)   // persistent node
                        .withACL(Ids.OPEN_ACL_UNSAFE)      // fully open ACL
                        .forPath("/bgm");
                log.info("zookeeper初始化成功");
                log.info("zookeeper伺服器狀态:{}",client.isStarted());
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the stack trace ends up in
            // the application log instead of stderr.
            log.error("zookeeper用戶端連接配接、初始化錯誤...", e);
        }
    }

    /**
     * Publishes a BGM add/delete operation by creating the persistent child
     * node {@code /bgm/<bgmId>} whose data is {@code operaObj} encoded as UTF-8.
     * Failures are logged and not rethrown.
     *
     * @param bgmId    id of the BGM; used as the child node name
     * @param operaObj payload stored in the node
     */
    public void sendBgmOperator(String bgmId, String operaObj){
        try {
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(Ids.OPEN_ACL_UNSAFE)
                    .forPath("/bgm/" + bgmId,
                            operaObj.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (Exception e) {
            log.error("Failed to create znode /bgm/{}", bgmId, e);
        }
    }
}
三、定義配置類
實作自動依賴注入來自動執行個體化
/**
 * Spring configuration that exposes the ZooKeeper listener client as a bean.
 */
@Configuration
public class WebMvcConfig extends WebMvcConfigurerAdapter {

    /**
     * Creates the {@link ZKCuratorClient} bean; {@code init} is declared as
     * its init method.
     *
     * @return a new, not yet initialised client
     */
    @Bean(initMethod = "init")
    public ZKCuratorClient zkCuratorClient(){
        final ZKCuratorClient listenerClient = new ZKCuratorClient();
        return listenerClient;
    }
}
通過自動執行個體化自動開啟監聽
四、建立用戶端監聽
package com.tony;
import com.tony.config.ResourceConfig;
import com.tony.service.BgmService;
import com.tony.utils.BGMOperatorTypeEnum;
import com.tony.utils.JsonUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.aspectj.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
@Component
public class ZKCuratorClient {
private CuratorFramework client = null;
final static Logger log = LoggerFactory.getLogger(ZKCuratorClient.class);
@Autowired
private BgmService bgmService;
@Autowired
private ResourceConfig resourceConfig;
//public static final String ZOOKEEPER_SERVER = "47.107.63.171:2181";
//優化:使用resource.properties來依賴注入
public void init(){
if(client != null){
return;
}
//重試政策
RetryPolicy retryPolicy = new ExponentialBackoffRetry( 1000, 5);
//建立zk用戶端
client = CuratorFrameworkFactory.builder().connectString(resourceConfig.getZookeeperServer())
.sessionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("admin").build();
//啟動用戶端
client.start();
try {
//String testNodeData = new String(client.getData().forPath("/bgm/210816FNTPDX97HH"));
//log.info("測試節點資料為:{}" + testNodeData);
addChildWatch("/bgm");
} catch (Exception e) {
e.printStackTrace();
}
}
public void addChildWatch(String nodePath) throws Exception{
final PathChildrenCache cache = new PathChildrenCache(client, nodePath, true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
log.info("監聽到事件CHILD_ADDED");
//1. 從資料庫查詢bgm對象,擷取路徑path
String path = event.getData().getPath();
String operatotObjStr = new String(event.getData().getData(), "utf-8");
Map<String, Object> map = JsonUtils.jsonToPojo(operatotObjStr, Map.class);
System.out.println("operatorObjStr: " + operatotObjStr);
System.out.println("map: " + map);
//從zookeeper擷取bgmPath
String operatorType = (String) map.get("operaType");
String songPath = (String) map.get("path");
//String[] arr = path.split("/");
//String bgmId = arr[arr.length - 1];
//從資料庫擷取bgmPath
//Bgm bgm = bgmService.queryBgmById(bgmId);
//if(bgm == null){
// return;
//}
//bgm所在的相對路徑
//String songPath = bgm.getPath();
//2. 定義儲存到本地的bgm路徑
// String filePath = "D:/tony_videos_dev" + songPath;
/*優化*/ String filePath = resourceConfig.getFileSpace() + songPath;
//3. 定義下載下傳的路徑(播放url)
String arrPath[] = songPath.split("\\\\");
String finalPath = "";
//3.1 處理url的斜杠以及編碼
for(int i = 0; i < arrPath.length; i++){
if(StringUtils.isNotBlank(arrPath[i])){
finalPath += "/";
finalPath += URLEncoder.encode(arrPath[i], StandardCharsets.UTF_8.toString()) ;
}
}
// String bgmUrl = "http://192.168.56.1:8080/mvc" + finalPath;
/*優化*/ String bgmUrl = resourceConfig.getBgmServer() + finalPath;
if(operatorType.equals(BGMOperatorTypeEnum.ADD.type)){
//下載下傳bgm到springboot伺服器
URL url = new URL(bgmUrl);
File file = new File(filePath);
FileUtils.copyURLToFile(url, file);
client.delete().forPath(path);
log.info("檔案 {} 已同步!",filePath);
}else if (operatorType.equals(BGMOperatorTypeEnum.DELETE.type)){
File file = new File(filePath);
log.info("檔案 {} 已删除!",filePath);
FileUtils.forceDelete(file);
client.delete().forPath(path);
}
}
}
});
}
}
五、周遊所有節點
/**
 * Prints the full path of every znode below {@code path}, visiting each
 * child's subtree right after the child itself.
 *
 * @param zk   connected ZooKeeper client
 * @param path node whose descendants are listed
 * @throws Exception if reading the children fails
 */
public static void listAll(ZooKeeper zk, String path) throws Exception {
    // The root path already ends with a slash; every other path needs one appended.
    final String prefix;
    if ("/".equals(path)) {
        prefix = path;
    } else {
        prefix = path + "/";
    }
    for (String name : zk.getChildren(path, false)) {
        String childPath = prefix + name;
        System.out.println(childPath);
        listAll(zk, childPath);
    }
}
六、建立節點
ZooDefs.Ids.CREATOR_ALL_ACL
是需要添加身份才可以建立,否則身份為隻讀使用者,不能建立和删除
/**
 * Accesses ZooKeeper remotely from Java: opens a client, creates a
 * persistent node, an ephemeral node and a persistent sequential node,
 * prints each result and finally releases the client.
 *
 * @throws IOException          if the client cannot be created
 * @throws KeeperException      if the server rejects an operation
 * @throws InterruptedException if the calling thread is interrupted
 */
public static void create() throws IOException, KeeperException, InterruptedException {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("watch中的方法執行");
        }
    });
    try {
        String result = zk.create("/parent",
                "parent data".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT);
        System.out.println("建立的parent的結果:" + result);
        // An ephemeral node disappears once zk.close() ends the session.
        String tmpResult = zk.create("/parent/tmp", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL);
        System.out.println("建立的/parent/tmp的結果:" + tmpResult);
        String seqResult = zk.create("/parent/sequence", null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);
        System.out.println("建立的/parent/sequence的結果:" + seqResult);
    } finally {
        // Always release the session, even when one of the create calls fails.
        zk.close();
    }
}
七、擷取資料
/**
 * Reads the data of {@code /parent} and prints it, decoded as UTF-8.
 *
 * @throws Exception if the client cannot be created or the read fails
 */
public static void get() throws Exception {
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
        }
    });
    try {
        byte[] data = zk.getData("/parent", false, null);
        // A znode may have no data at all; avoid a NullPointerException in that case.
        String s = (data == null)
                ? null
                : new String(data, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("/parent擷取的資料為:" + s);
    } finally {
        // Release the session; the original never closed the client.
        zk.close();
    }
}
八、删除節點
/**
 * Deletes the node {@code /parent/sequence0000000001}.
 *
 * <p>The node's {@link Stat} is fetched first through {@code getData}, and its
 * data version ({@code Stat.getVersion()}) is passed to {@code delete}.
 * ZooKeeper only deletes the node if that version still matches, so a node
 * modified in the meantime is not removed by accident. {@code delete} cannot
 * remove a node that still has children.
 *
 * @throws Exception if the client cannot be created or an operation fails
 */
public static void delete() throws Exception {
    final String target = "/parent/sequence0000000001";
    ZooKeeper zk = new ZooKeeper("47.107.63.171:2181", 10000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
        }
    });
    try {
        // getData fills the Stat; before this call it holds no node information.
        Stat stat = new Stat();
        zk.getData(target, false, stat);
        // Stat exposes three versions: getVersion() (data), getCversion()
        // (children) and getAversion() (ACL). delete() checks the DATA
        // version, so passing cversion would fail with a bad-version error as
        // soon as the two counters differ.
        System.out.println(stat.getVersion());
        zk.delete(target, stat.getVersion());
    } finally {
        // Release the session; the original never closed the client.
        zk.close();
    }
}