DistributedAtomicLong
原子增量操作的計數器,首先嘗試使用樂觀鎖進行增量操作,如果失敗,則采用可選的
InterProcessMutex
(悲觀鎖)進行增量操作。 對于樂觀鎖和悲觀鎖,重試政策都用于重試增量操作。
各種增量方法都會傳回一個
AtomicValue
執行個體,通過調用
AtomicValue
執行個體的
succeeded()
可以查詢增量操作是否執行成功,除了
get()
外,其他任何方法都不保證一定成功。
AtomicValue
接口源碼(原子操作傳回值的抽象):
public interface AtomicValue<T>
{
/**
* 如果操作成功,則傳回true
* 如果傳回false,則操作失敗
*/
public boolean succeeded();
/**
* 傳回操作前計數器的值
*/
public T preValue();
/**
* 傳回操作後計數器的值
*/
public T postValue();
/**
* 傳回操作的調試統計資訊,比如樂觀鎖、悲觀鎖嘗試的次數與時間
*/
public AtomicStats getStats();
}
DistributedAtomicLong
類中的内部類
AtomicLong
實作了
AtomicValue
接口,但實際上隻是起到封裝的作用,所有的調用都委托給了
bytes
屬性(其他實作類的執行個體)。
private class AtomicLong implements AtomicValue<Long>
{
private AtomicValue<byte[]> bytes;
private AtomicLong(AtomicValue<byte[]> bytes)
{
this.bytes = bytes;
}
@Override
public boolean succeeded()
{
return bytes.succeeded();
}
@Override
public Long preValue()
{
return bytesToValue(bytes.preValue());
}
@Override
public Long postValue()
{
return bytesToValue(bytes.postValue());
}
@Override
public AtomicStats getStats()
{
return bytes.getStats();
}
}
DistributedAtomicLong
類實作了
DistributedAtomicNumber
接口,并且
DistributedAtomicLong
将各種原子操作的執行委托給了
DistributedAtomicValue
。
public class DistributedAtomicLong implements DistributedAtomicNumber<Long>
{
private final DistributedAtomicValue value;
...
}
DistributedAtomicNumber
接口是分布式原子數值類型的抽象,定義了分布式原子數值類型需要提供的方法。
public interface DistributedAtomicNumber<T>
{
public AtomicValue<T> get() throws Exception;
public AtomicValue<T> compareAndSet(T expectedValue, T newValue) throws Exception;
public AtomicValue<T> trySet(T newValue) throws Exception;
public boolean initialize(T value) throws Exception;
public void forceSet(T newValue) throws Exception;
public AtomicValue<T> increment() throws Exception;
public AtomicValue<T> decrement() throws Exception;
public AtomicValue<T> add(T delta) throws Exception;
public AtomicValue<T> subtract(T delta) throws Exception;
}
目前
DistributedAtomicNumber
接口有兩種實作,除了
DistributedAtomicLong
類,還有
DistributedAtomicInteger
類。
并且
DistributedAtomicInteger
也是将各種原子操作的執行委托給了
DistributedAtomicValue
,是以這兩種實作是類似的,隻不過表示的數值類型不同而已。
public class DistributedAtomicInteger implements DistributedAtomicNumber<Integer>
{
private final DistributedAtomicValue value;
...
}
DistributedAtomicValue
是原子操作真正的執行者,是以可以知道内部類
AtomicLong
的
bytes
屬性是
MutableAtomicValue
執行個體。
public AtomicValue<byte[]> get() throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
...
return result;
}
測試
pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>zookeeper</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
</dependencies>
</project>
CuratorFrameworkProperties
類(提供
CuratorFramework
需要的一些配置資訊,以及建立
CuratorFramework
執行個體的方法):
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorFrameworkProperties {
// 連接配接位址
public static final String CONNECT_ADDRESS = "192.168.1.3:9000";
// 連接配接逾時時間
public static final int CONNECTION_TIMEOUT_MS = 40000;
// Session逾時時間
public static final int SESSION_TIMEOUT_MS = 10000;
// 命名空間
public static final String NAMESPACE = "MyNamespace";
// 重試政策
public static final RetryPolicy RETRY_POLICY = new ExponentialBackoffRetry(1000, 3);
public static CuratorFramework getCuratorFramework() {
// 建立CuratorFramework執行個體
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(CuratorFrameworkProperties.CONNECT_ADDRESS)
.retryPolicy(CuratorFrameworkProperties.RETRY_POLICY)
.connectionTimeoutMs(CuratorFrameworkProperties.CONNECTION_TIMEOUT_MS)
.sessionTimeoutMs(CuratorFrameworkProperties.SESSION_TIMEOUT_MS)
.namespace(CuratorFrameworkProperties.NAMESPACE)
.build();
curator.start();
assert curator.getState().equals(CuratorFrameworkState.STARTED);
return curator;
}
}
DistributedAtomicLongRunnable
類(實作了
Runnable
接口,模拟分布式節點操作分布式原子長整型):
package com.kaven.zookeeper;
import lombok.SneakyThrows;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.RetryNTimes;
public class DistributedAtomicLongRunnable implements Runnable{
@SneakyThrows
@Override
public void run() {
// 使用不同的CuratorFramework執行個體,表示不同的分布式節點
CuratorFramework curator = CuratorFrameworkProperties.getCuratorFramework();
// 共享計數器的路徑
String counterPath = "/kaven";
// 建立DistributedAtomicLong執行個體,用于操作分布式原子長整型
// new RetryNTimes(100, 5)是樂觀鎖的重試政策執行個體
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
// 初始化
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失敗");
}
// 比較再設定,當Zookeeper中的值與期望值相等時才能設定新值
AtomicValue<Long> longAtomicValue = atomicLong.compareAndSet(100L, 501L);
if(longAtomicValue.succeeded()) {
System.out.println(Thread.currentThread().getName() + " compareAndSet 成功");
}
else {
System.out.println(Thread.currentThread().getName() + " compareAndSet 失敗");
}
}
}
啟動類:
package com.kaven.zookeeper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Application {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception {
// 分布式節點處理業務
for (int i = 0; i < 15; i++) {
EXECUTOR_SERVICE.execute(new DistributedAtomicLongRunnable());
}
}
}
模拟
15
個分布式節點操作分布式原子長整型,輸出如下所示:
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-11初始化 atomicLong 失敗
pool-1-thread-10初始化 atomicLong 失敗
pool-1-thread-14初始化 atomicLong 失敗
pool-1-thread-15初始化 atomicLong 失敗
pool-1-thread-8初始化 atomicLong 失敗
pool-1-thread-13初始化 atomicLong 失敗
pool-1-thread-6初始化 atomicLong 失敗
pool-1-thread-1初始化 atomicLong 失敗
pool-1-thread-7初始化 atomicLong 失敗
pool-1-thread-5初始化 atomicLong 失敗
pool-1-thread-3初始化 atomicLong 失敗
pool-1-thread-9初始化 atomicLong 失敗
pool-1-thread-2初始化 atomicLong 失敗
pool-1-thread-4初始化 atomicLong 失敗
pool-1-thread-8 compareAndSet 失敗
pool-1-thread-14 compareAndSet 失敗
pool-1-thread-10 compareAndSet 失敗
pool-1-thread-6 compareAndSet 失敗
pool-1-thread-15 compareAndSet 失敗
pool-1-thread-13 compareAndSet 失敗
pool-1-thread-7 compareAndSet 失敗
pool-1-thread-9 compareAndSet 失敗
pool-1-thread-11 compareAndSet 失敗
pool-1-thread-5 compareAndSet 失敗
pool-1-thread-12 compareAndSet 失敗
pool-1-thread-1 compareAndSet 失敗
pool-1-thread-3 compareAndSet 成功
pool-1-thread-4 compareAndSet 失敗
pool-1-thread-2 compareAndSet 失敗
輸出是符合預期的,兩種操作都隻有一個節點執行成功。
DistributedAtomicValue
類的
initialize
和
compareAndSet
方法如下所示,其實就是建立
Zookeeper
節點(隻有一個服務能建立成功)和基于版本設定節點的值(在部落客的測試程式中,也隻能有一個服務将該操作執行成功),而這兩種操作并沒有使用鎖(樂觀鎖和悲觀鎖)。
public boolean initialize(byte[] value) throws Exception
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, value);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
return false;
}
return true;
}
public AtomicValue<byte[]> compareAndSet(byte[] expectedValue, byte[] newValue) throws Exception
{
Stat stat = new Stat();
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
boolean createIt = getCurrentValue(result, stat);
if ( !createIt && Arrays.equals(expectedValue, result.preValue) )
{
try
{
client.setData().withVersion(stat.getVersion()).forPath(path, newValue);
result.succeeded = true;
result.postValue = newValue;
}
catch ( KeeperException.BadVersionException dummy )
{
result.succeeded = false;
}
catch ( KeeperException.NoNodeException dummy )
{
result.succeeded = false;
}
}
else
{
result.succeeded = false;
}
return result;
}
increment
、
decrement
、
add
以及
subtract
這四種操作是類似的,部落客隻示範
increment
操作。
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5));
boolean initialize = atomicLong.initialize(100L);
if(initialize) {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 成功");
}
else {
System.out.println(Thread.currentThread().getName() + "初始化 atomicLong 失敗");
}
for (int i = 0; i < 1000; i++) {
Thread.sleep(5);
atomicLong.increment();
}
System.out.println(Thread.currentThread().getName() + "操作成功");
System.out.println(Thread.currentThread().getName() + "目前的值為" + atomicLong.get().postValue());
輸出如下所示:
pool-1-thread-8初始化 atomicLong 失敗
pool-1-thread-1初始化 atomicLong 失敗
pool-1-thread-3初始化 atomicLong 失敗
pool-1-thread-14初始化 atomicLong 失敗
pool-1-thread-5初始化 atomicLong 失敗
pool-1-thread-12初始化 atomicLong 成功
pool-1-thread-2初始化 atomicLong 失敗
pool-1-thread-4初始化 atomicLong 失敗
pool-1-thread-15初始化 atomicLong 失敗
pool-1-thread-13初始化 atomicLong 失敗
pool-1-thread-11初始化 atomicLong 失敗
pool-1-thread-9初始化 atomicLong 失敗
pool-1-thread-7初始化 atomicLong 失敗
pool-1-thread-6初始化 atomicLong 失敗
pool-1-thread-10初始化 atomicLong 失敗
pool-1-thread-15操作成功
pool-1-thread-15目前的值為14289
pool-1-thread-3操作成功
pool-1-thread-3目前的值為14305
pool-1-thread-13操作成功
pool-1-thread-13目前的值為14420
pool-1-thread-2操作成功
pool-1-thread-2目前的值為14681
pool-1-thread-4操作成功
pool-1-thread-4目前的值為14876
pool-1-thread-1操作成功
pool-1-thread-1目前的值為14906
pool-1-thread-5操作成功
pool-1-thread-5目前的值為14953
pool-1-thread-8操作成功
pool-1-thread-8目前的值為14972
pool-1-thread-14操作成功
pool-1-thread-14目前的值為15001
pool-1-thread-7操作成功
pool-1-thread-7目前的值為15020
pool-1-thread-10操作成功
pool-1-thread-10目前的值為15051
pool-1-thread-11操作成功
pool-1-thread-11目前的值為15053
pool-1-thread-9操作成功
pool-1-thread-9目前的值為15060
pool-1-thread-12操作成功
pool-1-thread-12目前的值為15093
pool-1-thread-6操作成功
pool-1-thread-6目前的值為15100
最後的值為
15100
符合預期。
increment
、
decrement
、
add
以及
subtract
這四個方法都調用
worker
方法來完成。
@Override
public AtomicValue<Long> increment() throws Exception
{
return worker(1L);
}
@Override
public AtomicValue<Long> decrement() throws Exception
{
return worker(-1L);
}
@Override
public AtomicValue<Long> add(Long delta) throws Exception
{
return worker(delta);
}
@Override
public AtomicValue<Long> subtract(Long delta) throws Exception
{
return worker(-1 * delta);
}
DistributedAtomicLong
類的
worker
方法則是調用
DistributedAtomicValue
類的
trySet
方法來完成。
private AtomicValue<Long> worker(final Long addAmount) throws Exception
{
Preconditions.checkNotNull(addAmount, "addAmount cannot be null");
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
long previousValue = (previous != null) ? bytesToValue(previous) : 0;
long newValue = previousValue + addAmount;
return valueToBytes(newValue);
}
};
AtomicValue<byte[]> result = value.trySet(makeValue);
return new AtomicLong(result);
}
DistributedAtomicValue
類的
trySet
方法嘗試以原子方式将計數器的值設定為給定值,首先嘗試使用樂觀鎖進行操作,如果失敗,則采用可選的
InterProcessMutex
(悲觀鎖)進行操作。
// 嘗試以原子方式将計數器的值設定為給定值
public AtomicValue<byte[]> trySet(final byte[] newValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
MakeValue makeValue = new MakeValue()
{
@Override
public byte[] makeFrom(byte[] previous)
{
return newValue;
}
};
// 嘗試使用樂觀鎖
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
// 如果在樂觀鎖下執行不成功,并且有悲觀鎖
// 嘗試使用悲觀鎖
tryWithMutex(result, makeValue);
}
return result;
}
DistributedAtomicLong
類的
trySet
方法用于嘗試設定計數器的值,也是通過調用
DistributedAtomicValue
類的
trySet
方法來完成。
@Override
public AtomicValue<Long> trySet(Long newValue) throws Exception
{
return new AtomicLong(value.trySet(valueToBytes(newValue)));
}
DistributedAtomicLong
類的
forceSet
方法用于強制設定計數器的值,通過調用
DistributedAtomicValue
類的
forceSet
方法來完成。
@Override
public void forceSet(Integer newValue) throws Exception
{
value.forceSet(valueToBytes(newValue));
}
DistributedAtomicValue
類的
forceSet
方法如下所示,就是直接設定
Zookeeper
節點的值。
/**
* 強制設定值
*/
public void forceSet(byte[] newValue) throws Exception
{
try
{
client.setData().forPath(path, newValue);
}
catch ( KeeperException.NoNodeException dummy )
{
try
{
client.create().creatingParentContainersIfNeeded().forPath(path, newValue);
}
catch ( KeeperException.NodeExistsException dummy2 )
{
client.setData().forPath(path, newValue);
}
}
}
這些方法比較簡單,部落客就不示範了。
PromotedToLock promotedToLock = PromotedToLock.builder()
// 用于分布式鎖的Zookeeper路徑
.lockPath("/lock")
// 鎖的重試政策
.retryPolicy(new RetryNTimes(100, 5))
// 鎖的逾時時間
.timeout(10000, TimeUnit.SECONDS)
.build();
DistributedAtomicLong atomicLong = new DistributedAtomicLong(curator, counterPath,
new RetryNTimes(100, 5),
promotedToLock);