ZooKeeper : Curator架構重試政策和Session API介紹
在學習
Curator
架構
API
之前,可以先了解
Java
用戶端原生
API
,這樣不僅可以更好的了解
Curator
架構
API
,還可以突出
Curator
架構的友善和強大。
- ZooKeeper :Java用戶端Session、ACL、Znode API介紹
- ZooKeeper :Java用戶端Watcher API介紹
- ZooKeeper :Java用戶端執行批量任務和Transaction API介紹
Curator
是一個比較完善的
ZooKeeper
用戶端架構,通過封裝的一套進階
API
簡化了
ZooKeeper
的操作。
Curator
架構主要解決了三類問題:
- 封裝
與ZooKeeper Client
之間的連接配接處理(提供連接配接重試機制等)。ZooKeeper Server
- 提供了一套
風格的Fluent
,并且在API
用戶端原生Java
的基礎上進行了增強(創捷多層節點、删除多層節點等)。API
- 提供
各種應用場景(分布式鎖、ZooKeeper
選舉、共享計數器、分布式隊列等)的抽象封裝。leader
部落客将
Curator
架構
API
分為
Session
、
Znode
、
ACL
、
Watcher
和
Transaction
這幾個部分來進行介紹,限于篇幅原因,本篇部落格隻介紹
Session API
以及其中的重試政策,之後的部落格會介紹其他
API
的使用。部落客使用的
Curator
架構版本是
5.2.0
,
ZooKeeper
版本是
3.6.3
。
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>

