SharedCount
SharedCount
类的源码注释:
Manages a shared integer. All clients watching the same path will have the up-to-date value of the shared integer (considering ZK’s normal consistency guarantees).
管理一个共享整数。查看同一路径的所有客户端都将拥有共享整数的最新值(考虑到
的正常一致性保证)。
ZK
SharedCount
类的源码:
public class SharedCount implements Closeable, SharedCountReader, Listenable<SharedCountListener>
{
// 存储监听器
private final Map<SharedCountListener, SharedValueListener> listeners = Maps.newConcurrentMap();
// SharedValue用于管理一个共享值,SharedCount将共享整数的维护委托给该SharedValue实例
private final SharedValue sharedValue;
/**
* client: CuratorFramework实例,用于与Zookeeper进行交互
* path: 共享计数器的路径,即存储共享整数的位置
* seedValue: 如果共享计数器的路径尚未创建,则设置该值为共享整数的初始值
*/
public SharedCount(CuratorFramework client, String path, int seedValue)
{
// 初始化SharedValue实例
sharedValue = new SharedValue(client, path, toBytes(seedValue));
}
protected SharedCount(CuratorFramework client, String path, SharedValue sv)
{
sharedValue = sv;
}
// 获取共享整数的值
@Override
public int getCount()
{
return fromBytes(sharedValue.getValue());
}
// 获取共享整数的版本和值,封装在VersionedValue实例中
@Override
public VersionedValue<Integer> getVersionedValue()
{
VersionedValue<byte[]> localValue = sharedValue.getVersionedValue();
return new VersionedValue<Integer>(localValue.getVersion(), fromBytes(localValue.getValue()));
}
/**
* 更改共享整数的值,而不管其先前的状态如何
*/
public void setCount(int newCount) throws Exception
{
sharedValue.setValue(toBytes(newCount));
}
@Deprecated
public boolean trySetCount(int newCount) throws Exception
{
return sharedValue.trySetValue(toBytes(newCount));
}
/**
* 仅当共享整数的版本和指定的版本一致时,才更改共享整数的值
*/
public boolean trySetCount(VersionedValue<Integer> previous, int newCount) throws Exception
{
VersionedValue<byte[]> previousCopy = new VersionedValue<byte[]>(previous.getVersion(), toBytes(previous.getValue()));
return sharedValue.trySetValue(previousCopy, toBytes(newCount));
}
// 添加监听器
@Override
public void addListener(SharedCountListener listener)
{
addListener(listener, MoreExecutors.directExecutor());
}
// 添加监听器与执行该监听器的Executor实例
@Override
public void addListener(final SharedCountListener listener, Executor executor)
{
// 使用匿名内部类创建SharedValueListener实例
SharedValueListener valueListener = new SharedValueListener()
{
@Override
public void valueHasChanged(SharedValueReader sharedValue, byte[] newValue) throws Exception
{
// 通过SharedValueListener实例来通知listener共享整数的值改变
listener.countHasChanged(SharedCount.this, fromBytes(newValue));
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
// 通过SharedValueListener实例来通知listener共享整数的状态改变
listener.stateChanged(client, newState);
}
};
// 将SharedValueListener实例和Executor实例添加到sharedValue实例中
sharedValue.getListenable().addListener(valueListener, executor);
// 将listener和SharedValueListener实例的映射存储到listeners属性中
listeners.put(listener, valueListener);
}
// 移除监听器
@Override
public void removeListener(SharedCountListener listener)
{
// 删除listeners属性中以该监听器为key的键值对
// 如果listeners属性存在以该监听器为key的键值对
// 则会获得对应的SharedValueListener实例
SharedValueListener valueListener = listeners.remove(listener);
if(valueListener != null) {
// 如果listeners属性存在以该监听器为key的键值对
// 则删除sharedValue实例中对应的SharedValueListener实例
sharedValue.getListenable().removeListener(valueListener);
}
}
/**
* 必须先启动共享计数器,然后才能使用它
*/
public void start() throws Exception
{
sharedValue.start();
}
// 使用完共享计数器后需要调用close()进行关闭
@Override
public void close() throws IOException
{
sharedValue.close();
}
// 将整数转换成字节数组
@VisibleForTesting
static byte[] toBytes(int value)
{
byte[] bytes = new byte[4];
ByteBuffer.wrap(bytes).putInt(value);
return bytes;
}
// 将字节数组转换成整数
private static int fromBytes(byte[] bytes)
{
return ByteBuffer.wrap(bytes).getInt();
}
}
SharedCount
将共享整数的维护委托给了
SharedValue
,
SharedValue
的源码以后再介绍。
测试
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
类(提供
CuratorFramework
需要的一些配置信息,以及创建
CuratorFramework
实例的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 连接地址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 连接超时时间
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session超时时间
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空间
public static final String NAMESPACE = "MyNamespace";
// 重试策略
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 创建CuratorFramework实例
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
SharedCountRunnable
类(实现了
Runnable
接口,模拟分布式节点使用共享计数器):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.shared.SharedCount;
import java.util.Random;
public class SharedCountRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(10000);
Thread.sleep(randomSleep);
// 共享计数器的路径
String path = "/kaven";
int seedValue = 0;
// 创建SharedCount实例,用于提供共享计数器功能
// 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必须先启动共享计数器,然后才能使用它
sharedCount.start();
// 使用随机数来设置共享整数的值
int randomValue = new Random().nextInt(1000);
sharedCount.setCount(randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
Thread.sleep(randomSleep);
System.out.println(
"*** " + Thread.currentThread().getName() +
" 获取共享计数器的值为 " + sharedCount.getCount());
// 使用完共享计数器后需要调用close()进行关闭
sharedCount.close();
}
}
启动类:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new SharedCountRunnable());
}
}
}
模拟
15
个分布式节点使用共享计数器,输出如下所示:
pool-1-thread-6 修改共享计数器的值为 525
pool-1-thread-2 修改共享计数器的值为 379
pool-1-thread-12 修改共享计数器的值为 60
pool-1-thread-4 修改共享计数器的值为 387
pool-1-thread-5 修改共享计数器的值为 821
pool-1-thread-15 修改共享计数器的值为 273
pool-1-thread-9 修改共享计数器的值为 410
*** pool-1-thread-2 获取共享计数器的值为 410
pool-1-thread-8 修改共享计数器的值为 897
pool-1-thread-1 修改共享计数器的值为 871
pool-1-thread-10 修改共享计数器的值为 803
*** pool-1-thread-12 获取共享计数器的值为 803
pool-1-thread-13 修改共享计数器的值为 210
*** pool-1-thread-6 获取共享计数器的值为 210
*** pool-1-thread-4 获取共享计数器的值为 210
pool-1-thread-11 修改共享计数器的值为 227
pool-1-thread-7 修改共享计数器的值为 445
pool-1-thread-14 修改共享计数器的值为 871
pool-1-thread-3 修改共享计数器的值为 705
*** pool-1-thread-5 获取共享计数器的值为 705
*** pool-1-thread-15 获取共享计数器的值为 705
*** pool-1-thread-9 获取共享计数器的值为 705
*** pool-1-thread-8 获取共享计数器的值为 705
*** pool-1-thread-1 获取共享计数器的值为 705
*** pool-1-thread-10 获取共享计数器的值为 705
*** pool-1-thread-13 获取共享计数器的值为 705
*** pool-1-thread-11 获取共享计数器的值为 705
*** pool-1-thread-7 获取共享计数器的值为 705
*** pool-1-thread-14 获取共享计数器的值为 705
*** pool-1-thread-3 获取共享计数器的值为 705
输出符合预期。
public class SharedCountRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(1000);
Thread.sleep(randomSleep);
// 共享计数器的路径
String path = "/kaven";
int seedValue = 0;
// 创建SharedCount实例,用于提供共享计数器功能
// 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必须先启动共享计数器,然后才能使用它
sharedCount.start();
// 使用随机数来设置共享整数的值
int randomValue = new Random().nextInt(1000);
// 获取VersionedValue实例用于修改共享整数的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
Thread.sleep(randomSleep);
System.out.println(
"*** " + Thread.currentThread().getName() +
" 获取共享计数器的值为 " + sharedCount.getCount());
// 使用完共享计数器后需要调用close()进行关闭
sharedCount.close();
}
}
输出如下所示:
pool-1-thread-7 修改共享计数器的值为 797
pool-1-thread-1 修改共享计数器的值为 291
pool-1-thread-2 修改共享计数器的值为 85
*** pool-1-thread-2 获取共享计数器的值为 291
pool-1-thread-12 修改共享计数器的值为 696
pool-1-thread-8 修改共享计数器的值为 258
*** pool-1-thread-7 获取共享计数器的值为 696
*** pool-1-thread-1 获取共享计数器的值为 258
*** pool-1-thread-12 获取共享计数器的值为 258
pool-1-thread-14 修改共享计数器的值为 426
*** pool-1-thread-8 获取共享计数器的值为 426
pool-1-thread-9 修改共享计数器的值为 704
pool-1-thread-5 修改共享计数器的值为 187
pool-1-thread-13 修改共享计数器的值为 662
pool-1-thread-4 修改共享计数器的值为 390
pool-1-thread-15 修改共享计数器的值为 702
pool-1-thread-11 修改共享计数器的值为 65
pool-1-thread-6 修改共享计数器的值为 84
pool-1-thread-10 修改共享计数器的值为 93
*** pool-1-thread-14 获取共享计数器的值为 93
*** pool-1-thread-9 获取共享计数器的值为 93
*** pool-1-thread-5 获取共享计数器的值为 93
*** pool-1-thread-13 获取共享计数器的值为 93
*** pool-1-thread-4 获取共享计数器的值为 93
*** pool-1-thread-15 获取共享计数器的值为 93
*** pool-1-thread-11 获取共享计数器的值为 93
pool-1-thread-3 修改共享计数器的值为 276
*** pool-1-thread-6 获取共享计数器的值为 276
*** pool-1-thread-10 获取共享计数器的值为 276
*** pool-1-thread-3 获取共享计数器的值为 276
输出也是符合预期的,也许有人会认为下面这段输出是不符合预期的,但博主认为是符合预期的,因为博主这里的
ZooKeeper
服务端是单机模式,因此不用考虑集群的一致性问题,如果值在服务端修改成功,读取肯定是最新的值,而造成这种情况的原因,可能是
CPU
时间片轮询调度的原因(
15
个线程全部在一台计算机上运行),也有可能是打印字符串的相关函数获取
I/O
设备存在快慢的原因,博主觉得是此类原因导致的,如果
ZooKeeper
服务端是集群模式,倒是有可能是集群的一致性问题导致的。
pool-1-thread-12 修改共享计数器的值为 696
pool-1-thread-8 修改共享计数器的值为 258
*** pool-1-thread-7 获取共享计数器的值为 696
*** pool-1-thread-1 获取共享计数器的值为 258
*** pool-1-thread-12 获取共享计数器的值为 258
分布式环境是非常复杂的,获取
VersionedValue
实例然后立即用于修改共享整数的值也是有可能不成功的,因为获取的
VersionedValue
实例可能瞬间就失效了,比如有其他的节点在这个时间间隙中修改了共享整数的值。
// 获取VersionedValue实例用于修改共享整数的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
监听器
public class SharedCountRunnable implements Runnable{
// 是否是监听节点
private final boolean isListenerNode;
public SharedCountRunnable(boolean isListenerNode) {
this.isListenerNode = isListenerNode;
}
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework实例,表示不同的分布式节点
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 模拟随机加入的分布式节点
int randomSleep = new Random().nextInt(100);
// 让监听节点先添加监听器
if(!isListenerNode) {
Thread.sleep(3000 + randomSleep);
}
// 共享计数器的路径
String path = "/kaven";
int seedValue = 0;
// 创建SharedCount实例,用于提供共享计数器功能
// 如果共享计数器的路径尚未创建,则使用seedValue的值作为共享整数的初始值
SharedCount sharedCount = new SharedCount(curator, path, seedValue);
// 必须先启动共享计数器,然后才能使用它
sharedCount.start();
if(isListenerNode) {
// 监听节点添加监听器
sharedCount.addListener(new SharedCountListener(){
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("[Listener] : 共享整数被修改成 " + newCount);
}
});
System.out.println("监听器已添加");
Thread.sleep(1000000);
}
else {
update(sharedCount);
}
// 使用完共享计数器后需要调用close()进行关闭
sharedCount.close();
}
private void update(SharedCount sharedCount) throws Exception {
// 使用随机数来设置共享整数的值
int randomValue = new Random().nextInt(1000);
// 获取VersionedValue实例用于修改共享整数的值
sharedCount.trySetCount(sharedCount.getVersionedValue(), randomValue);
System.out.println(Thread.currentThread().getName() + " 修改共享计数器的值为 " + randomValue);
}
}
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式节点处理业务
for (int i = 0; i < 30; i++) {
// 设置一个监听节点,其他节点用于修改共享整数的值
if(i > 0) EXECUTOR_SERVICE.execute(new SharedCountRunnable(false));
else EXECUTOR_SERVICE.execute(new SharedCountRunnable(true));
}
}
}
输出如下所示:
监听器已添加
pool-1-thread-29 修改共享计数器的值为 178
pool-1-thread-22 修改共享计数器的值为 526
[Listener] : 共享整数被修改成 526
pool-1-thread-26 修改共享计数器的值为 76
pool-1-thread-14 修改共享计数器的值为 27
pool-1-thread-4 修改共享计数器的值为 907
[Listener] : 共享整数被修改成 907
pool-1-thread-16 修改共享计数器的值为 616
pool-1-thread-18 修改共享计数器的值为 21
pool-1-thread-13 修改共享计数器的值为 851
pool-1-thread-15 修改共享计数器的值为 569
[Listener] : 共享整数被修改成 569
pool-1-thread-20 修改共享计数器的值为 806
pool-1-thread-21 修改共享计数器的值为 961
pool-1-thread-25 修改共享计数器的值为 562
pool-1-thread-19 修改共享计数器的值为 99
pool-1-thread-6 修改共享计数器的值为 784
pool-1-thread-11 修改共享计数器的值为 202
pool-1-thread-24 修改共享计数器的值为 97
[Listener] : 共享整数被修改成 202
pool-1-thread-17 修改共享计数器的值为 139
pool-1-thread-12 修改共享计数器的值为 437
[Listener] : 共享整数被修改成 437
pool-1-thread-7 修改共享计数器的值为 156
pool-1-thread-8 修改共享计数器的值为 416
pool-1-thread-28 修改共享计数器的值为 930
pool-1-thread-10 修改共享计数器的值为 857
pool-1-thread-23 修改共享计数器的值为 945
pool-1-thread-5 修改共享计数器的值为 38
pool-1-thread-27 修改共享计数器的值为 422
[Listener] : 共享整数被修改成 422
pool-1-thread-9 修改共享计数器的值为 428
pool-1-thread-2 修改共享计数器的值为 413
pool-1-thread-30 修改共享计数器的值为 59
pool-1-thread-3 修改共享计数器的值为 539