SharedCount
SharedCount
類的源碼注釋:
Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).
管理一個共享整數。檢視同一路徑的所有用戶端都将擁有共享整數的最新值(考慮到
的正常一緻性保證)。
ZK
SharedCount
類的源碼:
public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener>
{
// 存儲監聽器
private final Map<SharedCountListener, SharedValueListener> listeners = Maps.newConcurrentMap();
// SharedValue用于管理一個共享值,SharedCount将共享整數的維護委托給該SharedValue執行個體
private final SharedValue sharedValue;
/**
* client: CuratorFramework執行個體,用于與Zookeeper進行互動
* path: 共享計數器的路徑,即存儲共享整數的位置
* seedValue: 如果共享計數器的路徑尚未建立,則設定該值為共享整數的初始值
*/
public SharedCount(CuratorFramework client, String path, int seedValue)
{
// 初始化SharedValue執行個體
sharedValue = new SharedValue(client, path, toBytes(seedValue));
}
protected SharedCount(CuratorFramework client, String path, SharedValue sv)
{
sharedValue = sv;
}
// 擷取共享整數的值
@Override
public int getCount()
{
return fromBytes(sharedValue.getValue());
}
// 擷取共享整數的版本和值,封裝在VersionedValue執行個體中
@Override
public VersionedValue<Integer> getVersionedValue()
{
VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
}
/**
* 更改共享整數的值,而不管其先前的狀态如何
*/
public void setCount(int newCount) throws Exception
{
sharedValue.setValue(toBytes(newCount));
}
@Deprecated
public boolean trySetCount(int newCount) throws Exception
{
return sharedValue.trySetValue(toBytes(newCount));
}
/**
* 僅當共享整數的版本和指定的版本一緻時,才更改共享整數的值
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception
{
VersionedValue<byte[]> previousCopy = new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
return sharedValue.trySetValue(previousCopy, toBytes(newCount));
}
// 添加監聽器
@Override
public void addListener(SharedCountListener listener)
{
addListener(listener, MoreExecutors.directExecutor());
}
// 添加監聽器與執行該監聽器的Executor執行個體
@Override
public void addListener(final SharedCountListener listener, Executor executor)
{
// 使用匿名内部類建立SharedValueListener執行個體
SharedValueListener valueListener = new SharedValueListener()
{
@Override
public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception
{
// 通過SharedValueListener執行個體來通知listener共享整數的值改變
listener.countHasChanged(SharedCount.this, fromBytes(newValue));
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
// 通過SharedValueListener執行個體來通知listener共享整數的狀态改變
listener.stateChanged(client, newState);
}
};
// 将SharedValueListener執行個體和Executor執行個體添加到sharedValue執行個體中
sharedValue.getListenable().addListener(valueListener, executor);
// 将listener和SharedValueListener執行個體的映射存儲到listeners屬性中
listeners.put(listener, valueListener);
}
// 移除監聽器
@Override
public void removeListener(SharedCountListener listener)
{
// 删除listeners屬性中以該監聽器為key的鍵值對
// 如果listeners屬性存在以該監聽器為key的鍵值對
// 則會獲得對應的SharedValueListener執行個體
SharedValueListener valueListener = listeners.remove(listener);
if(valueListener != null) {
// 如果listeners屬性存在以該監聽器為key的鍵值對
// 則删除sharedValue執行個體中對應的SharedValueListener執行個體
sharedValue.getListenable().removeListener(valueListener);
}
}
/**
* 必須先啟動共享計數器,然後才能使用它
*/
public void start() throws Exception
{
sharedValue.start();
}
// 使用完共享計數器後需要調用close()進行關閉
@Override
public void close() throws IOException
{
sharedValue.close();
}
// 将整數轉換成位元組數組
@VisibleForTesting
static byte[] toBytes(int value)
{
byte[] bytes = new byte[4];
ByteBuffer.wrap(bytes).putInt(value);
return bytes;
}
// 将位元組數組轉換成整數
private static int fromBytes(byte[] bytes)
{
return ByteBuffer.wrap(bytes).getInt();
}
}
SharedCount
将共享整數的維護委托給了
SharedValue
,
SharedValue
的源碼以後再介紹。
測試
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
類(提供
CuratorFramework
需要的一些配置資訊,以及建立
CuratorFramework
執行個體的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 連接配接位址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 連接配接逾時時間
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session逾時時間
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空間
public static final String NAMESPACE = "MyNamespace";
// 重試政策
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 建立CuratorFramework執行個體
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
SharedCountRunnable
類(實作了
Runnable
接口,模拟分布式節點使用共享計數器):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import java.util.Random;
public class SharedCountRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework執行個體,表示不同的分布式節點
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随機加入的分布式節點
int randomSleep = new Random().nextInt(10000);
Thread.sleep(randomSleep);
// 共享計數器的路徑
String path = "/kaven";
int seedValue = 0;
// 建立SharedCount執行個體,用于提供共享計數器功能
// 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必須先啟動共享計數器,然後才能使用它
sharedCount.start();
// 使用随機數來設定共享整數的值
int randomValue = new Random().nextInt(1000);
sharedCount.setCount(randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
Thread.sleep(randomSleep);
System.out.println(
"*** " + Thread.currentThread().getName() +
" 擷取共享計數器的值為 " + sharedCount.getCount());
// 使用完共享計數器後需要調用close()進行關閉
sharedCount.close();
}
}
啟動類:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式節點處理業務
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new SharedCountRunnable());
}
}
}
模拟
15
個分布式節點使用共享計數器,輸出如下所示:
pool-1-thread-6 修改共享計數器的值為 525
pool-1-thread-2 修改共享計數器的值為 379
pool-1-thread-12 修改共享計數器的值為 60
pool-1-thread-4 修改共享計數器的值為 387
pool-1-thread-5 修改共享計數器的值為 821
pool-1-thread-15 修改共享計數器的值為 273
pool-1-thread-9 修改共享計數器的值為 410
*** pool-1-thread-2 擷取共享計數器的值為 410
pool-1-thread-8 修改共享計數器的值為 897
pool-1-thread-1 修改共享計數器的值為 871
pool-1-thread-10 修改共享計數器的值為 803
*** pool-1-thread-12 擷取共享計數器的值為 803
pool-1-thread-13 修改共享計數器的值為 210
*** pool-1-thread-6 擷取共享計數器的值為 210
*** pool-1-thread-4 擷取共享計數器的值為 210
pool-1-thread-11 修改共享計數器的值為 227
pool-1-thread-7 修改共享計數器的值為 445
pool-1-thread-14 修改共享計數器的值為 871
pool-1-thread-3 修改共享計數器的值為 705
*** pool-1-thread-5 擷取共享計數器的值為 705
*** pool-1-thread-15 擷取共享計數器的值為 705
*** pool-1-thread-9 擷取共享計數器的值為 705
*** pool-1-thread-8 擷取共享計數器的值為 705
*** pool-1-thread-1 擷取共享計數器的值為 705
*** pool-1-thread-10 擷取共享計數器的值為 705
*** pool-1-thread-13 擷取共享計數器的值為 705
*** pool-1-thread-11 擷取共享計數器的值為 705
*** pool-1-thread-7 擷取共享計數器的值為 705
*** pool-1-thread-14 擷取共享計數器的值為 705
*** pool-1-thread-3 擷取共享計數器的值為 705
輸出符合預期。
public class SharedCountRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework執行個體,表示不同的分布式節點
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随機加入的分布式節點
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 共享計數器的路徑
String path = "/kaven";
int seedValue = 0;
// 建立SharedCount執行個體,用于提供共享計數器功能
// 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必須先啟動共享計數器,然後才能使用它
sharedCount.start();
// 使用随機數來設定共享整數的值
int randomValue = new Random().nextInt(1000);
// 擷取VersionedValue執行個體用于修改共享整數的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
Thread.sleep(randomSleep);
System.out.println(
"*** " + Thread.currentThread().getName() +
" 擷取共享計數器的值為 " + sharedCount.getCount());
// 使用完共享計數器後需要調用close()進行關閉
sharedCount.close();
}
}
輸出如下所示:
pool-1-thread-7 修改共享計數器的值為 797
pool-1-thread-1 修改共享計數器的值為 291
pool-1-thread-2 修改共享計數器的值為 85
*** pool-1-thread-2 擷取共享計數器的值為 291
pool-1-thread-12 修改共享計數器的值為 696
pool-1-thread-8 修改共享計數器的值為 258
*** pool-1-thread-7 擷取共享計數器的值為 696
*** pool-1-thread-1 擷取共享計數器的值為 258
*** pool-1-thread-12 擷取共享計數器的值為 258
pool-1-thread-14 修改共享計數器的值為 426
*** pool-1-thread-8 擷取共享計數器的值為 426
pool-1-thread-9 修改共享計數器的值為 704
pool-1-thread-5 修改共享計數器的值為 187
pool-1-thread-13 修改共享計數器的值為 662
pool-1-thread-4 修改共享計數器的值為 390
pool-1-thread-15 修改共享計數器的值為 702
pool-1-thread-11 修改共享計數器的值為 65
pool-1-thread-6 修改共享計數器的值為 84
pool-1-thread-10 修改共享計數器的值為 93
*** pool-1-thread-14 擷取共享計數器的值為 93
*** pool-1-thread-9 擷取共享計數器的值為 93
*** pool-1-thread-5 擷取共享計數器的值為 93
*** pool-1-thread-13 擷取共享計數器的值為 93
*** pool-1-thread-4 擷取共享計數器的值為 93
*** pool-1-thread-15 擷取共享計數器的值為 93
*** pool-1-thread-11 擷取共享計數器的值為 93
pool-1-thread-3 修改共享計數器的值為 276
*** pool-1-thread-6 擷取共享計數器的值為 276
*** pool-1-thread-10 擷取共享計數器的值為 276
*** pool-1-thread-3 擷取共享計數器的值為 276
輸出也是符合預期的,也許有人會認為下面這段輸出是不符合預期的,但部落客認為是符合預期的,因為部落客這裡的
ZooKeeper
服務端是單機模式,是以不用考慮叢集的一緻性問題,如果值在服務端修改成功,讀取肯定是最新的值,而造成這種情況的原因,可能是
CPU
時間片輪詢排程的原因(
15
個線程全部在一台計算機上運作),也有可能是列印字元串的相關函數擷取
I/O
裝置存在快慢的原因,部落客覺得是此類原因導緻的,如果
ZooKeeper
服務端是叢集模式,倒是有可能是叢集的一緻性問題導緻的。
pool-1-thread-12 修改共享計數器的值為 696
pool-1-thread-8 修改共享計數器的值為 258
*** pool-1-thread-7 擷取共享計數器的值為 696
*** pool-1-thread-1 擷取共享計數器的值為 258
*** pool-1-thread-12 擷取共享計數器的值為 258
分布式環境是非常複雜的,擷取
VersionedValue
執行個體然後立即用于修改共享整數的值也是有可能不成功的,因為擷取的
VersionedValue
執行個體可能瞬間就失效了,比如有其他的節點在這個時間間隙中修改了共享整數的值。
// 擷取VersionedValue執行個體用于修改共享整數的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
監聽器
public class SharedCountRunnable implements Runnable{
// 是否是監聽節點
private final boolean isListenerNode;
public SharedCountRunnable(boolean isListenerNode) {
this.isListenerNode = isListenerNode;
}
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework執行個體,表示不同的分布式節點
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随機加入的分布式節點
int randomSleep = new Random().nextInt(100);
// 讓監聽節點先添加監聽器
if(!isListenerNode) {
Thread.sleep(3000 + randomSleep);
}
// 共享計數器的路徑
String path = "/kaven";
int seedValue = 0;
// 建立SharedCount執行個體,用于提供共享計數器功能
// 如果共享計數器的路徑尚未建立,則使用seedValue的值作為共享整數的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必須先啟動共享計數器,然後才能使用它
sharedCount.start();
if(isListenerNode) {
// 監聽節點添加監聽器
sharedCount.addListener(new SharedCountListener(){
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("[Listener] : 共享整數被修改成 " + newCount);
}
});
System.out.println("監聽器已添加");
Thread.sleep(1000000);
}
else {
update(sharedCount);
}
// 使用完共享計數器後需要調用close()進行關閉
sharedCount.close();
}
private void update(SharedCount sharedCount) throws Exception {
// 使用随機數來設定共享整數的值
int randomValue = new Random().nextInt(1000);
// 擷取VersionedValue執行個體用于修改共享整數的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享計數器的值為 " + randomValue);
}
}
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式節點處理業務
for (int i = 0; i < 30; i++) {
// 設定一個監聽節點,其他節點用于修改共享整數的值
if(i > 0) EXECUTOR_SERVICE.execute(new SharedCountRunnable(false));
else EXECUTOR_SERVICE.execute(new SharedCountRunnable(true));
}
}
}
輸出如下所示:
監聽器已添加
pool-1-thread-29 修改共享計數器的值為 178
pool-1-thread-22 修改共享計數器的值為 526
[Listener] : 共享整數被修改成 526
pool-1-thread-26 修改共享計數器的值為 76
pool-1-thread-14 修改共享計數器的值為 27
pool-1-thread-4 修改共享計數器的值為 907
[Listener] : 共享整數被修改成 907
pool-1-thread-16 修改共享計數器的值為 616
pool-1-thread-18 修改共享計數器的值為 21
pool-1-thread-13 修改共享計數器的值為 851
pool-1-thread-15 修改共享計數器的值為 569
[Listener] : 共享整數被修改成 569
pool-1-thread-20 修改共享計數器的值為 806
pool-1-thread-21 修改共享計數器的值為 961
pool-1-thread-25 修改共享計數器的值為 562
pool-1-thread-19 修改共享計數器的值為 99
pool-1-thread-6 修改共享計數器的值為 784
pool-1-thread-11 修改共享計數器的值為 202
pool-1-thread-24 修改共享計數器的值為 97
[Listener] : 共享整數被修改成 202
pool-1-thread-17 修改共享計數器的值為 139
pool-1-thread-12 修改共享計數器的值為 437
[Listener] : 共享整數被修改成 437
pool-1-thread-7 修改共享計數器的值為 156
pool-1-thread-8 修改共享計數器的值為 416
pool-1-thread-28 修改共享計數器的值為 930
pool-1-thread-10 修改共享計數器的值為 857
pool-1-thread-23 修改共享計數器的值為 945
pool-1-thread-5 修改共享計數器的值為 38
pool-1-thread-27 修改共享計數器的值為 422
[Listener] : 共享整數被修改成 422
pool-1-thread-9 修改共享計數器的值為 428
pool-1-thread-2 修改共享計數器的值為 413
pool-1-thread-30 修改共享計數器的值為 59
pool-1-thread-3 修改共享計數器的值為 539