天天看點

ZooKeeper : Curator架構Watcher API介紹

ZooKeeper : Curator架構Watcher API介紹

在之前的部落格中,部落客已經介紹了​

​Curator​

​​架構的​

​Session​

​​、​

​Znode​

​​以及​

​ACL API​

​:

  • ​​ZooKeeper : Curator架構重試政策和Session API介紹​​
  • ​​ZooKeeper : Curator架構Znode、ACL API介紹​​

本篇部落格将介紹​

​Curator​

​​架構的​

​Watcher API​

​​,它簡化了​

​Java​

​​用戶端原生​

​Watcher API​

​的使用,但了解後者可以更好地了解前者的實作:

  • ​​ZooKeeper :Java用戶端Watcher API介紹​​

部落客使用的​

​Curator​

​​架構版本是​

​5.2.0​

​​,​

​ZooKeeper​

​​版本是​

​3.6.3​

​。

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>      
ZooKeeper : Curator架構Watcher API介紹

​5.2.0​

​​版本的​

​Curator​

​​使用​

​3.6.3​

​​版本的​

​ZooKeeper​

​,是以它們是相容的。

ZooKeeper : Curator架構Watcher API介紹

Watcher

​Watcher​

​​是​

​ZooKeeper​

​​中非常重要的特性, ​

​ZooKeeper​

​​上建立的節點,可以對這些節點綁定監聽事件,比如可以監聽節點資料變更、節點删除、子節點狀态變更等事件,通過這個事件機制,可以基于​

​ZooKeeper​

​實作分布式鎖、叢集管理等功能。

​Watcher​

​​特性:比如當節點資料發生變化的時候,​

​ZooKeeper​

​​會産生一個​

​Watcher​

​​事件,并且會發送到用戶端,用戶端收到監聽的節點事件後,就可以進行相應的業務處理了。​

​ZooKeeper​

​​的​

​Watcher​

​​機制,可以分為三個過程:用戶端注冊​

​Watcher​

​​、服務端處理​

​Watcher​

​和用戶端回調。

一次性Watcher

在​

​Java​

​​用戶端原生​

​API​

​​中,調用​

​getData​

​​、​

​getChildren​

​​以及​

​exists​

​​這三個方法都可以在節點上留下一次性​

​Watcher​

​​,而這些一次性​

​Watcher​

​​的類型分别是​

​Data​

​​、​

​Children​

​​和​

​Data​

​​,​

​Watcher​

​​類型在​

​WatcherType​

​枚舉類中定義。

@Public
    public static enum WatcherType {
        Children(1),
        Data(2),
        Any(3);

        private final int intValue;

        private WatcherType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return this.intValue;
        }

        public static Watcher.WatcherType fromInt(int intValue) {
            switch(intValue) {
            case 1:
                return Children;
            case 2:
                return Data;
            case 3:
                return Any;
            default:
                throw new RuntimeException("Invalid integer value for conversion to WatcherType");
            }
        }
    }      

​Java​

​​用戶端原生​

​API​

​​中​

​getData​

​​、​

​getChildren​

​​以及​

​exists​

​​三個方法的一次性​

​Watcher​

​觸發條件:

  • ​getData​

    ​​:一次性​

    ​Watcher​

    ​将由在節點上設定資料或删除節點的成功操作觸發。
  • ​exists​

    ​​:一次性​

    ​Watcher​

    ​将由建立、删除節點或在節點上設定資料的成功操作觸發。
  • ​getChildren​

    ​:删除給定路徑的節點或在節點下建立、删除子節點的成功操作将觸發一次性​

    ​Watcher​

    ​。

測試類​

​Application​

​​(實作了​

​CuratorWatcher​

​​接口,是以​

​Application​

​​的執行個體有​

​Watcher​

​的功能 ):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 10:30
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

public class Application implements CuratorWatcher {

    private static final String SERVER_PROXY = "192.168.31.172:9000";
    private static final int CONNECTION_TIMEOUT_MS = 40000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final String NAMESPACE = "MyNamespace";

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .namespace(NAMESPACE)
                .build();

        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);

        Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());
                
        Thread.sleep(10000000);
    }

    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
    }
}      

輸出:

/father/son/grandson1
NodeCreated      

​Curator​

​​架構的​

​checkExists​

​​方法對應于​

​Java​

​​用戶端的​

​exists​

​​方法。對節點進行了兩次操作(建立節點和修改節點資料),但​

​Watcher​

​​隻觸發了一次。如果想要多次監聽節點事件就需要多次添加這種一次性​

​Watcher​

​:

Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");      

輸出:

/father/son/grandson1
NodeCreated
/father/son/grandson1
NodeDataChanged
/father/son/grandson1
NodeDeleted      

很顯然符合預期,想要多次監聽事件,就必須多次添加一次性​

​Watcher​

​​,而之前的程式由于沒有再次添加一次性​

​Watcher​

​​,節點的​

​NodeDataChanged​

​​事件就不能監聽,因為一次性​

​Watcher​

​​在節點的​

​NodeCreated​

