天天看点

ZooKeeper : Curator框架之共享计数器SharedCount

SharedCount

​SharedCount​

​类的源码注释:

Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).

管理一个共享整数。查看同一路径的所有客户端都将拥有共享整数的最新值(考虑到​

​ZK​

​的正常一致性保证)。

​SharedCount​

​类的源码:

public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener>
{
    // 存储监听器
    private final Map<SharedCountListener, SharedValueListener> listeners = Maps.newConcurrentMap();
    // SharedValue用于管理一个共享值,SharedCount将共享整数的维护委托给该SharedValue实例
    private final SharedValue           sharedValue;

    /**
     * client: CuratorFramework实例,用于与Zookeeper进行交互
     * path: 共享计数器的路径,即存储共享整数的位置
     * seedValue: 如果共享计数器的路径尚未创建,则设置该值为共享整数的初始值
     */
    public SharedCount(CuratorFramework client, String path, int seedValue)
    {
        // 初始化SharedValue实例
        sharedValue = new SharedValue(client, path, toBytes(seedValue));
    }

    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
    {
        sharedValue = sv;
    }

    // 获取共享整数的值
    @Override
    public int getCount()
    {
        return fromBytes(sharedValue.getValue());
    }

    // 获取共享整数的版本和值,封装在VersionedValue实例中
    @Override
    public VersionedValue<Integer> getVersionedValue()
    {
        VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
        return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
    }

    /**
     * 更改共享整数的值,而不管其先前的状态如何
     */
    public void     setCount(int newCount) throws Exception
    {
        sharedValue.setValue(toBytes(newCount));
    }
    
    @Deprecated
    public boolean  trySetCount(int newCount) throws Exception
    {
        return sharedValue.trySetValue(toBytes(newCount));
    }

    /**
     * 仅当共享整数的版本和指定的版本一致时,才更改共享整数的值
     */
    public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception
    {
        VersionedValue<byte[]> previousCopy = new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
        return sharedValue.trySetValue(previousCopy, toBytes(newCount));
    }

    // 添加监听器
    @Override
    public void     addListener(SharedCountListener listener)
    {
        addListener(listener, MoreExecutors.directExecutor());
    }

    // 添加监听器与执行该监听器的Executor实例
    @Override
    public void     addListener(final SharedCountListener listener, Executor executor)
    {
        // 使用匿名内部类创建SharedValueListener实例
        SharedValueListener     valueListener = new SharedValueListener()
        {
            @Override
            public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception
            {
                // 通过SharedValueListener实例来通知listener共享整数的值改变
                listener.countHasChanged(SharedCount.this, fromBytes(newValue));
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                // 通过SharedValueListener实例来通知listener共享整数的状态改变
                listener.stateChanged(client, newState);
            }
        };
        // 将SharedValueListener实例和Executor实例添加到sharedValue实例中
        sharedValue.getListenable().addListener(valueListener, executor);
        // 将listener和SharedValueListener实例的映射存储到listeners属性中
        listeners.put(listener, valueListener);
    }

    // 移除监听器
    @Override
    public void     removeListener(SharedCountListener listener)
    {
        // 删除listeners属性中以该监听器为key的键值对
        // 如果listeners属性存在以该监听器为key的键值对
        // 则会获得对应的SharedValueListener实例
        SharedValueListener valueListener = listeners.remove(listener);
        if(valueListener != null) {
            // 如果listeners属性存在以该监听器为key的键值对
            // 则删除sharedValue实例中对应的SharedValueListener实例
            sharedValue.getListenable().removeListener(valueListener);
        }
    }

    /**
     * 必须先启动共享计数器,然后才能使用它
     */
    public void     start() throws Exception
    {
        sharedValue.start();
    }

    // 使用完共享计数器后需要调用close()进行关闭
    @Override
    public void close() throws IOException
    {
        sharedValue.close();
    }

    // 将整数转换成字节数组
    @VisibleForTesting
    static byte[]   toBytes(int value)
    {
        byte[]      bytes = new byte[4];
        ByteBuffer.wrap(bytes).putInt(value);
        return bytes;
    }

    // 将字节数组转换成整数
    private static int      fromBytes(byte[] bytes)
    {
        return ByteBuffer.wrap(bytes).getInt();
    }
}      

​SharedCount​

​​将共享整数的维护委托给了​

​SharedValue​

​​,​

​SharedValue​

​的源码以后再介绍。

测试

​pom.xml​

​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>      

​CuratorFrameworkProperties​

​​类(提供​

​CuratorFramework​

​​需要的一些配置信息,以及创建​

​CuratorFramework​

​实例的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 连接地址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 连接超时时间
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session超时时间
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空间
    public static final String NAMESPACE = "MyNamespace";
    // 重试策略
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 创建CuratorFramework实例
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}      

​SharedCountRunnable​

​​类(实现了​

​Runnable​

​接口,模拟分布式节点使用共享计数器):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;

import java.util.Random;

