天天看點

ZooKeeper : Curator架構之共享計數器SharedCount

SharedCount

​SharedCount​

​類的源碼注釋:

Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).

管理一個共享整數。檢視同一路徑的所有用戶端都将擁有共享整數的最新值(考慮到​

​ZK​

​的正常一緻性保證)。

​SharedCount​

​類的源碼:

public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener>
{
    // 存儲監聽器
    private final Map<SharedCountListener, SharedValueListener> listeners = Maps.newConcurrentMap();
    // SharedValue用于管理一個共享值,SharedCount将共享整數的維護委托給該SharedValue執行個體
    private final SharedValue           sharedValue;

    /**
     * client: CuratorFramework執行個體,用于與Zookeeper進行互動
     * path: 共享計數器的路徑,即存儲共享整數的位置
     * seedValue: 如果共享計數器的路徑尚未建立,則設定該值為共享整數的初始值
     */
    public SharedCount(CuratorFramework client, String path, int seedValue)
    {
        // 初始化SharedValue執行個體
        sharedValue = new SharedValue(client, path, toBytes(seedValue));
    }

    protected SharedCount(CuratorFramework client, String path, SharedValue sv)
    {
        sharedValue = sv;
    }

    // 擷取共享整數的值
    @Override
    public int getCount()
    {
        return fromBytes(sharedValue.getValue());
    }

    // 擷取共享整數的版本和值,封裝在VersionedValue執行個體中
    @Override
    public VersionedValue<Integer> getVersionedValue()
    {
        VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
        return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
    }

    /**
     * 更改共享整數的值,而不管其先前的狀态如何
     */
    public void     setCount(int newCount) throws Exception
    {
        sharedValue.setValue(toBytes(newCount));
    }
    
    @Deprecated
    public boolean  trySetCount(int newCount) throws Exception
    {
        return sharedValue.trySetValue(toBytes(newCount));
    }

    /**
     * 僅當共享整數的版本和指定的版本一緻時,才更改共享整數的值
     */
    public boolean  trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception
    {
        VersionedValue<byte[]> previousCopy = new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
        return sharedValue.trySetValue(previousCopy, toBytes(newCount));
    }

    // 添加監聽器
    @Override
    public void     addListener(SharedCountListener listener)
    {
        addListener(listener, MoreExecutors.directExecutor());
    }

    // 添加監聽器與執行該監聽器的Executor執行個體
    @Override
    public void     addListener(final SharedCountListener listener, Executor executor)
    {
        // 使用匿名内部類建立SharedValueListener執行個體
        SharedValueListener     valueListener = new SharedValueListener()
        {
            @Override
            public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception
            {
                // 通過SharedValueListener執行個體來通知listener共享整數的值改變
                listener.countHasChanged(SharedCount.this, fromBytes(newValue));
            }

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState)
            {
                // 通過SharedValueListener執行個體來通知listener共享整數的狀态改變
                listener.stateChanged(client, newState);
            }
        };
        // 将SharedValueListener執行個體和Executor執行個體添加到sharedValue執行個體中
        sharedValue.getListenable().addListener(valueListener, executor);
        // 将listener和SharedValueListener執行個體的映射存儲到listeners屬性中
        listeners.put(listener, valueListener);
    }

    // 移除監聽器
    @Override
    public void     removeListener(SharedCountListener listener)
    {
        // 删除listeners屬性中以該監聽器為key的鍵值對
        // 如果listeners屬性存在以該監聽器為key的鍵值對
        // 則會獲得對應的SharedValueListener執行個體
        SharedValueListener valueListener = listeners.remove(listener);
        if(valueListener != null) {
            // 如果listeners屬性存在以該監聽器為key的鍵值對
            // 則删除sharedValue執行個體中對應的SharedValueListener執行個體
            sharedValue.getListenable().removeListener(valueListener);
        }
    }

    /**
     * 必須先啟動共享計數器,然後才能使用它
     */
    public void     start() throws Exception
    {
        sharedValue.start();
    }

    // 使用完共享計數器後需要調用close()進行關閉
    @Override
    public void close() throws IOException
    {
        sharedValue.close();
    }

    // 将整數轉換成位元組數組
    @VisibleForTesting
    static byte[]   toBytes(int value)
    {
        byte[]      bytes = new byte[4];
        ByteBuffer.wrap(bytes).putInt(value);
        return bytes;
    }

    // 将位元組數組轉換成整數
    private static int      fromBytes(byte[] bytes)
    {
        return ByteBuffer.wrap(bytes).getInt();
    }
}      

