天天看点

ZooKeeper : Curator框架Watcher API介绍

ZooKeeper : Curator框架Watcher API介绍

在之前的博客中,博主已经介绍了​

​Curator​

​​框架的​

​Session​

​​、​

​Znode​

​​以及​

​ACL API​

​:

  • ​​ZooKeeper : Curator框架重试策略和Session API介绍​​
  • ​​ZooKeeper : Curator框架Znode、ACL API介绍​​

本篇博客将介绍​

​Curator​

​​框架的​

​Watcher API​

​​,它简化了​

​Java​

​​客户端原生​

​Watcher API​

​的使用,但了解后者可以更好地理解前者的实现:

  • ​​ZooKeeper :Java客户端Watcher API介绍​​

博主使用的​

​Curator​

​​框架版本是​

​5.2.0​

​​,​

​ZooKeeper​

​​版本是​

​3.6.3​

​。

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>      
ZooKeeper : Curator框架Watcher API介绍

​5.2.0​

​​版本的​

​Curator​

​​使用​

​3.6.3​

​​版本的​

​ZooKeeper​

​,因此它们是兼容的。

ZooKeeper : Curator框架Watcher API介绍

Watcher

​Watcher​

​​是​

​ZooKeeper​

​​中非常重要的特性, ​

​ZooKeeper​

​​上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于​

​ZooKeeper​

​实现分布式锁、集群管理等功能。

​Watcher​

​​特性:比如当节点数据发生变化的时候,​

​ZooKeeper​

​​会产生一个​

​Watcher​

​​事件,并且会发送到客户端,客户端收到监听的节点事件后,就可以进行相应的业务处理了。​

​ZooKeeper​

​​的​

​Watcher​

​​机制,可以分为三个过程:客户端注册​

​Watcher​

​​、服务端处理​

​Watcher​

​和客户端回调。

一次性Watcher

在​

​Java​

​​客户端原生​

​API​

​​中,调用​

​getData​

​​、​

​getChildren​

​​以及​

​exists​

​​这三个方法都可以在节点上留下一次性​

​Watcher​

​​,而这些一次性​

​Watcher​

​​的类型分别是​

​Data​

​​、​

​Children​

​​和​

​Data​

​​,​

​Watcher​

​​类型在​

​WatcherType​

​枚举类中定义。

@Public
    public static enum WatcherType {
        Children(1),
        Data(2),
        Any(3);

        private final int intValue;

        private WatcherType(int intValue) {
            this.intValue = intValue;
        }

        public int getIntValue() {
            return this.intValue;
        }

        public static Watcher.WatcherType fromInt(int intValue) {
            switch(intValue) {
            case 1:
                return Children;
            case 2:
                return Data;
            case 3:
                return Any;
            default:
                throw new RuntimeException("Invalid integer value for conversion to WatcherType");
            }
        }
    }      

​Java​

​​客户端原生​

​API​

​​中​

​getData​

​​、​

​getChildren​

​​以及​

​exists​

​​三个方法的一次性​

​Watcher​

​触发条件:

  • ​getData​

    ​​:一次性​

    ​Watcher​

    ​将由在节点上设置数据或删除节点的成功操作触发。
  • ​exists​

    ​​:一次性​

    ​Watcher​

    ​将由创建、删除节点或在节点上设置数据的成功操作触发。
  • ​getChildren​

    ​:删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发一次性​

    ​Watcher​

    ​。

测试类​

​Application​

​​(实现了​

​CuratorWatcher​

​​接口,因此​

​Application​

​​的实例有​

​Watcher​

​的功能 ):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 10:30
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

public class Application implements CuratorWatcher {

    private static final String SERVER_PROXY = "192.168.31.172:9000";
    private static final int CONNECTION_TIMEOUT_MS = 40000;
    private static final int SESSION_TIMEOUT_MS = 10000;
    private static final String NAMESPACE = "MyNamespace";

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(SESSION_TIMEOUT_MS)
                .namespace(NAMESPACE)
                .build();

        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);

        Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());
                
        Thread.sleep(10000000);
    }

    @Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
    }
}      

输出:

/father/son/grandson1
NodeCreated      

​Curator​

​​框架的​

​checkExists​

​​方法对应于​

​Java​

​​客户端的​

​exists​

​​方法。对节点进行了两次操作(创建节点和修改节点数据),但​

​Watcher​

​​只触发了一次。如果想要多次监听节点事件就需要多次添加这种一次性​

​Watcher​

​:

Application watcher = new Application();
        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.setData()
                .forPath("/father/son/grandson1", "new data".getBytes());

        curator.checkExists()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");      

输出:

/father/son/grandson1
NodeCreated
/father/son/grandson1
NodeDataChanged
/father/son/grandson1
NodeDeleted      

很显然符合预期,想要多次监听事件,就必须多次添加一次性​

​Watcher​

​​,而之前的程序由于没有再次添加一次性​

​Watcher​

​​,节点的​

​NodeDataChanged​

​​事件就不能监听,因为一次性​

​Watcher​

​​在节点的​

​NodeCreated​

​​事件触发后就被​

​Zookeeper​

​​服务端移除了。​

​Curator​

​​框架的​

​getData​

​​方法对应于​

​Java​

​​客户端的​

​getData​

​方法,它们都只能在节点存在的情况下给节点留下一次性​

​Watcher​

​​,因此它们留下的一次性​

​Watcher​

​​肯定不能监听该节点的创建事件,其他事件的监听与​

​checkExists​

​​方法的一次性​

​Watcher​

​类似,这里不再赘述。

