ZooKeeper : Curator架構Watcher API介紹
在之前的部落格中,部落客已經介紹了
Curator
架構的
Session
、
Znode
以及
ACL API
:
- ZooKeeper : Curator架構重試政策和Session API介紹
- ZooKeeper : Curator架構Znode、ACL API介紹
本篇部落格将介紹
Curator
架構的
Watcher API
,它簡化了
Java
用戶端原生
Watcher API
的使用,但了解後者可以更好地了解前者的實作:
- ZooKeeper :Java用戶端Watcher API介紹
部落客使用的
Curator
架構版本是
5.2.0
,
ZooKeeper
版本是
3.6.3
。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
5.2.0
版本的
Curator
使用
3.6.3
版本的
ZooKeeper
,是以它們是相容的。
Watcher
Watcher
是
ZooKeeper
中非常重要的特性,
ZooKeeper
上建立的節點,可以對這些節點綁定監聽事件,比如可以監聽節點資料變更、節點删除、子節點狀态變更等事件,通過這個事件機制,可以基于
ZooKeeper
實作分布式鎖、叢集管理等功能。
Watcher
特性:比如當節點資料發生變化的時候,
ZooKeeper
會産生一個
Watcher
事件,并且會發送到用戶端,用戶端收到監聽的節點事件後,就可以進行相應的業務處理了。
ZooKeeper
的
Watcher
機制,可以分為三個過程:用戶端注冊
Watcher
、服務端處理
Watcher
和用戶端回調。
一次性Watcher
在
Java
用戶端原生
API
中,調用
getData
、
getChildren
以及
exists
這三個方法都可以在節點上留下一次性
Watcher
,而這些一次性
Watcher
的類型分别是
Data
、
Children
和
Data
,
Watcher
類型在
WatcherType
枚舉類中定義。
@Public
public static enum WatcherType {
Children(1),
Data(2),
Any(3);
private final int intValue;
private WatcherType(int intValue) {
this.intValue = intValue;
}
public int getIntValue() {
return this.intValue;
}
public static Watcher.WatcherType fromInt(int intValue) {
switch(intValue) {
case 1:
return Children;
case 2:
return Data;
case 3:
return Any;
default:
throw new RuntimeException("Invalid integer value for conversion to WatcherType");
}
}
}
Java
用戶端原生
API
中
getData
、
getChildren
以及
exists
三個方法的一次性
Watcher
觸發條件:
-
:一次性getData
将由在節點上設定資料或删除節點的成功操作觸發。Watcher
-
:一次性exists
将由建立、删除節點或在節點上設定資料的成功操作觸發。Watcher
-
:删除給定路徑的節點或在節點下建立、删除子節點的成功操作将觸發一次性getChildren
。Watcher
測試類
Application
(實作了
CuratorWatcher
接口,是以
Application
的執行個體有
Watcher
的功能 ):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
/**
* @Author: ITKaven
* @Date: 2021/11/20 10:30
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application implements CuratorWatcher {
private static final String SERVER_PROXY = "192.168.31.172:9000";
private static final int CONNECTION_TIMEOUT_MS = 40000;
private static final int SESSION_TIMEOUT_MS = 10000;
private static final String NAMESPACE = "MyNamespace";
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(SERVER_PROXY)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.namespace(NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
Thread.sleep(10000000);
}
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
}
}
輸出:
/father/son/grandson1
NodeCreated
Curator
架構的
checkExists
方法對應于
Java
用戶端的
exists
方法。對節點進行了兩次操作(建立節點和修改節點資料),但
Watcher
隻觸發了一次。如果想要多次監聽節點事件就需要多次添加這種一次性
Watcher
:
Application watcher = new Application();
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.setData()
.forPath("/father/son/grandson1", "new data".getBytes());
curator.checkExists()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
輸出:
/father/son/grandson1
NodeCreated
/father/son/grandson1
NodeDataChanged
/father/son/grandson1
NodeDeleted
很顯然符合預期,想要多次監聽事件,就必須多次添加一次性
Watcher
,而之前的程式由于沒有再次添加一次性
Watcher
,節點的
NodeDataChanged
事件就不能監聽,因為一次性
Watcher
在節點的
NodeCreated
事件觸發後就被
Zookeeper
服務端移除了。
Curator
架構的
getData
方法對應于
Java
用戶端的
getData
方法,它們都隻能在節點存在的情況下給節點留下一次性
Watcher
,是以它們留下的一次性
Watcher
肯定不能監聽該節點的建立事件,其他事件的監聽與
checkExists
方法的一次性
Watcher
類似,這裡不再贅述。
Application watcher = new Application();
curator.getData()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1", "data".getBytes());
Curator
架構的
getChildren
方法對應于
Java
用戶端的
getChildren
方法,給指定節點留下一次性
Watcher
的方式也是類似的,都是通過
usingWatcher
方法,删除給定路徑的節點或在節點下建立、删除子節點的成功操作将觸發這個一次性
Watcher
(臨時節點不能建立子節點)。
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1/test");
curator.getChildren()
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.delete()
.forPath("/father/son/grandson1");
輸出:
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeDeleted
在
Curator
架構中可以使用兩種
Watcher
,
Java
用戶端提供的
Watcher
接口和
Curator
架構提供的
CuratorWatcher
接口。而它們的方法定義是類似的:
package org.apache.curator.framework.api;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
public interface CuratorWatcher
{
public void process(WatchedEvent event) throws Exception;
}
package org.apache.zookeeper;
import org.apache.yetus.audience.InterfaceAudience.Public;
@Public
public interface Watcher {
void process(WatchedEvent var1);
...
}
在
Curator
架構中這兩種
Watcher
的實作都可以使用。
T usingWatcher(Watcher watcher);
T usingWatcher(CuratorWatcher watcher);
當需要一直監聽指定節點的事件時,一次性
Watcher
就不太友善了,這時就需要持久
Watcher
。
持久Watcher
在
Java
用戶端原生
API
中,調用
addWatch
方法可以在節點上添加持久
Watcher
(
PERSISTENT
和
PERSISTENT_RECURSIVE
),并且這些
Watcher
的類型是
Any
。在
Curator
架構中,可以如下所示給節點添加持久
Watcher
(通過
.watchers().add()
鍊式調用):
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
Application watcher = new Application();
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
curator.create()
.withMode(CreateMode.EPHEMERAL)
.forPath("/father/son/grandson1/test", "test".getBytes());
curator.setData()
.forPath("/father/son/grandson1/test", "new test".getBytes());
輸出:
/father/son/grandson1
NodeChildrenChanged
AddWatchMode.PERSISTENT
類型的
Watcher
隻會監聽注冊節點的相關事件(節點資料更改
NodeDataChanged
、子節點清單更改
NodeChildrenChanged
以及節點删除
NodeDeleted
等),而不會監聽注冊節點的子節點的相關事件(不會引起子節點清單更改的事件)。如果将
AddWatchMode.PERSISTENT
換成
AddWatchMode.PERSISTENT_RECURSIVE
:
curator.watchers()
.add()
.withMode(AddWatchMode.PERSISTENT_RECURSIVE)
.usingWatcher(watcher)
.forPath("/father/son/grandson1");
輸出:
/father/son/grandson1/test
NodeCreated
/father/son/grandson1/test
NodeDataChanged
AddWatchMode.PERSISTENT_RECURSIVE
類型的
Watcher
不僅監聽注冊節點的相關事件(節點資料更改
NodeDataChanged
和節點删除
NodeDeleted
等,而子節點清單更改
NodeChildrenChanged
其實就是子節點的節點建立
NodeCreated
和節點删除
NodeDeleted
,應該觸發的是子節點的事件監聽),還會遞歸地監聽注冊節點的所有子節點的相關事件。
AddWatchMode
枚舉類:
删除Watcher
删除類型為
Any
的
Watcher
,也會一起删除類型為
Children
和
Data
的
Watcher
,而删除類型為
Children
或
Data
的
Watcher
,隻能删除對應類型的
Watcher
。
删除指定節點類型為
Data
的所有
Watcher
:
Application watcher1 = new Application();
Application watcher2 = new Application();
curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/father/son/grandson1", "data".getBytes());
curator.getChildren()
.usingWatcher(watcher1)
.forPath("/father/son/grandson1");
curator.getData()
.usingWatcher(watcher2)
.forPath("/father/son/grandson1");
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Data)
.forPath("/father/son/grandson1");
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println(watchedEvent.getPath());
System.out.println(watchedEvent.getType());
System.out.println(this);
}
輸出:
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05
删除指定節點類型為
Any
的所有
Watcher
:
curator.watchers()
.removeAll()
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
輸出:
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@61613ed9
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05
删除指定節點類型為
Any
的指定
Watcher
:
curator.watchers()
.remove(watcher1)
.ofType(Watcher.WatcherType.Any)
.forPath("/father/son/grandson1");
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@5ebb066a