public class SharedCountRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(10000);
        Thread.sleep(randomSleep);

        // 共享计数器的路径
        String path = "/kaven";
        int seedValue = 0;

        // 创建SharedCount实例,用于提供共享计数器功能
        // 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必须先启动共享计数器,然后才能使用它
        sharedCount.start();

        // 使用随机数来设置共享整数的值
        int randomValue = new Random().nextInt(1000);
        sharedCount.setCount(randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
        Thread.sleep(randomSleep);
        System.out.println(
                "***     " + Thread.currentThread().getName() +
                " 获取共享计数器的值为 " + sharedCount.getCount());

        // 使用完共享计数器后需要调用close()进行关闭
        sharedCount.close();
    }
}      

启动类:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new SharedCountRunnable());
        }
    }
}      

模拟​

​15​

​个分布式节点使用共享计数器,输出如下所示:

pool-1-thread-6 修改共享计数器的值为 525
pool-1-thread-2 修改共享计数器的值为 379
pool-1-thread-12 修改共享计数器的值为 60
pool-1-thread-4 修改共享计数器的值为 387
pool-1-thread-5 修改共享计数器的值为 821
pool-1-thread-15 修改共享计数器的值为 273
pool-1-thread-9 修改共享计数器的值为 410
***     pool-1-thread-2 获取共享计数器的值为 410
pool-1-thread-8 修改共享计数器的值为 897
pool-1-thread-1 修改共享计数器的值为 871
pool-1-thread-10 修改共享计数器的值为 803
***     pool-1-thread-12 获取共享计数器的值为 803
pool-1-thread-13 修改共享计数器的值为 210
***     pool-1-thread-6 获取共享计数器的值为 210
***     pool-1-thread-4 获取共享计数器的值为 210
pool-1-thread-11 修改共享计数器的值为 227
pool-1-thread-7 修改共享计数器的值为 445
pool-1-thread-14 修改共享计数器的值为 871
pool-1-thread-3 修改共享计数器的值为 705
***     pool-1-thread-5 获取共享计数器的值为 705
***     pool-1-thread-15 获取共享计数器的值为 705
***     pool-1-thread-9 获取共享计数器的值为 705
***     pool-1-thread-8 获取共享计数器的值为 705
***     pool-1-thread-1 获取共享计数器的值为 705
***     pool-1-thread-10 获取共享计数器的值为 705
***     pool-1-thread-13 获取共享计数器的值为 705
***     pool-1-thread-11 获取共享计数器的值为 705
***     pool-1-thread-7 获取共享计数器的值为 705
***     pool-1-thread-14 获取共享计数器的值为 705
***     pool-1-thread-3 获取共享计数器的值为 705      

输出符合预期。

public class SharedCountRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 共享计数器的路径
        String path = "/kaven";
        int seedValue = 0;

        // 创建SharedCount实例,用于提供共享计数器功能
        // 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必须先启动共享计数器,然后才能使用它
        sharedCount.start();

        // 使用随机数来设置共享整数的值
        int randomValue = new Random().nextInt(1000);
        // 获取VersionedValue实例用于修改共享整数的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
        Thread.sleep(randomSleep);
        System.out.println(
                "***     " + Thread.currentThread().getName() +
                " 获取共享计数器的值为 " + sharedCount.getCount());

        // 使用完共享计数器后需要调用close()进行关闭
        sharedCount.close();
    }
}      

输出如下所示:

pool-1-thread-7 修改共享计数器的值为 797
pool-1-thread-1 修改共享计数器的值为 291
pool-1-thread-2 修改共享计数器的值为 85
***     pool-1-thread-2 获取共享计数器的值为 291
pool-1-thread-12 修改共享计数器的值为 696
pool-1-thread-8 修改共享计数器的值为 258
***     pool-1-thread-7 获取共享计数器的值为 696
***     pool-1-thread-1 获取共享计数器的值为 258
***     pool-1-thread-12 获取共享计数器的值为 258
pool-1-thread-14 修改共享计数器的值为 426
***     pool-1-thread-8 获取共享计数器的值为 426
pool-1-thread-9 修改共享计数器的值为 704
pool-1-thread-5 修改共享计数器的值为 187
pool-1-thread-13 修改共享计数器的值为 662
pool-1-thread-4 修改共享计数器的值为 390
pool-1-thread-15 修改共享计数器的值为 702
pool-1-thread-11 修改共享计数器的值为 65
pool-1-thread-6 修改共享计数器的值为 84
pool-1-thread-10 修改共享计数器的值为 93
***     pool-1-thread-14 获取共享计数器的值为 93
***     pool-1-thread-9 获取共享计数器的值为 93
***     pool-1-thread-5 获取共享计数器的值为 93
***     pool-1-thread-13 获取共享计数器的值为 93
***     pool-1-thread-4 获取共享计数器的值为 93
***     pool-1-thread-15 获取共享计数器的值为 93
***     pool-1-thread-11 获取共享计数器的值为 93
pool-1-thread-3 修改共享计数器的值为 276
***     pool-1-thread-6 获取共享计数器的值为 276
***     pool-1-thread-10 获取共享计数器的值为 276
***     pool-1-thread-3 获取共享计数器的值为 276      

