天天看點

ZooKeeper : Curator架構之分布式屏障DistributedDoubleBarrier

DistributedDoubleBarrier

​DistributedDoubleBarrier​

​類的源碼注釋:

Double barriers enable clients to synchronize the beginning and the end of a computation. When enough processes have joined the barrier, processes start their computation and leave the barrier once they have finished.

雙屏障使用戶端能夠同步計算的開始和結束。當足夠多的程序加入屏障時,程序開始計算并在完成後離開屏障。

類比單體應用的屏障​

​CyclicBarrier​

​:

  • ​​Java并發程式設計:CountDownLatch、CyclicBarrier、Semaphore初使用​​

測試

​pom.xml​

​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>zookeeper</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
    </dependencies>
</project>      

​CuratorFrameworkProperties​

​​類(提供​

​CuratorFramework​

​​需要的一些配置資訊,以及建立​

​CuratorFramework​

​執行個體的方法):

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorFrameworkProperties {
    // 連接配接位址
    public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
    // 連接配接逾時時間
    public static final int CONNECTION_TIMEOUT_MS = 40000;
    // Session逾時時間
    public static final int SESSION_TIMEOUT_MS = 10000;
    // 命名空間
    public static final String NAMESPACE = "MyNamespace";
    // 重試政策
    public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);

    public static CuratorFramework getCuratorFramework() {
        // 建立CuratorFramework執行個體
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
                .retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
                .connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
                .sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
                .namespace(CuratorFrameworkProperties.NAMESPACE)
                .build();
        curator.start();
        assert curator.getState().equals(CuratorFrameworkState.STARTED);
        return curator;
    }
}      

​DistributedDoubleBarrierRunnable​

​​類(實作了​

​Runnable​

​接口,模拟分布式節點進入與離開分布式屏障):

package com.kaven.zookeeper;

import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;

import java.util.Random;

public class DistributedDoubleBarrierRunnable implements Runnable{
    @SneakyThrows
    @Override
    public void run() {
        // 使用不同的CuratorFramework執行個體,表示不同的分布式節點
        CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();

        // 模拟随機加入的分布式節點
        int randomSleep = new Random().nextInt(20000);
        Thread.sleep(randomSleep);

        // 分布式屏障的路徑
        String barrierPath = "/kaven";

        // 建立DistributedDoubleBarrier執行個體,用于提供分布式屏障功能
        DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);

        System.out.println(Thread.currentThread().getName() + " 等待進入屏障");
        long start = System.currentTimeMillis();
        // 等待進入屏障
        barrier.enter();
        System.out.println(Thread.currentThread().getName() + " 等待了 "
                + (System.currentTimeMillis() - start) / 1000 + " s");
        System.out.println(Thread.currentThread().getName() + " 進入屏障");
        Thread.sleep(1000);
        // 等待離開屏障
        barrier.leave();
        System.out.println(Thread.currentThread().getName() + " 離開屏障");
    }
}      

啟動類:

package com.kaven.zookeeper;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式節點處理業務
        for (int i = 0; i < 5; i++) {
            EXECUTOR_SERVICE.execute(new DistributedDoubleBarrierRunnable());
        }
    }
}      

模拟​

​5​

​個分布式節點進入與離開分布式屏障,輸出如下所示:

pool-1-thread-3 等待進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-4 等待了 0 s
pool-1-thread-4 進入屏障
pool-1-thread-3 等待了 9 s
pool-1-thread-3 進入屏障
pool-1-thread-2 等待了 6 s
pool-1-thread-2 進入屏障
pool-1-thread-1 等待了 5 s
pool-1-thread-1 進入屏障
pool-1-thread-5 等待了 8 s
pool-1-thread-5 進入屏障
pool-1-thread-1 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-5 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-4 離開屏障      

建立​

​DistributedDoubleBarrier​

​​執行個體,将​

​memberQty​

​​屬性設定為​

​5​

​​,當想進入屏障的節點數不少于​

​memberQty​

​的值時,這些節點就可以進入障礙。

DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);      

​memberQty​

​屬性是一個門檻值,而不是限制,門檻值的限制是比較寬松的,接下來進行測試。

public class Application {
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

    public static void main(String[] args) throws Exception {
        // 分布式節點處理業務
        for (int i = 0; i < 7; i++) {
            EXECUTOR_SERVICE.execute(new DistributedDoubleBarrierRunnable());
        }
    }
}      