​​事件觸發後就被​

​Zookeeper​

​​服務端移除了。​

​Curator​

​​架構的​

​getData​

​​方法對應于​

​Java​

​​用戶端的​

​getData​

​方法,它們都隻能在節點存在的情況下給節點留下一次性​

​Watcher​

​​,是以它們留下的一次性​

​Watcher​

​​肯定不能監聽該節點的建立事件,其他事件的監聽與​

​checkExists​

​​方法的一次性​

​Watcher​

​類似,這裡不再贅述。

Application watcher = new Application();

        curator.getData()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());      
ZooKeeper : Curator架構Watcher API介紹

​Curator​

​​架構的​

​getChildren​

​​方法對應于​

​Java​

​​用戶端的​

​getChildren​

​​方法,給指定節點留下一次性​

​Watcher​

​​的方式也是類似的,都是通過​

​usingWatcher​

​方法,删除給定路徑的節點或在節點下建立、删除子節點的成功操作将觸發這個一次性​

​Watcher​

​(臨時節點不能建立子節點)。

curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1/test");

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");      

輸出:

/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeDeleted      

在​

​Curator​

​​架構中可以使用兩種​

​Watcher​

​​,​

​Java​

​​用戶端提供的​

​Watcher​

​​接口和​

​Curator​

​​架構提供的​

​CuratorWatcher​

​接口。而它們的方法定義是類似的:

package org.apache.curator.framework.api;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public interface CuratorWatcher
{
    public void process(WatchedEvent event) throws Exception;
}      
package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience.Public;

@Public
public interface Watcher {
    void process(WatchedEvent var1);
    ...
}      

在​

​Curator​

​​架構中這兩種​

​Watcher​

​的實作都可以使用。

T usingWatcher(Watcher watcher);
    T usingWatcher(CuratorWatcher watcher);      

當需要一直監聽指定節點的事件時,一次性​

​Watcher​

​​就不太友善了,這時就需要持久​

​Watcher​

​。

持久Watcher

在​

​Java​

​​用戶端原生​

​API​

​​中,調用​

​addWatch​

​​方法可以在節點上添加持久​

​Watcher​

​​(​

​PERSISTENT​

​​和​

​PERSISTENT_RECURSIVE​

​​),并且這些​

​Watcher​

​​的類型是​

​Any​

​​。在​

​Curator​

​​架構中,可以如下所示給節點添加持久​

​Watcher​

​​(通過​

​.watchers().add()​

​鍊式調用):

curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1/test", "new test".getBytes());      

輸出:

/father/son/grandson1
NodeChildrenChanged      

​AddWatchMode.PERSISTENT​

​​類型的​

​Watcher​

​​隻會監聽注冊節點的相關事件(節點資料更改​

​NodeDataChanged​

​​、子節點清單更改​

​NodeChildrenChanged​

​​以及節點删除​

​NodeDeleted​

​​等),而不會監聽注冊節點的子節點的相關事件(不會引起子節點清單更改的事件)。如果将​

​AddWatchMode.PERSISTENT​

​​換成​

​AddWatchMode.PERSISTENT_RECURSIVE​

​:

curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT_RECURSIVE)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");      

輸出:

/father/son/grandson1/test
NodeCreated
/father/son/grandson1/test
NodeDataChanged      

​AddWatchMode.PERSISTENT_RECURSIVE​

​​類型的​

​Watcher​

​​不僅監聽注冊節點的相關事件(節點資料更改​

​NodeDataChanged​

​​和節點删除​

​NodeDeleted​

​​等,而子節點清單更改​

​NodeChildrenChanged​

​​其實就是子節點的節點建立​

​NodeCreated​

​​和節點删除​

​NodeDeleted​

​,應該觸發的是子節點的事件監聽),還會遞歸地監聽注冊節點的所有子節點的相關事件。

​AddWatchMode​

​枚舉類:

ZooKeeper : Curator架構Watcher API介紹

删除Watcher

删除類型為​

​Any​

​​的​

​Watcher​

​​,也會一起删除類型為​

​Children​

​​和​

​Data​

​​的​

​Watcher​

​​,而删除類型為​

​Children​

​​或​

​Data​

​​的​

​Watcher​

​​,隻能删除對應類型的​

​Watcher​

​。

删除指定節點類型為​

​Data​

​​的所有​

​Watcher​

​:

Application watcher1 = new Application();
        Application watcher2 = new Application();

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.getChildren()
                .usingWatcher(watcher1)
                .forPath("/father/son/grandson1");

        curator.getData()
                .usingWatcher(watcher2)
                .forPath("/father/son/grandson1");

        curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Data)
                .forPath("/father/son/grandson1");      
@Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
        System.out.println(this);
    }      

輸出:

/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05      

删除指定節點類型為​

​Any​

​​的所有​

​Watcher​

​:

curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");      

輸出:

/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@61613ed9
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05      

删除指定節點類型為​

​Any​

​​的指定​

​Watcher​

​:

curator.watchers()
                .remove(watcher1)
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");      
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@5ebb066a