输出也是符合预期的,也许有人会认为下面这段输出是不符合预期的,但博主认为是符合预期的,因为博主这里的​

​ZooKeeper​

​​服务端是单机模式,因此不用考虑集群的一致性问题,如果值在服务端修改成功,读取肯定是最新的值,而造成这种情况的原因,可能是​

​CPU​

​​时间片轮询调度的原因(​

​15​

​​个线程全部在一台计算机上运行),也有可能是打印字符串的相关函数获取​

​I/O​

​​设备存在快慢的原因,博主觉得是此类原因导致的,如果​

​ZooKeeper​

​服务端是集群模式,倒是有可能是集群的一致性问题导致的。

pool-1-thread-12 修改共享计数器的值为 696
pool-1-thread-8 修改共享计数器的值为 258
***     pool-1-thread-7 获取共享计数器的值为 696
***     pool-1-thread-1 获取共享计数器的值为 258
***     pool-1-thread-12 获取共享计数器的值为 258      

分布式环境是非常复杂的,获取​

​VersionedValue​

​​实例然后立即用于修改共享整数的值也是有可能不成功的,因为获取的​

​VersionedValue​

​实例可能瞬间就失效了,比如有其他的节点在这个时间间隙中修改了共享整数的值。

// 获取VersionedValue实例用于修改共享整数的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);      

监听器

public class SharedCountRunnable implements Runnable{

    // 是否是监听节点
    private final boolean isListenerNode;

    public SharedCountRunnable(boolean isListenerNode) {
        this.isListenerNode = isListenerNode;
    }

    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework实例,表示不同的分布式节点
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随机加入的分布式节点
        int randomSleep = new Random().nextInt(100);
        // 让监听节点先添加监听器
        if(!isListenerNode) {
            Thread.sleep(3000 + randomSleep);
        }

        // 共享计数器的路径
        String path = "/kaven";
        int seedValue = 0;

        // 创建SharedCount实例,用于提供共享计数器功能
        // 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必须先启动共享计数器,然后才能使用它
        sharedCount.start();

        if(isListenerNode) {
            // 监听节点添加监听器
            sharedCount.addListener(new SharedCountListener(){

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {}

                @Override
                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                    System.out.println("[Listener] : 共享整数被修改成 " + newCount);
                }
            });
            System.out.println("监听器已添加");
            Thread.sleep(1000000);
        }
        else {
            update(sharedCount);
        }

        // 使用完共享计数器后需要调用close()进行关闭
        sharedCount.close();
    }

    private void update(SharedCount sharedCount) throws Exception {
        // 使用随机数来设置共享整数的值
        int randomValue = new Random().nextInt(1000);
        // 获取VersionedValue实例用于修改共享整数的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
    }
}      
public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式节点处理业务
        for (int i = 0; i < 30; i++) {
            // 设置一个监听节点,其他节点用于修改共享整数的值
            if(i > 0) EXECUTOR_SERVICE.execute(new SharedCountRunnable(false));
            else EXECUTOR_SERVICE.execute(new SharedCountRunnable(true));
        }
    }
}      

输出如下所示:

监听器已添加
pool-1-thread-29 修改共享计数器的值为 178
pool-1-thread-22 修改共享计数器的值为 526
[Listener] : 共享整数被修改成 526
pool-1-thread-26 修改共享计数器的值为 76
pool-1-thread-14 修改共享计数器的值为 27
pool-1-thread-4 修改共享计数器的值为 907
[Listener] : 共享整数被修改成 907
pool-1-thread-16 修改共享计数器的值为 616
pool-1-thread-18 修改共享计数器的值为 21
pool-1-thread-13 修改共享计数器的值为 851
pool-1-thread-15 修改共享计数器的值为 569
[Listener] : 共享整数被修改成 569
pool-1-thread-20 修改共享计数器的值为 806
pool-1-thread-21 修改共享计数器的值为 961
pool-1-thread-25 修改共享计数器的值为 562
pool-1-thread-19 修改共享计数器的值为 99
pool-1-thread-6 修改共享计数器的值为 784
pool-1-thread-11 修改共享计数器的值为 202
pool-1-thread-24 修改共享计数器的值为 97
[Listener] : 共享整数被修改成 202
pool-1-thread-17 修改共享计数器的值为 139
pool-1-thread-12 修改共享计数器的值为 437
[Listener] : 共享整数被修改成 437
pool-1-thread-7 修改共享计数器的值为 156
pool-1-thread-8 修改共享计数器的值为 416
pool-1-thread-28 修改共享计数器的值为 930
pool-1-thread-10 修改共享计数器的值为 857
pool-1-thread-23 修改共享计数器的值为 945
pool-1-thread-5 修改共享计数器的值为 38
pool-1-thread-27 修改共享计数器的值为 422
[Listener] : 共享整数被修改成 422
pool-1-thread-9 修改共享计数器的值为 428
pool-1-thread-2 修改共享计数器的值为 413
pool-1-thread-30 修改共享计数器的值为 59
pool-1-thread-3 修改共享计数器的值为 539