5.2.0
版本的
Curator
使用
3.6.3
版本的
ZooKeeper
。
重試政策
在
Java
用戶端原生
API
中,用戶端與服務端的連接配接是沒有提供連接配接重試機制的,如果用戶端需要重連,就隻能将上一次連接配接的
Session ID
與
Session Password
發送給服務端進行重連。而
Curator
架構提供了用戶端與服務端的連接配接重試機制,并且可以通過
Fluent
風格的
API
來給連接配接添加重試政策。
RetryPolicy
接口是重試政策的抽象,
allowRetry
方法用來判斷是否允許重試。
package org.apache.curator;
import org.apache.zookeeper.KeeperException;
/**
* 重連政策的抽象
*/
public interface RetryPolicy
{
/**
* 當操作由于某種原因失敗時調用,此方法應傳回true以進行另一次嘗試
* retryCount – 到目前為止重試的次數(第一次為0)
* elapsedTimeMs – 自嘗試操作以來經過的時間(以毫秒為機關)
* sleeper – 使用它來睡眠,不要調用Thread.sleep
*/
boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);
/**
* 當操作因特定異常而失敗時調用,此方法應傳回true以進行另一次嘗試
*/
default boolean allowRetry(Throwable exception)
{
if ( exception instanceof KeeperException)
{
final int rc = ((KeeperException) exception).code().intValue();
return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
(rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
(rc == KeeperException.Code.SESSIONEXPIRED.intValue());
}
return false;
}
}
RetryPolicy
接口的關系圖如下圖所示:
SleepingRetry
抽象類(實作了
RetryPolicy
接口):
package org.apache.curator.retry;
import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import java.util.concurrent.TimeUnit;
abstract class SleepingRetry implements RetryPolicy
{
private final int n;
protected SleepingRetry(int n)
{
this.n = n;
}
public int getN()
{
return n;
}
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
if ( retryCount < n )
{
try
{
sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
return false;
}
return true;
}
return false;
}
protected abstract long getSleepTimeMs(int retryCount, long elapsedTimeMs);
}
重試
n
次(有次數限制),重試之前先進行睡眠,睡眠時間由
getSleepTimeMs
方法得到(抽象方法)。
SessionFailedRetryPolicy
類(實作了
RetryPolicy
接口):
package org.apache.curator;
import org.apache.zookeeper.KeeperException;
/**
* Session過期導緻操作失敗時的重連政策
*/
public class SessionFailedRetryPolicy implements RetryPolicy
{
private final RetryPolicy delegatePolicy;
public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
{
this.delegatePolicy = delegatePolicy;
}
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
}
@Override
public boolean allowRetry(Throwable exception)
{
if ( exception instanceof KeeperException.SessionExpiredException )
{
return false;
}
else
{
return delegatePolicy.allowRetry(exception);
}
}
}
這裡隻是增加了對
SessionExpiredException
這種異常的判斷,當遇到
Session
過期異常時,不再進行重連,即傳回
false
。而其他的所有業務全部委托給
delegatePolicy
執行個體。因為
RetryPolicy
接口的
allowRetry(Throwable exception)
方法有預設實作:
default boolean allowRetry(Throwable exception)
{
if ( exception instanceof KeeperException)
{
final int rc = ((KeeperException) exception).code().intValue();
return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
(rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
(rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
(rc == KeeperException.Code.SESSIONEXPIRED.intValue());
}
return false;
}
當遇到
Session
過期異常時,允許進行重連(
rc == KeeperException.Code.SESSIONEXPIRED.intValue()
)。
SessionFailedRetryPolicy
這種重連政策用的不多,這裡就不詳細介紹了。
RetryForever
類(實作了
RetryPolicy
接口):
package org.apache.curator.retry;
import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkArgument;
/**
* 始終允許重試
*/
public class RetryForever implements RetryPolicy
{
private static final Logger log = LoggerFactory.getLogger(RetryForever.class);
// 重試間隔時間,機關毫秒
private final int retryIntervalMs;
public RetryForever(int retryIntervalMs)
{
checkArgument(retryIntervalMs > 0);
this.retryIntervalMs = retryIntervalMs;
}
@Override
public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
{
try
{
sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
log.warn("Error occurred while sleeping", e);
return false;
}
return true;
}
}
重試沒有次數限制,一般也不常用。
RetryNTimes
類(繼承
SleepingRetry
抽象類):
package org.apache.curator.retry;
public class RetryNTimes extends SleepingRetry
{
private final int sleepMsBetweenRetries;
public RetryNTimes(int n, int sleepMsBetweenRetries)
{
super(n);
this.sleepMsBetweenRetries = sleepMsBetweenRetries;
}
@Override
protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
{
return sleepMsBetweenRetries;
}
}
繼承了
SleepingRetry
抽象類,沒有重寫
allowRetry
方法,是以也是重試
n
次,實作了
getSleepTimeMs
方法,該方法傳回重試之間的睡眠時間
sleepMsBetweenRetries
。
RetryOneTime
類(繼承了
RetryNTimes
類):
public class RetryOneTime extends RetryNTimes
{
public RetryOneTime(int sleepMsBetweenRetry)
{
super(1, sleepMsBetweenRetry);
}
}
隻重試一次的重試政策。
ExponentialBackoffRetry
類(繼承了
SleepingRetry
抽象類):
package org.apache.curator.retry;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
public class ExponentialBackoffRetry extends SleepingRetry
{
private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
private static final int MAX_RETRIES_LIMIT = 29;
private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;
private final Random random = new Random();
private final int baseSleepTimeMs;
private final int maxSleepMs;
/**
* baseSleepTimeMs – 重試之間等待的初始時間
* maxRetries – 最大重試次數
*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
{
this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
}
/**
* baseSleepTimeMs – 重試之間等待的初始時間
* maxRetries – 最大重試次數
* maxSleepMs – 每次重試時休眠的最長時間(以毫秒為機關)
*/
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
{
super(validateMaxRetries(maxRetries));
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxSleepMs = maxSleepMs;
}
@VisibleForTesting
public int getBaseSleepTimeMs()
{
return baseSleepTimeMs;
}
@Override
protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
{
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
if ( sleepMs > maxSleepMs )
{
log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
sleepMs = maxSleepMs;
}
return sleepMs;
}
private static int validateMaxRetries(int maxRetries)
{
if ( maxRetries > MAX_RETRIES_LIMIT )
{
log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
maxRetries = MAX_RETRIES_LIMIT;
}
return maxRetries;
}
}
重試
maxRetries
次,
maxRetries
如果大于最大重試次數限制(
MAX_RETRIES_LIMIT
),即大于
29
,
maxRetries
就會被
MAX_RETRIES_LIMIT
覆寫,否則不改變
maxRetries
的大小。有意思的是
getSleepTimeMs
方法擷取的重試之間的睡眠時間并不是不變的,而是随機得到的,并且随着重試次數的增加,睡眠時間的随機範圍不斷擴大(右邊界不斷擴大),如果随機得到的睡眠時間超過
maxSleepMs
(如果沒有被指定,預設為
DEFAULT_MAX_SLEEP_MS
, 即
Integer.MAX_VALUE
),會被
maxSleepMs
覆寫,而随機得到的睡眠時間小于
baseSleepTimeMs
,也會被
baseSleepTimeMs
覆寫。
package org.apache.curator.retry;
import com.google.common.annotations.VisibleForTesting;
public class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry
{
private final int maxSleepTimeMs;
/**
* baseSleepTimeMs – 重試之間等待的初始時間
* maxSleepTimeMs – 重試之間等待的最長時間
* maxRetries – 重試的最大次數
*/
public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries)
{
super(baseSleepTimeMs, maxRetries);
this.maxSleepTimeMs = maxSleepTimeMs;
}
@VisibleForTesting
public int getMaxSleepTimeMs()
{
return maxSleepTimeMs;
}
@Override
protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
{
return Math.min(maxSleepTimeMs, super.getSleepTimeMs(retryCount, elapsedTimeMs));
}
}
maxSleepTimeMs
屬性和
maxSleepMs
屬性作用是一樣的,都是為了限制睡眠時間的最大取值。隻是
maxSleepTimeMs
屬性不會提供預設值,是以必須要指定它的大小。個人感覺
maxSleepTimeMs
屬性可以不需要。
連接配接
package com.kaven.zookeeper;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.*;
/**
* @Author: ITKaven
* @Date: 2021/11/20 10:30
* @Leetcode: https://leetcode-cn.com/u/kavenit
* @Notes:
*/
public class Application{
private static final String SERVER_PROXY = "192.168.1.184:9000";
private static final int TIMEOUT = 40000;
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework curator = CuratorFrameworkFactory.builder()
.connectString(SERVER_PROXY)
.namespace("curator")
.retryPolicy(retryPolicy)
.connectionTimeoutMs(TIMEOUT)
.sessionTimeoutMs(TIMEOUT)
.build();
curator.start();
if (curator.getState().equals(CuratorFrameworkState.STARTED)) {
System.out.println("連接配接成功!");
}
}
}
輸出:
連接配接成功!
-
:connectString
服務端的位址。ZooKeeper
-
:命名空間,類似namespace
的功能,之後在該用戶端上的操作,都是基于該命名空間。chroot
-
:重試政策。retryPolicy
-
:連接配接逾時時間。connectionTimeoutMs
-
:sessionTimeoutMs
逾時時間。Session