本篇源碼基于
ZooKeeper3.7.0
版本。

ZooKeeper
提供了分布式資料的釋出/訂閱功能。一個典型的釋出/訂閱模型系統定義了一種一對多的訂閱關系,能夠讓多個訂閱者同時監聽某一個主題對象,當這個主題對象自身狀态變化時,會通知所有訂閱者,使它們能夠做出相應的處理。
ZooKeeper
允許用戶端向服務端注冊一個
Watcher
監聽,當服務端的一些更新操作觸發了這個
Watcher
,就會向指定用戶端發送一個事件通知來實作分布式的通知功能。
本篇僅基于用戶端對
Watcher
注冊與通知過程進行講解,涉及到
Watcher
如何在服務端注冊管理,又如何觸發事件遠端通知用戶端的原理後續在講解服務端源碼時會補充。
不過也不要覺得服務端的
Watcher
注冊與通知邏輯就複雜,其實也比較簡單,這裡簡單陳述以保内容完整:
ZooKeeper
的
Watcher
機制是一個跨程序的釋出/訂閱功能,用戶端與服務端都需要儲存資料節點和
Watcher
的關系,當節點的狀态資訊變更時就會觸發一些事件,服務端先從自己的記憶體中找出節點對應的
Watcher
清單,然後一個個周遊生成事件通知消息,再遠端發送給用戶端;用戶端接收到對應消息後,解析出
Wather
事件資訊,得知是哪個資料節點,觸發什麼事件類型,然後用戶端同樣從記憶體中找到節點對應的
Watcher
清單,真正觸發事件回調。
一、基礎類
1、Watcher
使用者注冊
watcher
都需要實作
Watcher
接口,實作
process
方法。
org.apache.zookeeper.Watcher
public interface Watcher {
void process(WatchedEvent event);
}
2、WatchedEvent
process(WatchedEvent event)
的參數是
WatchedEvent
,定義事件資訊:
// org.apache.zookeeper.WatchedEvent
public class WatchedEvent {
private final KeeperState keeperState;
private final EventType eventType;
private String path;
... ...
/**
* 将WatchedEvent轉換為可以通過網絡發送的類型
* Convert WatchedEvent to type that can be sent over network
*/
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}
}
WatchedEvent
有3個變量,通知狀态
keeperState
、節點事件類型
eventType
、節點
path
:
keeperState
和
eventType
都是
Watcher
中的枚舉類。
(1)KeeperState
KeeperState | 說明 |
Disconnected(0) | 用戶端與服務端斷開連接配接 |
SyncConnected(3) | 用戶端與服務端處于連接配接狀态 |
AuthFailed(4) | 授權失敗 |
ConnectedReadOnly(5) | 用戶端連接配接到隻讀伺服器。接收到這個狀态後,唯一允許的操作是讀取操作。這個狀态隻在隻讀用戶端産生,讀寫用戶端是不允許連接配接隻讀伺服器的 |
SaslAuthenticated(6) | 用于通知用戶端他們已經通過了SaslAuthenticated,以後可以用sasl授權的權限執行Zookeeper動作 |
Expired(-112) | 會話逾時 |
Closed(7) | 用戶端已關閉。這個狀态永遠不會由伺服器生成,由用戶端本地生成。 |
(2)EventType
EventType | 說明 |
None(-1) | KeeperState為SyncConnected(3)時,表示用戶端與服務端成功建立會話 |
NodeCreated(1) | 資料節點建立 |
NodeDeleted(2) | 資料節點被删除 |
NodeDataChanged(3) | 資料節點的狀态資訊更新,即使更新内容一樣,版本号,一樣會觸發 |
NodeChildrenChanged(4) | 資料節點的孩子節點清單發生變更,特指子節點個數群組成情況的變更,即新增子節點或删除子節點,而子節點内容的變化是不會觸發這個事件的 |
DataWatchRemoved(5) | 資料節點的watcher被主動移除 |
ChildWatchRemoved(6) | 孩子節點的watcher被主動移除 |
PersistentWatchRemoved (7) | 持久有效的watcher被主動移除 |
3、WatcherEvent
WatcherEvent
是可以通過網絡發送的事件資訊封裝。
WatcherEvent
和
WatchedEvent
表示的是同一個事物,都是對一個
watcher
事件資訊的封裝,不同的是,
WatchedEvent
是一個邏輯事件,用于服務端和用戶端程式執行過程中所需的邏輯對象,而
WatcherEvent
因為實作了序列化接口,是以可以用于網絡傳輸:
-
,可以将serialize()
資訊序列化到網絡位元組流中,然後發送到網絡中。服務端遠端通知用戶端Watcher
時使用。watcher
-
,可以從網絡位元組流中反序列化出deserialize()
資訊。用戶端接收到服務端遠端通知消息時使用。Watcher
無論是
WatchedEvent
還是
WatcherEvent
,其對
watcher
事件資訊的封裝都是極其簡單的,用戶端無法直接從事件資訊中擷取對應資料節點的原始資料内容以及變更後的新資料内容,而是需要用戶端再次主動去擷取資料。
4、WatchRegistration
WatchRegistration
是對
watcher
注冊方式的抽象:
注冊的動作是一樣的,隻是需要注冊到不同的集合中,具體繼承類,需要實作方法
WatchRegistration#getWatches
,擷取相應集合,将
Watcher
加入節點
path
對應的清單中。
如下是抽象類
WatchRegistration
部分代碼:
protected abstract Map<String, Set<Watcher>> getWatches(int rc);
public void register(int rc) {
if (rc == KeeperException.Code.OK.intValue()) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized (watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
5、WatcherSetEventPair
使用者可能會對一個節點注冊多個
watcher
,服務端遠端觸發用戶端的
watcher
時,用戶端需要将該節點對應的所有
watcher
都觸發一次。
是以
WatcherSetEventPair
對
WatchedEvent
和
watchers
清單進行封裝,友善
EventThread
線程處理
watcher
觸發工作。
6、ZKWatchManager
ZKWatchManager
作為用戶端
watcher
管理器,實作了接口
ClientWatchManager
:
ZKWatchManager
中用5個集合對應5種不同的
watcher
注冊場景:
-
,在調用dataWatches
、getData
時注冊了getConfig
,會使用watcher
來存儲dataWatches
。watcher
-
,對應existWatches
。exists
-
,對應childWatches
。getChildren
-
,給定節點持續有效的persistentWatches
集合,觸發之後不會被移除。watcher
-
,給定節點及其遞歸所有子節點都持續有效的persistentRecursiveWatches
集合,觸發之後不會被移除。watcher
之前網上一直說
Zookeeper
的觀察者注冊一次隻能觸發一次,觸發的同時會被移除,如果需要注冊一次,可多次有效觸發,用戶端使用起來比較麻煩。
是以官方彌補了這種場景,新加了
persistentWatches
和
persistentRecursiveWatches
兩種集合來存儲持續有效的
watcher
,觸發之後不會被移除,如果要移除需要調用指定方法
ZKWatchManager#removeWatcher
,如果想注冊持續有效的觀察者,也是需要單獨調用指定方法
ZooKeeper#addWatch
。
ZKWatchManager
實作了接口
ClientWatchManager
,主要實作了
ClientWatchManager#materialize
方法,擷取一個應該被觸發事件的
watcher
清單:
org.apache.zookeeper.ZKWatchManager#materialize
@Override
public Set<Watcher> materialize(
Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath
) {
final Set<Watcher> result = new HashSet<>();
switch (type) {
case None:
// ... ...省略None情況,
// 無類型事件,判斷 通知狀态KeeperState,如果KeeperState不是SyncConnected 就把所有的 watcher容器都清空
// 根據 EventType 從不同的集合中擷取觀察者清單
// dataWatches、existWatches、childWatches在擷取watcher清單時有移除操作
// persistentWatches、persistentRecursiveWatches沒有移除操作
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// TODO This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
addPersistentWatches(clientPath, result);
break;
default:
String errorMsg = String.format(
"Unhandled watch event type %s with state %s on path %s",
type,
state,
clientPath);
LOG.error(errorMsg);
throw new RuntimeException(errorMsg);
}
return result;
}
private void addPersistentWatches(String clientPath, Set<Watcher> result) {
synchronized (persistentWatches) {
addTo(persistentWatches.get(clientPath), result);
}
synchronized (persistentRecursiveWatches) {
for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
addTo(persistentRecursiveWatches.get(path), result);
}
}
}
由源碼可見,從
dataWatches
、
existWatches
、
childWatches
集合中擷取
watcher
清單時有移除操作,而從
persistentWatches
、
persistentRecursiveWatches
擷取時沒有移除操作。
二、Watcher注冊流程
可以注冊
watcher
的請求都是非事務請求,比如:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public Stat exists(final String path, Watcher watcher)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
... ...
1、建構WatchRegistration
需要注冊的
Watcher
會被封裝進一個
WatchRegistration
對象中,
WatchRegistration
抽象了注冊的方式,會和請求體等一并包裝進
Packet
。
需要注意,
Watcher
注冊資訊不會發送給服務端,而是隻發送一個布爾值标注是否注冊
Watcher
(
watch=true
),這樣就減少了資料包的大小,降低了網絡壓力,同時也使得
Watcher
注冊流程簡單。
以
getData
為例:
2、響應成功後注冊Watcher
需要注冊
Watcher
的請求發給服務端後,用戶端并不會立刻在自己記憶體中存儲
Watcher
關系,而是還需要根據請求的響應狀态,如果響應狀态OK,才會把
Watcher
注冊到
ZKWatchManager
。
如下圖是
Wacther
注冊流程:
三、Watcher通知流程
1、處理事件通知資訊
資料節點的狀态資訊發生變更後,服務端找到該節點的
watcher
清單,周遊生成事件通知資訊發送給用戶端。用戶端接收到事件通知資訊後,反解析出
WatcherEvent
對象,又轉換成
WatchedEvent
,再送出到
EventThread
線程處理。
如下是用戶端處理事件通知資訊
NOTIFICATION
的部分源碼:
2、送出給EventThread線程
從事件通知資訊中解析出
WatchedEvent
後,通過
WatchedEvent
的三個屬性
keeperState
、
eventType
和
path
從
ZKWatchManager
中取出符合要求的
Watcher
清單,然後将
WachedEvent
對象和
Watcher
清單封裝進
WatcherSetEventPair
并添加到
waitingEvents
隊列。
3、周遊waitingEvents隊列
4、真正觸發Watcher#process
如下圖是
Watcher
通知流程:
四、總結
1、
Watcher
注冊時,用戶端隻發送了一個布爾值給服務端聲明是否需要注冊
Watcher
;隻有當服務端那邊
Wacther
注冊成功了,且響應成功,用戶端這邊才會儲存
Watcher
和節點的關系。
2、
Wacther
通知時,隻能從通知資訊中得知是哪個節點發生什麼事件,而無法得知具體發生了什麼變更,要想得知必須再主動擷取一次節點資訊。
推薦閱讀:《從Paxos到Zookeeper:分布式一緻性原理與實踐》倪超著。