​SharedCount​

​​将共享整數的維護委托給了​

​SharedValue​

​​,​

​SharedValue​

​的源碼以後再介紹。

測試

​pom.xml​

​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>      

​CuratorFrameworkProperties​

​​類(提供​

​CuratorFramework​

​​需要的一些配置資訊,以及建立​

​CuratorFramework​

​執行個體的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 連接配接位址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 連接配接逾時時間
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session逾時時間
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空間
    public static final String NAMESPACE = "MyNamespace";
    // 重試政策
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 建立CuratorFramework執行個體
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}      

​SharedCountRunnable​

​​類(實作了​

​Runnable​

​接口,模拟分布式節點使用共享計數器):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;

import java.util.Random;

public class SharedCountRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework執行個體,表示不同的分布式節點
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随機加入的分布式節點
        int randomSleep = new Random().nextInt(10000);
        Thread.sleep(randomSleep);

        // 共享計數器的路徑
        String path = "/kaven";
        int seedValue = 0;

        // 建立SharedCount執行個體,用于提供共享計數器功能
        // 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必須先啟動共享計數器,然後才能使用它
        sharedCount.start();

        // 使用随機數來設定共享整數的值
        int randomValue = new Random().nextInt(1000);
        sharedCount.setCount(randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
        Thread.sleep(randomSleep);
        System.out.println(
                "***     " + Thread.currentThread().getName() +
                " 擷取共享計數器的值為 " + sharedCount.getCount());

        // 使用完共享計數器後需要調用close()進行關閉
        sharedCount.close();
    }
}      

啟動類:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式節點處理業務
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new SharedCountRunnable());
        }
    }
}      

模拟​

​15​

​個分布式節點使用共享計數器,輸出如下所示:

pool-1-thread-6 修改共享計數器的值為 525
pool-1-thread-2 修改共享計數器的值為 379
pool-1-thread-12 修改共享計數器的值為 60
pool-1-thread-4 修改共享計數器的值為 387
pool-1-thread-5 修改共享計數器的值為 821
pool-1-thread-15 修改共享計數器的值為 273
pool-1-thread-9 修改共享計數器的值為 410
***     pool-1-thread-2 擷取共享計數器的值為 410
pool-1-thread-8 修改共享計數器的值為 897
pool-1-thread-1 修改共享計數器的值為 871
pool-1-thread-10 修改共享計數器的值為 803
***     pool-1-thread-12 擷取共享計數器的值為 803
pool-1-thread-13 修改共享計數器的值為 210
***     pool-1-thread-6 擷取共享計數器的值為 210
***     pool-1-thread-4 擷取共享計數器的值為 210
pool-1-thread-11 修改共享計數器的值為 227
pool-1-thread-7 修改共享計數器的值為 445
pool-1-thread-14 修改共享計數器的值為 871
pool-1-thread-3 修改共享計數器的值為 705
***     pool-1-thread-5 擷取共享計數器的值為 705
***     pool-1-thread-15 擷取共享計數器的值為 705
***     pool-1-thread-9 擷取共享計數器的值為 705
***     pool-1-thread-8 擷取共享計數器的值為 705
***     pool-1-thread-1 擷取共享計數器的值為 705
***     pool-1-thread-10 擷取共享計數器的值為 705
***     pool-1-thread-13 擷取共享計數器的值為 705
***     pool-1-thread-11 擷取共享計數器的值為 705
***     pool-1-thread-7 擷取共享計數器的值為 705
***     pool-1-thread-14 擷取共享計數器的值為 705
***     pool-1-thread-3 擷取共享計數器的值為 705      