修改應用的運作配置。

ZooKeeper : Curator架構之分布式屏障DistributedDoubleBarrier

允許多個應用執行個體一起運作。

ZooKeeper : Curator架構之分布式屏障DistributedDoubleBarrier

運作兩個應用執行個體。

ZooKeeper : Curator架構之分布式屏障DistributedDoubleBarrier

第一個應用執行個體的輸出如下所示:

pool-1-thread-5 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-1 等待了 0 s
pool-1-thread-1 進入屏障
pool-1-thread-6 等待了 7 s
pool-1-thread-6 進入屏障
pool-1-thread-5 等待了 8 s
pool-1-thread-5 進入屏障
pool-1-thread-4 等待了 3 s
pool-1-thread-4 進入屏障
pool-1-thread-2 等待了 5 s
pool-1-thread-2 進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-7 等待了 0 s
pool-1-thread-7 進入屏障
pool-1-thread-5 離開屏障
pool-1-thread-4 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-6 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障      

第二個應用執行個體的輸出如下所示:

pool-1-thread-1 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-7 等待了 0 s
pool-1-thread-7 進入屏障
pool-1-thread-6 等待了 1 s
pool-1-thread-6 進入屏障
pool-1-thread-2 等待了 0 s
pool-1-thread-2 進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-1 等待了 3 s
pool-1-thread-1 進入屏障
pool-1-thread-6 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-4 等待了 0 s
pool-1-thread-4 進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-5 等待了 0 s
pool-1-thread-5 進入屏障
pool-1-thread-5 離開屏障
pool-1-thread-4 離開屏障      

兩個應用執行個體一起運作,相當于​

​14​

​​個節點進入與離開分布式屏障,在本次測試中,它們都成功進入與離開分布式屏障,而​

​14%5!=0​

​​,這是因為​

​memberQty​

​​屬性是一個門檻值,而不是限制,門檻值的限制是比較寬松的,當想進入屏障的節點數不少于​

​memberQty​

​​的值時,這些節點就可以進入障礙,而分布式環境是很複雜的,同時想要進入屏障的節點數可能有多個,隻要這些節點還沒有離開障礙就會一直存在,是以​

​14​

​​個節點是有可能全部成功進入與離開屏障。是以可以知道,某個節點想要進入屏障,隻要此時節點數滿足門檻值條件就可以進入,而不是限制每次隻能進入​

​memberQty​

​​個節點。而離開屏障沒有過多的限制,但離開屏障是一個集體操作,會讓該節點離開屏障,并且等待目前其他的節點全部離開屏障,才算離開屏障成功。部落客以後會分析​

​DistributedDoubleBarrier​

​​的源碼,到時候再詳細介紹​

​DistributedDoubleBarrier​

​的實作原理。

因為​

​memberQty​

​屬性是一個門檻值,而不是限制,是以多次測試的結果可能會有差異,下面是第二次測試的結果。

第一個應用執行個體的輸出如下所示:

pool-1-thread-2 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-5 等待了 0 s
pool-1-thread-5 進入屏障
pool-1-thread-2 等待了 8 s
pool-1-thread-2 進入屏障
pool-1-thread-7 等待了 2 s
pool-1-thread-7 進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-2 離開屏障
pool-1-thread-5 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-6 等待了 0 s
pool-1-thread-6 進入屏障
pool-1-thread-4 等待了 3 s
pool-1-thread-4 進入屏障
pool-1-thread-4 離開屏障
pool-1-thread-6 離開屏障
pool-1-thread-1 等待進入屏障      

第二個應用執行個體的輸出如下所示:

pool-1-thread-3 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-3 等待了 6 s
pool-1-thread-3 進入屏障
pool-1-thread-7 等待了 3 s
pool-1-thread-7 進入屏障
pool-1-thread-7 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-6 等待了 5 s
pool-1-thread-6 進入屏障
pool-1-thread-2 等待了 0 s
pool-1-thread-2 進入屏障
pool-1-thread-1 等待了 3 s
pool-1-thread-1 進入屏障
pool-1-thread-6 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-5 等待進入屏障      

很顯然,這次測試的​

​14​

​個節點并沒有都成功進入與離開分布式屏障。