天天看點

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

本篇源碼基于​

​ZooKeeper3.7.0​

​版本。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

​ZooKeeper​

​ 提供了分布式資料的釋出/訂閱功能。一個典型的釋出/訂閱模型系統定義了一種一對多的訂閱關系,能夠讓多個訂閱者同時監聽某一個主題對象,當這個主題對象自身狀态變化時,會通知所有訂閱者,使它們能夠做出相應的處理。

​ZooKeeper​

​​ 允許用戶端向服務端注冊一個 ​

​Watcher​

​​ 監聽,當服務端的一些更新操作觸發了這個 ​

​Watcher​

​,就會向指定用戶端發送一個事件通知來實作分布式的通知功能。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

本篇僅基于用戶端對​

​Watcher​

​​注冊與通知過程進行講解,涉及到​

​Watcher​

​如何在服務端注冊管理,又如何觸發事件遠端通知用戶端的原理後續在講解服務端源碼時會補充。

不過也不要覺得服務端的​

​Watcher​

​注冊與通知邏輯就複雜,其實也比較簡單,這裡簡單陳述以保内容完整:

​ZooKeeper​

​​的​

​Watcher​

​​機制是一個跨程序的釋出/訂閱功能,用戶端與服務端都需要儲存資料節點和​

​Watcher​

​​的關系,當節點的狀态資訊變更時就會觸發一些事件,服務端先從自己的記憶體中找出節點對應的​

​Watcher​

​​清單,然後一個個周遊生成事件通知消息,再遠端發送給用戶端;用戶端接收到對應消息後,解析出​

​Wather​

​​事件資訊,得知是哪個資料節點,觸發什麼事件類型,然後用戶端同樣從記憶體中找到節點對應的​

​Watcher​

​清單,真正觸發事件回調。

一、基礎類

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

1、Watcher

使用者注冊​

​watcher​

​​都需要實作​

​Watcher​

​​接口,實作​

​process​

​方法。

org.apache.zookeeper.Watcher
public interface Watcher {
    void process(WatchedEvent event);
}      

2、WatchedEvent

​process(WatchedEvent event)​

​​的參數是​

​WatchedEvent​

​,定義事件資訊:

// org.apache.zookeeper.WatchedEvent
public class WatchedEvent {
    private final KeeperState keeperState;
    private final EventType eventType;
    private String path;
    ... ...
    /**
     *  将WatchedEvent轉換為可以通過網絡發送的類型
     *  Convert WatchedEvent to type that can be sent over network
     */
    public WatcherEvent getWrapper() {
        return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
    }
}      

​WatchedEvent​

​有3個變量,通知狀态​

​keeperState​

​、節點事件類型​

​eventType​

​、節點​

​path​

​:

​keeperState​

​和​

​eventType​

​都是​

​Watcher​

​中的枚舉類。

(1)KeeperState

KeeperState 說明
Disconnected(0) 用戶端與服務端斷開連接配接
SyncConnected(3) 用戶端與服務端處于連接配接狀态
AuthFailed(4) 授權失敗
ConnectedReadOnly(5) 用戶端連接配接到隻讀伺服器。接收到這個狀态後,唯一允許的操作是讀取操作。這個狀态隻在隻讀用戶端産生,讀寫用戶端是不允許連接配接隻讀伺服器的
SaslAuthenticated(6) 用于通知用戶端他們已經通過了SaslAuthenticated,以後可以用sasl授權的權限執行Zookeeper動作
Expired(-112) 會話逾時
Closed(7) 用戶端已關閉。這個狀态永遠不會由伺服器生成,由用戶端本地生成。

(2)EventType

EventType 說明
None(-1) KeeperState為SyncConnected(3)時,表示用戶端與服務端成功建立會話
NodeCreated(1) 資料節點建立
NodeDeleted(2) 資料節點被删除
NodeDataChanged(3) 資料節點的狀态資訊更新,即使更新内容一樣,版本号,一樣會觸發
NodeChildrenChanged(4) 資料節點的孩子節點清單發生變更,特指子節點個數群組成情況的變更,即新增子節點或删除子節點,而子節點内容的變化是不會觸發這個事件的
DataWatchRemoved(5) 資料節點的watcher被主動移除
ChildWatchRemoved(6) 孩子節點的watcher被主動移除
PersistentWatchRemoved (7) 持久有效的watcher被主動移除

3、WatcherEvent

​WatcherEvent​

​是可以通過網絡發送的事件資訊封裝。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

​WatcherEvent​

​​和​

​WatchedEvent​

​​表示的是同一個事物,都是對一個​

​watcher​