輸出符合預期。

public class SharedCountRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework執行個體,表示不同的分布式節點
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随機加入的分布式節點
        int randomSleep = new Random().nextInt(1000);
        Thread.sleep(randomSleep);

        // 共享計數器的路徑
        String path = "/kaven";
        int seedValue = 0;

        // 建立SharedCount執行個體,用于提供共享計數器功能
        // 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必須先啟動共享計數器,然後才能使用它
        sharedCount.start();

        // 使用随機數來設定共享整數的值
        int randomValue = new Random().nextInt(1000);
        // 擷取VersionedValue執行個體用于修改共享整數的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
        Thread.sleep(randomSleep);
        System.out.println(
                "***     " + Thread.currentThread().getName() +
                " 擷取共享計數器的值為 " + sharedCount.getCount());

        // 使用完共享計數器後需要調用close()進行關閉
        sharedCount.close();
    }
}      

輸出如下所示:

pool-1-thread-7 修改共享計數器的值為 797
pool-1-thread-1 修改共享計數器的值為 291
pool-1-thread-2 修改共享計數器的值為 85
***     pool-1-thread-2 擷取共享計數器的值為 291
pool-1-thread-12 修改共享計數器的值為 696
pool-1-thread-8 修改共享計數器的值為 258
***     pool-1-thread-7 擷取共享計數器的值為 696
***     pool-1-thread-1 擷取共享計數器的值為 258
***     pool-1-thread-12 擷取共享計數器的值為 258
pool-1-thread-14 修改共享計數器的值為 426
***     pool-1-thread-8 擷取共享計數器的值為 426
pool-1-thread-9 修改共享計數器的值為 704
pool-1-thread-5 修改共享計數器的值為 187
pool-1-thread-13 修改共享計數器的值為 662
pool-1-thread-4 修改共享計數器的值為 390
pool-1-thread-15 修改共享計數器的值為 702
pool-1-thread-11 修改共享計數器的值為 65
pool-1-thread-6 修改共享計數器的值為 84
pool-1-thread-10 修改共享計數器的值為 93
***     pool-1-thread-14 擷取共享計數器的值為 93
***     pool-1-thread-9 擷取共享計數器的值為 93
***     pool-1-thread-5 擷取共享計數器的值為 93
***     pool-1-thread-13 擷取共享計數器的值為 93
***     pool-1-thread-4 擷取共享計數器的值為 93
***     pool-1-thread-15 擷取共享計數器的值為 93
***     pool-1-thread-11 擷取共享計數器的值為 93
pool-1-thread-3 修改共享計數器的值為 276
***     pool-1-thread-6 擷取共享計數器的值為 276
***     pool-1-thread-10 擷取共享計數器的值為 276
***     pool-1-thread-3 擷取共享計數器的值為 276      

輸出也是符合預期的,也許有人會認為下面這段輸出是不符合預期的,但部落客認為是符合預期的,因為部落客這裡的​

​ZooKeeper​

​​服務端是單機模式,是以不用考慮叢集的一緻性問題,如果值在服務端修改成功,讀取肯定是最新的值,而造成這種情況的原因,可能是​

​CPU​

​​時間片輪詢排程的原因(​

​15​

​​個線程全部在一台計算機上運作),也有可能是列印字元串的相關函數擷取​

​I/O​

​​裝置存在快慢的原因,部落客覺得是此類原因導緻的,如果​

​ZooKeeper​

​服務端是叢集模式,倒是有可能是叢集的一緻性問題導緻的。

pool-1-thread-12 修改共享計數器的值為 696
pool-1-thread-8 修改共享計數器的值為 258
***     pool-1-thread-7 擷取共享計數器的值為 696
***     pool-1-thread-1 擷取共享計數器的值為 258
***     pool-1-thread-12 擷取共享計數器的值為 258      

分布式環境是非常複雜的,擷取​

​VersionedValue​

​​執行個體然後立即用于修改共享整數的值也是有可能不成功的,因為擷取的​

​VersionedValue​

​執行個體可能瞬間就失效了,比如有其他的節點在這個時間間隙中修改了共享整數的值。