Application watcher = new Application();

        curator.getData()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1", "data".getBytes());      
ZooKeeper : Curator框架Watcher API介绍

​Curator​

​​框架的​

​getChildren​

​​方法对应于​

​Java​

​​客户端的​

​getChildren​

​​方法,给指定节点留下一次性​

​Watcher​

​​的方式也是类似的,都是通过​

​usingWatcher​

​方法,删除给定路径的节点或在节点下创建、删除子节点的成功操作将触发这个一次性​

​Watcher​

​(临时节点不能创建子节点)。

curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1/test");

        curator.getChildren()
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.delete()
                .forPath("/father/son/grandson1");      

输出:

/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeChildrenChanged
/father/son/grandson1
NodeDeleted      

在​

​Curator​

​​框架中可以使用两种​

​Watcher​

​​,​

​Java​

​​客户端提供的​

​Watcher​

​​接口和​

​Curator​

​​框架提供的​

​CuratorWatcher​

​接口。而它们的方法定义是类似的:

package org.apache.curator.framework.api;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

public interface CuratorWatcher
{
    public void process(WatchedEvent event) throws Exception;
}      
package org.apache.zookeeper;

import org.apache.yetus.audience.InterfaceAudience.Public;

@Public
public interface Watcher {
    void process(WatchedEvent var1);
    ...
}      

在​

​Curator​

​​框架中这两种​

​Watcher​

​的实现都可以使用。

T usingWatcher(Watcher watcher);
    T usingWatcher(CuratorWatcher watcher);      

当需要一直监听指定节点的事件时,一次性​

​Watcher​

​​就不太方便了,这时就需要持久​

​Watcher​

​。

持久Watcher

在​

​Java​

​​客户端原生​

​API​

​​中,调用​

​addWatch​

​​方法可以在节点上添加持久​

​Watcher​

​​(​

​PERSISTENT​

​​和​

​PERSISTENT_RECURSIVE​

​​),并且这些​

​Watcher​

​​的类型是​

​Any​

​​。在​

​Curator​

​​框架中,可以如下所示给节点添加持久​

​Watcher​

​​(通过​

​.watchers().add()​

​链式调用):

curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        Application watcher = new Application();
        curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");

        curator.create()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/father/son/grandson1/test", "test".getBytes());

        curator.setData()
                .forPath("/father/son/grandson1/test", "new test".getBytes());      

输出:

/father/son/grandson1
NodeChildrenChanged      

​AddWatchMode.PERSISTENT​

​​类型的​

​Watcher​

​​只会监听注册节点的相关事件(节点数据更改​

​NodeDataChanged​

​​、子节点列表更改​

​NodeChildrenChanged​

​​以及节点删除​

​NodeDeleted​

​​等),而不会监听注册节点的子节点的相关事件(不会引起子节点列表更改的事件)。如果将​

​AddWatchMode.PERSISTENT​

​​换成​

​AddWatchMode.PERSISTENT_RECURSIVE​

​:

curator.watchers()
                .add()
                .withMode(AddWatchMode.PERSISTENT_RECURSIVE)
                .usingWatcher(watcher)
                .forPath("/father/son/grandson1");      

输出:

/father/son/grandson1/test
NodeCreated
/father/son/grandson1/test
NodeDataChanged      

​AddWatchMode.PERSISTENT_RECURSIVE​

​​类型的​

​Watcher​

​​不仅监听注册节点的相关事件(节点数据更改​

​NodeDataChanged​

​​和节点删除​

​NodeDeleted​

​​等,而子节点列表更改​

​NodeChildrenChanged​

​​其实就是子节点的节点创建​

​NodeCreated​

​​和节点删除​

​NodeDeleted​

​,应该触发的是子节点的事件监听),还会递归地监听注册节点的所有子节点的相关事件。

​AddWatchMode​

​枚举类:

ZooKeeper : Curator框架Watcher API介绍

删除Watcher

删除类型为​

​Any​

​​的​

​Watcher​

​​,也会一起删除类型为​

​Children​

​​和​

​Data​

​​的​

​Watcher​

​​,而删除类型为​

​Children​

​​或​

​Data​

​​的​

​Watcher​

​​,只能删除对应类型的​

​Watcher​

​。

删除指定节点类型为​

​Data​

​​的所有​

​Watcher​

​:

Application watcher1 = new Application();
        Application watcher2 = new Application();

        curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/father/son/grandson1", "data".getBytes());

        curator.getChildren()
                .usingWatcher(watcher1)
                .forPath("/father/son/grandson1");

        curator.getData()
                .usingWatcher(watcher2)
                .forPath("/father/son/grandson1");

        curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Data)
                .forPath("/father/son/grandson1");      
@Override
    public void process(WatchedEvent watchedEvent) throws Exception {
        System.out.println(watchedEvent.getPath());
        System.out.println(watchedEvent.getType());
        System.out.println(this);
    }      

输出:

/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05      

删除指定节点类型为​

​Any​

​​的所有​

​Watcher​

​:

curator.watchers()
                .removeAll()
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");      

输出:

/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@61613ed9
/father/son/grandson1
DataWatchRemoved
com.kaven.zookeeper.Application@5af5ec05      

删除指定节点类型为​

​Any​

​​的指定​

​Watcher​

​:

curator.watchers()
                .remove(watcher1)
                .ofType(Watcher.WatcherType.Any)
                .forPath("/father/son/grandson1");      
/father/son/grandson1
ChildWatchRemoved
com.kaven.zookeeper.Application@5ebb066a