​​事件資訊的封裝,不同的是,​

​WatchedEvent​

​​ 是一個邏輯事件,用于服務端和用戶端程式執行過程中所需的邏輯對象,而 ​

​WatcherEvent​

​ 因為實作了序列化接口,是以可以用于網絡傳輸:

  • ​serialize()​

    ​​,可以将​

    ​Watcher​

    ​​資訊序列化到網絡位元組流中,然後發送到網絡中。服務端遠端通知用戶端​

    ​watcher​

    ​時使用。
  • ​deserialize()​

    ​​,可以從網絡位元組流中反序列化出​

    ​Watcher​

    ​資訊。用戶端接收到服務端遠端通知消息時使用。

無論是​

​WatchedEvent​

​​還是​

​WatcherEvent​

​​,其對​

​watcher​

​事件資訊的封裝都是極其簡單的,用戶端無法直接從事件資訊中擷取對應資料節點的原始資料内容以及變更後的新資料内容,而是需要用戶端再次主動去擷取資料。

4、WatchRegistration

​WatchRegistration​

​是對​

​watcher​

​注冊方式的抽象:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

注冊的動作是一樣的,隻是需要注冊到不同的集合中,具體繼承類,需要實作方法​

​WatchRegistration#getWatches​

​,擷取相應集合,将​

​Watcher​

​加入節點​

​path​

​對應的清單中。

如下是抽象類​

​WatchRegistration​

​部分代碼:

protected abstract Map<String, Set<Watcher>> getWatches(int rc);

public void register(int rc) {
    if (rc == KeeperException.Code.OK.intValue()) {
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher);
        }
    }
}      

5、WatcherSetEventPair

使用者可能會對一個節點注冊多個​

​watcher​

​,服務端遠端觸發用戶端的​

​watcher​

​時,用戶端需要将該節點對應的所有​

​watcher​

​都觸發一次。

是以​

​WatcherSetEventPair​

​對​

​WatchedEvent​

​和​

​watchers​

​清單進行封裝,友善​

​EventThread​

​線程處理​

​watcher​

​觸發工作。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

6、ZKWatchManager

​ZKWatchManager​

​​作為用戶端​

​watcher​

​​管理器,實作了接口​

​ClientWatchManager​

​:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

​ZKWatchManager​

​​中用5個集合對應5種不同的​

​watcher​

​注冊場景:

  • ​dataWatches​

    ​​,在調用​

    ​getData​

    ​​、​

    ​getConfig​

    ​​時注冊了​

    ​watcher​

    ​​,會使用​

    ​dataWatches​

    ​​來存儲​

    ​watcher​

    ​。
  • ​existWatches​

    ​​,對應​

    ​exists​

    ​。
  • ​childWatches​

    ​​,對應​

    ​getChildren​

    ​。
  • ​persistentWatches​

    ​​,給定節點持續有效的​

    ​watcher​

    ​集合,觸發之後不會被移除。
  • ​persistentRecursiveWatches​

    ​​,給定節點及其遞歸所有子節點都持續有效的​

    ​watcher​

    ​集合,觸發之後不會被移除。
ZooKeeper用戶端源碼(三)——Watcher注冊與通知

之前網上一直說​

​Zookeeper​

​的觀察者注冊一次隻能觸發一次,觸發的同時會被移除,如果需要注冊一次,可多次有效觸發,用戶端使用起來比較麻煩。

是以官方彌補了這種場景,新加了​

​persistentWatches​

​​和​

​persistentRecursiveWatches​

​​兩種集合來存儲持續有效的​

​watcher​

​​,觸發之後不會被移除,如果要移除需要調用指定方法​

​ZKWatchManager#removeWatcher​

​​,如果想注冊持續有效的觀察者,也是需要單獨調用指定方法​

​ZooKeeper#addWatch​

​。

​ZKWatchManager​

​​實作了接口​

​ClientWatchManager​

​​,主要實作了​

​ClientWatchManager#materialize​

​​方法,擷取一個應該被觸發事件的​

​watcher​

​清單:

org.apache.zookeeper.ZKWatchManager#materialize
@Override
public Set<Watcher> materialize(
    Watcher.Event.KeeperState state,
    Watcher.Event.EventType type,
    String clientPath
) {
    final Set<Watcher> result = new HashSet<>();

    switch (type) {
    case None:
        // ... ...省略None情況,
        // 無類型事件,判斷 通知狀态KeeperState,如果KeeperState不是SyncConnected 就把所有的 watcher容器都清空
        // 根據 EventType 從不同的集合中擷取觀察者清單
        // dataWatches、existWatches、childWatches在擷取watcher清單時有移除操作
        // persistentWatches、persistentRecursiveWatches沒有移除操作
    case NodeDataChanged:
    case NodeCreated:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        addPersistentWatches(clientPath, result);
        break;
    case NodeChildrenChanged:
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), result);
        }
        addPersistentWatches(clientPath, result);
        break;
    case NodeDeleted:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        // TODO This shouldn't be needed, but just in case
        synchronized (existWatches) {
            Set<Watcher> list = existWatches.remove(clientPath);
            if (list != null) {
                addTo(list, result);
                LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
            }
        }
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), result);
        }
        addPersistentWatches(clientPath, result);
        break;
    default:
        String errorMsg = String.format(
            "Unhandled watch event type %s with state %s on path %s",
            type,
            state,
            clientPath);
        LOG.error(errorMsg);
        throw new RuntimeException(errorMsg);
    }
    return result;
}

private void addPersistentWatches(String clientPath, Set<Watcher> result) {
    synchronized (persistentWatches) {
        addTo(persistentWatches.get(clientPath), result);
    }
    synchronized (persistentRecursiveWatches) {
        for (String path : PathParentIterator.forAll(clientPath).asIterable()) {
            addTo(persistentRecursiveWatches.get(path), result);
        }
    }
}      

由源碼可見,從 ​

​dataWatches​

​​、​

​existWatches​

​​、​

​childWatches​

​​ 集合中擷取​

​watcher​

​​清單時有移除操作,而從​

​persistentWatches​

​​、​

​persistentRecursiveWatches​

​擷取時沒有移除操作。

二、Watcher注冊流程

可以注冊​

​watcher​

​的請求都是非事務請求,比如:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public Stat exists(final String path, Watcher watcher)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
... ...      

1、建構WatchRegistration

需要注冊的​

​Watcher​

​​會被封裝進一個​

​WatchRegistration​

​​對象中,​

​WatchRegistration​

​​抽象了注冊的方式,會和請求體等一并包裝進 ​

​Packet​

​。

需要注意,​

​Watcher​

​​注冊資訊不會發送給服務端,而是隻發送一個布爾值标注是否注冊​

​Watcher​

​​(​

​watch=true​

​​),這樣就減少了資料包的大小,降低了網絡壓力,同時也使得​

​Watcher​

​注冊流程簡單。

以​

​getData​

​為例:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

2、響應成功後注冊Watcher

需要注冊​

​Watcher​

​​的請求發給服務端後,用戶端并不會立刻在自己記憶體中存儲​

​Watcher​

​​關系,而是還需要根據請求的響應狀态,如果響應狀态OK,才會把​

​Watcher​

​​注冊到​

​ZKWatchManager​

​。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

如下圖是​

​Wacther​

​注冊流程:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

三、Watcher通知流程

1、處理事件通知資訊

資料節點的狀态資訊發生變更後,服務端找到該節點的​

​watcher​

​​清單,周遊生成事件通知資訊發送給用戶端。用戶端接收到事件通知資訊後,反解析出​

​WatcherEvent​

​​對象,又轉換成​

​WatchedEvent​

​​,再送出到​

​EventThread​

​線程處理。

如下是用戶端處理事件通知資訊​

​NOTIFICATION​

​的部分源碼:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

2、送出給EventThread線程

從事件通知資訊中解析出​

​WatchedEvent​

​​後,通過​

​WatchedEvent​

​​的三個屬性​

​keeperState​

​​、​

​eventType​

​​和 ​

​path​

​​從​

​ZKWatchManager​

​​中取出符合要求的​

​Watcher​

​​清單,然後将​

​WachedEvent​

​​對象和​

​Watcher​

​​清單封裝進 ​

​WatcherSetEventPair​

​​并添加到​

​waitingEvents​

​隊列。

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

3、周遊waitingEvents隊列

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

4、真正觸發Watcher#process

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

如下圖是​

​Watcher​

​通知流程:

ZooKeeper用戶端源碼(三)——Watcher注冊與通知

四、總結

1、​

​Watcher​

​​注冊時,用戶端隻發送了一個布爾值給服務端聲明是否需要注冊​

​Watcher​

​​;隻有當服務端那邊​

​Wacther​

​​注冊成功了,且響應成功,用戶端這邊才會儲存​

​Watcher​

​和節點的關系。

2、​

​Wacther​

​通知時,隻能從通知資訊中得知是哪個節點發生什麼事件,而無法得知具體發生了什麼變更,要想得知必須再主動擷取一次節點資訊。

推薦閱讀:《從Paxos到Zookeeper:分布式一緻性原理與實踐》倪超著。