// 擷取VersionedValue執行個體用于修改共享整數的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);      

監聽器

public class SharedCountRunnable implements Runnable{

    // 是否是監聽節點
    private final boolean isListenerNode;

    public SharedCountRunnable(boolean isListenerNode) {
        this.isListenerNode = isListenerNode;
    }

    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework執行個體,表示不同的分布式節點
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随機加入的分布式節點
        int randomSleep = new Random().nextInt(100);
        // 讓監聽節點先添加監聽器
        if(!isListenerNode) {
            Thread.sleep(3000 + randomSleep);
        }

        // 共享計數器的路徑
        String path = "/kaven";
        int seedValue = 0;

        // 建立SharedCount執行個體,用于提供共享計數器功能
        // 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
        SharedCount sharedCount = new SharedCount(curator, path, seedValue);

        // 必須先啟動共享計數器,然後才能使用它
        sharedCount.start();

        if(isListenerNode) {
            // 監聽節點添加監聽器
            sharedCount.addListener(new SharedCountListener(){

                @Override
                public void stateChanged(CuratorFramework client, ConnectionState newState) {}

                @Override
                public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
                    System.out.println("[Listener] : 共享整數被修改成 " + newCount);
                }
            });
            System.out.println("監聽器已添加");
            Thread.sleep(1000000);
        }
        else {
            update(sharedCount);
        }

        // 使用完共享計數器後需要調用close()進行關閉
        sharedCount.close();
    }

    private void update(SharedCount sharedCount) throws Exception {
        // 使用随機數來設定共享整數的值
        int randomValue = new Random().nextInt(1000);
        // 擷取VersionedValue執行個體用于修改共享整數的值
        sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
        System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
    }
}      
public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式節點處理業務
        for (int i = 0; i < 30; i++) {
            // 設定一個監聽節點,其他節點用于修改共享整數的值
            if(i > 0) EXECUTOR_SERVICE.execute(new SharedCountRunnable(false));
            else EXECUTOR_SERVICE.execute(new SharedCountRunnable(true));
        }
    }
}      

輸出如下所示:

監聽器已添加
pool-1-thread-29 修改共享計數器的值為 178
pool-1-thread-22 修改共享計數器的值為 526
[Listener] : 共享整數被修改成 526
pool-1-thread-26 修改共享計數器的值為 76
pool-1-thread-14 修改共享計數器的值為 27
pool-1-thread-4 修改共享計數器的值為 907
[Listener] : 共享整數被修改成 907
pool-1-thread-16 修改共享計數器的值為 616
pool-1-thread-18 修改共享計數器的值為 21
pool-1-thread-13 修改共享計數器的值為 851
pool-1-thread-15 修改共享計數器的值為 569
[Listener] : 共享整數被修改成 569
pool-1-thread-20 修改共享計數器的值為 806
pool-1-thread-21 修改共享計數器的值為 961
pool-1-thread-25 修改共享計數器的值為 562
pool-1-thread-19 修改共享計數器的值為 99
pool-1-thread-6 修改共享計數器的值為 784
pool-1-thread-11 修改共享計數器的值為 202
pool-1-thread-24 修改共享計數器的值為 97
[Listener] : 共享整數被修改成 202
pool-1-thread-17 修改共享計數器的值為 139
pool-1-thread-12 修改共享計數器的值為 437
[Listener] : 共享整數被修改成 437
pool-1-thread-7 修改共享計數器的值為 156
pool-1-thread-8 修改共享計數器的值為 416
pool-1-thread-28 修改共享計數器的值為 930
pool-1-thread-10 修改共享計數器的值為 857
pool-1-thread-23 修改共享計數器的值為 945
pool-1-thread-5 修改共享計數器的值為 38
pool-1-thread-27 修改共享計數器的值為 422
[Listener] : 共享整數被修改成 422
pool-1-thread-9 修改共享計數器的值為 428
pool-1-thread-2 修改共享計數器的值為 413
pool-1-thread-30 修改共享計數器的值為 59
pool-1-thread-3 修改共享計數器的值為 539