天天看點

ZooKeeper : Curator架構之共享計數器DistributedAtomicLong

DistributedAtomicLong

原子增量操作的計數器,首先嘗試使用樂觀鎖進行增量操作,如果失敗,則采用可選的​

​InterProcessMutex​

​(悲觀鎖)進行增量操作。 對于樂觀鎖和悲觀鎖,重試政策都用于重試增量操作。

各種增量方法都會傳回一個​

​AtomicValue​

​​執行個體,通過調用​

​AtomicValue​

​​執行個體的​

​succeeded()​

​​可以查詢增量操作是否執行成功,除了​

​get()​

​ 外,其他任何方法都不保證一定成功。

​AtomicValue​

​接口源碼(原子操作傳回值的抽象):

public interface AtomicValue<T>
{
    /**
     * 如果操作成功,則傳回true
     * 如果傳回false,則操作失敗
     */
    public boolean      succeeded();

    /**
     * 傳回操作前計數器的值
     */
    public T            preValue();

    /**
     * 傳回操作後計數器的值
     */
    public T            postValue();

    /**
     * 傳回操作的調試統計資訊,比如樂觀鎖、悲觀鎖嘗試的次數與時間
     */
    public AtomicStats  getStats();
}      

​DistributedAtomicLong​

​​類中的内部類​

​AtomicLong​

​​實作了​

​AtomicValue​

​​接口,但實際上隻是起到封裝的作用,所有的調用都委托給了​

​bytes​

​屬性(其他實作類的執行個體)。

private class AtomicLong implements AtomicValue<Long>
    {
        private AtomicValue<byte[]> bytes;

        private AtomicLong(AtomicValue<byte[]> bytes)
        {
            this.bytes = bytes;
        }

        @Override
        public boolean succeeded()
        {
            return bytes.succeeded();
        }

        @Override
        public Long preValue()
        {
            return bytesToValue(bytes.preValue());
        }

        @Override
        public Long postValue()
        {
            return bytesToValue(bytes.postValue());
        }

        @Override
        public AtomicStats getStats()
        {
            return bytes.getStats();
        }
    }      

​DistributedAtomicLong​

​​類實作了​

​DistributedAtomicNumber​

​​接口,并且​

​DistributedAtomicLong​

​​将各種原子操作的執行委托給了​

​DistributedAtomicValue​

​。

public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
{
    private final DistributedAtomicValue        value;
    ...
}      

​DistributedAtomicNumber​

​接口是分布式原子數值類型的抽象,定義了分布式原子數值類型需要提供的方法。

public interface DistributedAtomicNumber<T>
{
    public AtomicValue<T> get() throws Exception;
    public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
    public AtomicValue<T> trySet(T newValue) throws Exception;
    public boolean initialize(T value) throws Exception;
    public void forceSet(T newValue) throws Exception;
    public AtomicValue<T> increment() throws Exception;
    public AtomicValue<T> decrement() throws Exception;
    public AtomicValue<T> add(T delta) throws Exception;
    public AtomicValue<T> subtract(T delta) throws Exception;
}      

目前​

​DistributedAtomicNumber​

​​接口有兩種實作,除了​

​DistributedAtomicLong​

​​類,還有​

​DistributedAtomicInteger​

​類。

ZooKeeper : Curator架構之共享計數器DistributedAtomicLong

并且​

​DistributedAtomicInteger​

​​也是将各種原子操作的執行委托給了​

​DistributedAtomicValue​

​,是以這兩種實作是類似的,隻不過表示的數值類型不同而已。

public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>
{
    private final DistributedAtomicValue        value;
    ...
}      

​DistributedAtomicValue​

​​是原子操作真正的執行者,是以可以知道内部類​

​AtomicLong​

​​的​

​bytes​

​​屬性是​

​MutableAtomicValue​

​執行個體。

public AtomicValue<byte[]>     get() throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        ...
        return result;
    }      

測試

​pom.xml​

​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>      

​CuratorFrameworkProperties​

​​類(提供​

​CuratorFramework​

​​需要的一些配置資訊,以及建立​

​CuratorFramework​

​執行個體的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 連接配接位址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 連接配接逾時時間
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session逾時時間
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空間
    public static final String NAMESPACE = "MyNamespace";
    // 重試政策
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 建立CuratorFramework執行個體
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}      

​DistributedAtomicLongRunnable​

​​類(實作了​

​Runnable​

​接口,模拟分布式節點操作分布式原子長整型):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;

public class DistributedAtomicLongRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework執行個體,表示不同的分布式節點
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 共享計數器的路徑
        String counterPath = "/kaven";

        // 建立DistributedAtomicLong執行個體,用于操作分布式原子長整型
        // new RetryNTimes(100, 5)是樂觀鎖的重試政策執行個體
        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        // 初始化
        boolean initialize = atomicLong.initialize(100L);
        if(initialize) {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失敗");
        }

        // 比較再設定,當Zookeeper中的值與期望值相等時才能設定新值
        AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
        if(longAtomicValue.succeeded()) {
            System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + " compareAndSet 失敗");
        }
    }
}      

