zookeeper安裝
-
Watcher
zookeeper提供了分布式資料釋出/訂閱,zookeeper允許用戶端向伺服器注冊一個watcher監聽。當伺服器端的節點觸發指定事件的時候
會觸發watcher。服務端會向用戶端發送一個事件通知
watcher的通知是一次性,一旦觸發一次通知後,該watcher就失效
-
ACL
zookeeper提供控制節點通路權限的功能,用于有效的保證zookeeper中資料的安全性。避免誤操作而導緻系統出現重大事故。
CREATE /READ/WRITE/DELETE/ADMIN
-
zookeeper的指令操作
1. create [-s] [-e] path data acl
-s 表示節點是否有序
-e 表示是否為臨時節點
預設情況下,是持久化節點
2. get path [watch]
獲得指定 path的資訊
3.set path data [version]
修改節點 path對應的data
樂觀鎖的概念
資料庫裡面有一個 version 字段去控制資料行的版本号
4.delete path [version]
删除節點
stat資訊
cversion = 0 子節點的版本号
aclVersion = 0 表示acl的版本号,修改節點權限
dataVersion = 1 表示的是目前節點資料的版本号
czxid 節點被建立時的事務ID
mzxid 節點最後一次被更新的事務ID
pzxid 目前節點下的子節點最後一次被修改時的事務ID
java API的使用(curator)zkClient略
-
權限控制模式
schema:授權對象
ip : 192.168.1.1
Digest : username:password
world : 開放式的權限控制模式,資料節點的通路權限對所有使用者開放。 world:anyone
super :超級使用者,可以對zookeeper上的資料節點進行操作
-
連接配接狀态
KeeperStat.Expired 在一定時間内用戶端沒有收到伺服器的通知, 則認為目前的會話已經過期了。
KeeperStat.Disconnected 斷開連接配接的狀态
KeeperStat.SyncConnected 用戶端和伺服器端在某一個節點上建立連接配接,并且完成一次version、zxid同步
KeeperStat.authFailed 授權失敗
-
事件類型
NodeCreated 當節點被建立的時候,觸發
NodeChildrenChanged 表示子節點被建立、被删除、子節點資料發生變化
NodeDataChanged 節點資料發生變化
NodeDeleted 節點被删除
None 用戶端和伺服器端連接配接狀态發生變化的時候,事件類型就是None
-
curator api
Curator本身是Netflix公司開源的zookeeper用戶端,提供了各種應用場景的實作封裝
curator-framework 提供了fluent風格api
curator-replice 提供了實作封裝
- curator連接配接的重試政策
ExponentialBackoffRetry() 衰減重試
RetryNTimes 指定最大重試次數
RetryOneTime 僅重試一次
RetryUnitilElapsed 一直重試知道規定的時間
代碼--連結:https://pan.baidu.com/s/1_OzwNgaAK4FBv2c3dfLKUQ 密碼:3bbc
- 導入jar包
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorClientUtils {
private static CuratorFramework curatorFramework;
private final static String CONNECTSTRING="192.168.11.129:2181,192.168.11.134:2181," +
"192.168.11.135:2181,192.168.11.136:2181";
public static CuratorFramework getInstance(){
curatorFramework= CuratorFrameworkFactory.
newClient(CONNECTSTRING,5000,5000,
new ExponentialBackoffRetry(1000,3));
curatorFramework.start();
return curatorFramework;
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CuratorOperatorDemo {
public static void main(String[] args) throws InterruptedException {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
System.out.println("連接配接成功.........");
//fluent風格
/**
* 建立節點
*/
/* try {
String result=curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).
forPath("/curator/curator1/curator11","123".getBytes());
System.out.println(result);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 删除節點
*/
/*try {
//預設情況下,version為-1
curatorFramework.delete().deletingChildrenIfNeeded().forPath("/node11");
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 查詢
*/
/*Stat stat=new Stat();
try {
byte[] bytes=curatorFramework.getData().storingStatIn(stat).forPath("/curator");
System.out.println(new String(bytes)+"-->stat:"+stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 更新
*/
/* try {
Stat stat=curatorFramework.setData().forPath("/curator","123".getBytes());
System.out.println(stat);
} catch (Exception e) {
e.printStackTrace();
}*/
/**
* 異步操作
*/
/*ExecutorService service= Executors.newFixedThreadPool(1);
CountDownLatch countDownLatch=new CountDownLatch(1);
try {
curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println(Thread.currentThread().getName()+"->resultCode:"+curatorEvent.getResultCode()+"->"
+curatorEvent.getType());
countDownLatch.countDown();
}
},service).forPath("/mic","123".getBytes());
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.await();
service.shutdown();*/
/**
* 事務操作(curator獨有的)
*/
try {
Collection<CuratorTransactionResult> resultCollections=curatorFramework.inTransaction().create().forPath("/trans","111".getBytes()).and().
setData().forPath("/curator","111".getBytes()).and().commit();
for (CuratorTransactionResult result:resultCollections){
System.out.println(result.getForPath()+"->"+result.getType());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.zookeeper.CreateMode;
import java.util.concurrent.TimeUnit;
public class CuratorEventDemo {
/**
* 三種watcher來做節點的監聽
* pathcache 監視一個路徑下子節點的建立、删除、節點資料更新
* NodeCache 監視一個節點的建立、更新、删除
* TreeCache pathcaceh+nodecache 的合體(監視路徑下的建立、更新、删除事件),
* 緩存路徑下的所有子節點的資料
*/
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
/**
* 節點變化NodeCache
*/
/* NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
cache.start(true);
cache.getListenable().addListener(()-> System.out.println("節點資料發生變化,變化後的結果" +
":"+new String(cache.getCurrentData().getData())));
curatorFramework.setData().forPath("/curator","菲菲".getBytes());*/
/**
* PatchChildrenCache
*/
PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// Normal / BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
switch (pathChildrenCacheEvent.getType()){
case CHILD_ADDED:
System.out.println("增加子節點");
break;
case CHILD_REMOVED:
System.out.println("删除子節點");
break;
case CHILD_UPDATED:
System.out.println("更新子節點");
break;
default:break;
}
});
curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("1");
curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("2");
curatorFramework.setData().forPath("/event/event1","222".getBytes());
TimeUnit.SECONDS.sleep(1);
System.out.println("3");
curatorFramework.delete().forPath("/event/event1");
System.out.println("4");
System.in.read();
}
}
zookeeper的實際應用場景
zookeeper能夠實作哪些場景
訂閱釋出
watcher機制
統一配置管理(disconf)
分布式鎖
redis
zookeeper
資料庫
負載均衡
ID生成器
分布式隊列
統一命名服務
master選舉
分布式鎖