DistributedDoubleBarrier
DistributedDoubleBarrier
類的源碼注釋:
Double barriers enable clients to synchronize the beginning and the end of a computation. When enough processes have joined the barrier, processes start their computation and leave the barrier once they have finished.
雙屏障使用戶端能夠同步計算的開始和結束。當足夠多的程序加入屏障時,程序開始計算并在完成後離開屏障。
類比單體應用的屏障
CyclicBarrier
:
- Java并發程式設計:CountDownLatch、CyclicBarrier、Semaphore初使用
測試
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
類(提供
CuratorFramework
需要的一些配置資訊,以及建立
CuratorFramework
執行個體的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 連接配接位址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 連接配接逾時時間
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session逾時時間
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空間
public static final String NAMESPACE = "MyNamespace";
// 重試政策
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 建立CuratorFramework執行個體
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
DistributedDoubleBarrierRunnable
類(實作了
Runnable
接口,模拟分布式節點進入與離開分布式屏障):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import java.util.Random;
public class DistributedDoubleBarrierRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework執行個體,表示不同的分布式節點
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随機加入的分布式節點
int randomSleep = new Random().nextInt(20000);
Thread.sleep(randomSleep);
// 分布式屏障的路徑
String barrierPath = "/kaven";
// 建立DistributedDoubleBarrier執行個體,用于提供分布式屏障功能
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);
System.out.println(Thread.currentThread().getName() + " 等待進入屏障");
long start = System.currentTimeMillis();
// 等待進入屏障
barrier.enter();
System.out.println(Thread.currentThread().getName() + " 等待了 "
+ (System.currentTimeMillis() - start) / 1000 + " s");
System.out.println(Thread.currentThread().getName() + " 進入屏障");
Thread.sleep(1000);
// 等待離開屏障
barrier.leave();
System.out.println(Thread.currentThread().getName() + " 離開屏障");
}
}
啟動類:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式節點處理業務
for (int i = 0; i < 5; i++) {
EXECUTOR_SERVICE.execute(new DistributedDoubleBarrierRunnable());
}
}
}
模拟
5
個分布式節點進入與離開分布式屏障,輸出如下所示:
pool-1-thread-3 等待進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-4 等待了 0 s
pool-1-thread-4 進入屏障
pool-1-thread-3 等待了 9 s
pool-1-thread-3 進入屏障
pool-1-thread-2 等待了 6 s
pool-1-thread-2 進入屏障
pool-1-thread-1 等待了 5 s
pool-1-thread-1 進入屏障
pool-1-thread-5 等待了 8 s
pool-1-thread-5 進入屏障
pool-1-thread-1 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-5 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-4 離開屏障
建立
DistributedDoubleBarrier
執行個體,将
memberQty
屬性設定為
5
,當想進入屏障的節點數不少于
memberQty
的值時,這些節點就可以進入障礙。
DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(curator, barrierPath, 5);
memberQty
屬性是一個門檻值,而不是限制,門檻值的限制是比較寬松的,接下來進行測試。
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式節點處理業務
for (int i = 0; i < 7; i++) {
EXECUTOR_SERVICE.execute(new DistributedDoubleBarrierRunnable());
}
}
}
修改應用的運作配置。
允許多個應用執行個體一起運作。
運作兩個應用執行個體。
第一個應用執行個體的輸出如下所示:
pool-1-thread-5 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-1 等待了 0 s
pool-1-thread-1 進入屏障
pool-1-thread-6 等待了 7 s
pool-1-thread-6 進入屏障
pool-1-thread-5 等待了 8 s
pool-1-thread-5 進入屏障
pool-1-thread-4 等待了 3 s
pool-1-thread-4 進入屏障
pool-1-thread-2 等待了 5 s
pool-1-thread-2 進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-7 等待了 0 s
pool-1-thread-7 進入屏障
pool-1-thread-5 離開屏障
pool-1-thread-4 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-6 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障
第二個應用執行個體的輸出如下所示:
pool-1-thread-1 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-7 等待了 0 s
pool-1-thread-7 進入屏障
pool-1-thread-6 等待了 1 s
pool-1-thread-6 進入屏障
pool-1-thread-2 等待了 0 s
pool-1-thread-2 進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-1 等待了 3 s
pool-1-thread-1 進入屏障
pool-1-thread-6 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-4 等待了 0 s
pool-1-thread-4 進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-5 等待了 0 s
pool-1-thread-5 進入屏障
pool-1-thread-5 離開屏障
pool-1-thread-4 離開屏障
兩個應用執行個體一起運作,相當于
14
個節點進入與離開分布式屏障,在本次測試中,它們都成功進入與離開分布式屏障,而
14%5!=0
,這是因為
memberQty
屬性是一個門檻值,而不是限制,門檻值的限制是比較寬松的,當想進入屏障的節點數不少于
memberQty
的值時,這些節點就可以進入障礙,而分布式環境是很複雜的,同時想要進入屏障的節點數可能有多個,隻要這些節點還沒有離開障礙就會一直存在,是以
14
個節點是有可能全部成功進入與離開屏障。是以可以知道,某個節點想要進入屏障,隻要此時節點數滿足門檻值條件就可以進入,而不是限制每次隻能進入
memberQty
個節點。而離開屏障沒有過多的限制,但離開屏障是一個集體操作,會讓該節點離開屏障,并且等待目前其他的節點全部離開屏障,才算離開屏障成功。部落客以後會分析
DistributedDoubleBarrier
的源碼,到時候再詳細介紹
DistributedDoubleBarrier
的實作原理。
因為
memberQty
屬性是一個門檻值,而不是限制,是以多次測試的結果可能會有差異,下面是第二次測試的結果。
第一個應用執行個體的輸出如下所示:
pool-1-thread-2 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-5 等待進入屏障
pool-1-thread-5 等待了 0 s
pool-1-thread-5 進入屏障
pool-1-thread-2 等待了 8 s
pool-1-thread-2 進入屏障
pool-1-thread-7 等待了 2 s
pool-1-thread-7 進入屏障
pool-1-thread-3 等待進入屏障
pool-1-thread-3 等待了 0 s
pool-1-thread-3 進入屏障
pool-1-thread-2 離開屏障
pool-1-thread-5 離開屏障
pool-1-thread-7 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-6 等待了 0 s
pool-1-thread-6 進入屏障
pool-1-thread-4 等待了 3 s
pool-1-thread-4 進入屏障
pool-1-thread-4 離開屏障
pool-1-thread-6 離開屏障
pool-1-thread-1 等待進入屏障
第二個應用執行個體的輸出如下所示:
pool-1-thread-3 等待進入屏障
pool-1-thread-7 等待進入屏障
pool-1-thread-3 等待了 6 s
pool-1-thread-3 進入屏障
pool-1-thread-7 等待了 3 s
pool-1-thread-7 進入屏障
pool-1-thread-7 離開屏障
pool-1-thread-3 離開屏障
pool-1-thread-6 等待進入屏障
pool-1-thread-1 等待進入屏障
pool-1-thread-2 等待進入屏障
pool-1-thread-6 等待了 5 s
pool-1-thread-6 進入屏障
pool-1-thread-2 等待了 0 s
pool-1-thread-2 進入屏障
pool-1-thread-1 等待了 3 s
pool-1-thread-1 進入屏障
pool-1-thread-6 離開屏障
pool-1-thread-2 離開屏障
pool-1-thread-1 離開屏障
pool-1-thread-4 等待進入屏障
pool-1-thread-5 等待進入屏障
很顯然,這次測試的
14
個節點并沒有都成功進入與離開分布式屏障。