啟動類:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式節點處理業務
        for (int i = 0; i < 15; i++) {
            EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
        }
    }
}      

模拟​

​15​

​個分布式節點操作分布式原子長整型,輸出如下所示:

pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失敗
pool-1-thread-10初始化 atomicLong 失敗
pool-1-thread-14初始化 atomicLong 失敗
pool-1-thread-15初始化 atomicLong 失敗
pool-1-thread-8初始化 atomicLong 失敗
pool-1-thread-13初始化 atomicLong 失敗
pool-1-thread-6初始化 atomicLong 失敗
pool-1-thread-1初始化 atomicLong 失敗
pool-1-thread-7初始化 atomicLong 失敗
pool-1-thread-5初始化 atomicLong 失敗
pool-1-thread-3初始化 atomicLong 失敗
pool-1-thread-9初始化 atomicLong 失敗
pool-1-thread-2初始化 atomicLong 失敗
pool-1-thread-4初始化 atomicLong 失敗
pool-1-thread-8 compareAndSet 失敗
pool-1-thread-14 compareAndSet 失敗
pool-1-thread-10 compareAndSet 失敗
pool-1-thread-6 compareAndSet 失敗
pool-1-thread-15 compareAndSet 失敗
pool-1-thread-13 compareAndSet 失敗
pool-1-thread-7 compareAndSet 失敗
pool-1-thread-9 compareAndSet 失敗
pool-1-thread-11 compareAndSet 失敗
pool-1-thread-5 compareAndSet 失敗
pool-1-thread-12 compareAndSet 失敗
pool-1-thread-1 compareAndSet 失敗
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失敗
pool-1-thread-2 compareAndSet 失敗      

輸出是符合預期的,兩種操作都隻有一個節點執行成功。​

​DistributedAtomicValue​

​​類的​

​initialize​

​​和​

​compareAndSet​

​​方法如下所示,其實就是建立​

​Zookeeper​

​節點(隻有一個服務能建立成功)和基于版本設定節點的值(在部落客的測試程式中,也隻能有一個服務将該操作執行成功),而這兩種操作并沒有使用鎖(樂觀鎖和悲觀鎖)。

public boolean initialize(byte[] value) throws Exception
    {
        try
        {
            client.create().creatingParentContainersIfNeeded().forPath(path, value);
        }
        catch ( KeeperException.NodeExistsException ignore )
        {
            // ignore
            return false;
        }
        return true;
    }
    
    public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
    {
        Stat                        stat = new Stat();
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);
        boolean                     createIt = getCurrentValue(result, stat);
        if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
        {
            try
            {
                client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
                result.succeeded = true;
                result.postValue = newValue;
            }
            catch ( KeeperException.BadVersionException dummy )
            {
                result.succeeded = false;
            }
            catch ( KeeperException.NoNodeException dummy )
            {
                result.succeeded = false;
            }
        }
        else
        {
            result.succeeded = false;
        }
        return result;
    }      

​increment​

​​、​

​decrement​

​​、​

​add​

​​以及​

​subtract​

​​這四種操作是類似的,部落客隻示範​

​increment​

​操作。

DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5));

        boolean initialize = atomicLong.initialize(100L);
        if(initialize) {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
        }
        else {
            System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失敗");
        }

        for (int i = 0; i < 1000; i++) {
            Thread.sleep(5);
            atomicLong.increment();
        }
        System.out.println(Thread.currentThread().getName() + "操作成功");
        System.out.println(Thread.currentThread().getName() + "目前的值為" + atomicLong.get().postValue());      

輸出如下所示:

pool-1-thread-8初始化 atomicLong 失敗
pool-1-thread-1初始化 atomicLong 失敗
pool-1-thread-3初始化 atomicLong 失敗
pool-1-thread-14初始化 atomicLong 失敗
pool-1-thread-5初始化 atomicLong 失敗
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-2初始化 atomicLong 失敗
pool-1-thread-4初始化 atomicLong 失敗
pool-1-thread-15初始化 atomicLong 失敗
pool-1-thread-13初始化 atomicLong 失敗
pool-1-thread-11初始化 atomicLong 失敗
pool-1-thread-9初始化 atomicLong 失敗
pool-1-thread-7初始化 atomicLong 失敗
pool-1-thread-6初始化 atomicLong 失敗
pool-1-thread-10初始化 atomicLong 失敗
pool-1-thread-15操作成功
pool-1-thread-15目前的值為14289
pool-1-thread-3操作成功
pool-1-thread-3目前的值為14305
pool-1-thread-13操作成功
pool-1-thread-13目前的值為14420
pool-1-thread-2操作成功
pool-1-thread-2目前的值為14681
pool-1-thread-4操作成功
pool-1-thread-4目前的值為14876
pool-1-thread-1操作成功
pool-1-thread-1目前的值為14906
pool-1-thread-5操作成功
pool-1-thread-5目前的值為14953
pool-1-thread-8操作成功
pool-1-thread-8目前的值為14972
pool-1-thread-14操作成功
pool-1-thread-14目前的值為15001
pool-1-thread-7操作成功
pool-1-thread-7目前的值為15020
pool-1-thread-10操作成功
pool-1-thread-10目前的值為15051
pool-1-thread-11操作成功
pool-1-thread-11目前的值為15053
pool-1-thread-9操作成功
pool-1-thread-9目前的值為15060
pool-1-thread-12操作成功
pool-1-thread-12目前的值為15093
pool-1-thread-6操作成功
pool-1-thread-6目前的值為15100      

最後的值為​

​15100​

​​符合預期。​

​increment​

​​、​

​decrement​

​​、​

​add​

​​以及​

​subtract​

​​這四個方法都調用​

​worker​

​方法來完成。

@Override
    public AtomicValue<Long>    increment() throws Exception
    {
        return worker(1L);
    }

    @Override
    public AtomicValue<Long>    decrement() throws Exception
    {
        return worker(-1L);
    }

    @Override
    public AtomicValue<Long>    add(Long delta) throws Exception
    {
        return worker(delta);
    }

    @Override
    public AtomicValue<Long> subtract(Long delta) throws Exception
    {
        return worker(-1 * delta);
    }      

​DistributedAtomicLong​

​​類的​

​worker​

​​方法則是調用​

​DistributedAtomicValue​

​​類的​

​trySet​

​方法來完成。

private AtomicValue<Long>   worker(final Long addAmount) throws Exception
    {
        Preconditions.checkNotNull(addAmount, "addAmount cannot be null");

        MakeValue               makeValue = new MakeValue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                long        previousValue = (previous != null) ? bytesToValue(previous) : 0;
                long        newValue = previousValue + addAmount;
                return valueToBytes(newValue);
            }
        };

        AtomicValue<byte[]>     result = value.trySet(makeValue);
        return new AtomicLong(result);
    }      

​DistributedAtomicValue​

​​類的​

​trySet​

​​方法嘗試以原子方式将計數器的值設定為給定值,首先嘗試使用樂觀鎖進行操作,如果失敗,則采用可選的​

​InterProcessMutex​

​(悲觀鎖)進行操作。

// 嘗試以原子方式将計數器的值設定為給定值
    public AtomicValue<byte[]>   trySet(final byte[] newValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        MakeValue                   makeValue = new MakeValue()
        {
            @Override
            public byte[] makeFrom(byte[] previous)
            {
                return newValue;
            }
        };
        // 嘗試使用樂觀鎖
        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            // 如果在樂觀鎖下執行不成功,并且有悲觀鎖
            // 嘗試使用悲觀鎖
            tryWithMutex(result, makeValue);
        }

        return result;
    }      

​DistributedAtomicLong​

​​類的​

​trySet​

​​方法用于嘗試設定計數器的值,也是通過調用​

​DistributedAtomicValue​

​​類的​

​trySet​

​方法來完成。

@Override
    public AtomicValue<Long>   trySet(Long newValue) throws Exception
    {
        return new AtomicLong(value.trySet(valueToBytes(newValue)));
    }      

​DistributedAtomicLong​

​​類的​

​forceSet​

​​方法用于強制設定計數器的值,通過調用​

​DistributedAtomicValue​

​​類的​

​forceSet​

​方法來完成。

@Override
    public void forceSet(Integer newValue) throws Exception
    {
        value.forceSet(valueToBytes(newValue));
    }      

​DistributedAtomicValue​

​​類的​

​forceSet​

​​方法如下所示,就是直接設定​

​Zookeeper​

​節點的值。

/**
     * 強制設定值
     */
    public void forceSet(byte[] newValue) throws Exception
    {
        try
        {
            client.setData().forPath(path, newValue);
        }
        catch ( KeeperException.NoNodeException dummy )
        {
            try
            {
                client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
            }
            catch ( KeeperException.NodeExistsException dummy2 )
            {
                client.setData().forPath(path, newValue);
            }
        }
    }      

這些方法比較簡單,部落客就不示範了。

PromotedToLock promotedToLock = PromotedToLock.builder()
                 // 用于分布式鎖的Zookeeper路徑 
                .lockPath("/lock")
                // 鎖的重試政策
                .retryPolicy(new RetryNTimes(100, 5))
                // 鎖的逾時時間
                .timeout(10000, TimeUnit.SECONDS)
                .build();

        DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
                new RetryNTimes(100, 5),
                promotedToLock);