天天看點

ZooKeeper : Curator架構重試政策和Session API介紹

ZooKeeper : Curator架構重試政策和Session API介紹

在學習​

​Curator​

​​架構​

​API​

​​之前,可以先了解​

​Java​

​​用戶端原生​

​API​

​​,這樣不僅可以更好的了解​

​Curator​

​​架構​

​API​

​​,還可以突出​

​Curator​

​架構的友善和強大。

  • ​​ZooKeeper :Java用戶端Session、ACL、Znode API介紹​​
  • ​​ZooKeeper :Java用戶端Watcher API介紹​​
  • ​​ZooKeeper :Java用戶端執行批量任務和Transaction API介紹​​

​Curator​

​​是一個比較完善的​

​ZooKeeper​

​​用戶端架構,通過封裝的一套進階​

​API​

​​簡化了​

​ZooKeeper​

​​的操作。​

​Curator​

​架構主要解決了三類問題:

  • 封裝​

    ​ZooKeeper Client​

    ​​與​

    ​ZooKeeper Server​

    ​之間的連接配接處理(提供連接配接重試機制等)。
  • 提供了一套​

    ​Fluent​

    ​​風格的​

    ​API​

    ​​,并且在​

    ​Java​

    ​​用戶端原生​

    ​API​

    ​的基礎上進行了增強(創捷多層節點、删除多層節點等)。
  • 提供​

    ​ZooKeeper​

    ​​各種應用場景(分布式鎖、​

    ​leader​

    ​選舉、共享計數器、分布式隊列等)的抽象封裝。

部落客将​

​Curator​

​​架構​

​API​

​​分為​

​Session​

​​、​

​Znode​

​​、​

​ACL​

​​、​

​Watcher​

​​和​

​Transaction​

​​這幾個部分來進行介紹,限于篇幅原因,本篇部落格隻介紹​

​Session API​

​​以及其中的重試政策,之後的部落格會介紹其他​

​API​

​​的使用。部落客使用的​

​Curator​

​​架構版本是​

​5.2.0​

​​,​

​ZooKeeper​

​​版本是​

​3.6.3​

​。

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>      
ZooKeeper : Curator架構重試政策和Session API介紹

​5.2.0​

​​版本的​

​Curator​

​​使用​

​3.6.3​

​​版本的​

​ZooKeeper​

​。

ZooKeeper : Curator架構重試政策和Session API介紹

重試政策

在​

​Java​

​​用戶端原生​

​API​

​​中,用戶端與服務端的連接配接是沒有提供連接配接重試機制的,如果用戶端需要重連,就隻能将上一次連接配接的​

​Session ID​

​​與​

​Session Password​

​​發送給服務端進行重連。而​

​Curator​

​​架構提供了用戶端與服務端的連接配接重試機制,并且可以通過​

​Fluent​

​​風格的​

​API​

​來給連接配接添加重試政策。

​RetryPolicy​

​​接口是重試政策的抽象,​

​allowRetry​

​方法用來判斷是否允許重試。

package org.apache.curator;

import org.apache.zookeeper.KeeperException;

/**
 * 重連政策的抽象
 */
public interface RetryPolicy
{
    /**
     * 當操作由于某種原因失敗時調用,此方法應傳回true以進行另一次嘗試
     * retryCount – 到目前為止重試的次數(第一次為0)
     * elapsedTimeMs – 自嘗試操作以來經過的時間(以毫秒為機關)
     * sleeper – 使用它來睡眠,不要調用Thread.sleep
     */
    boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper);

    /**
     * 當操作因特定異常而失敗時調用,此方法應傳回true以進行另一次嘗試
     */
    default boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException)
        {
            final int rc = ((KeeperException) exception).code().intValue();
            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
        }
        return false;
    }
}      

​RetryPolicy​

​接口的關系圖如下圖所示:

ZooKeeper : Curator架構重試政策和Session API介紹

​SleepingRetry​

​​抽象類(實作了​

​RetryPolicy​

​接口):

package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import java.util.concurrent.TimeUnit;

abstract class SleepingRetry implements RetryPolicy
{
    private final int n;

    protected SleepingRetry(int n)
    {
        this.n = n;
    }

    public int getN()
    {
        return n;
    }

    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        if ( retryCount < n )
        {
            try
            {
                sleeper.sleepFor(getSleepTimeMs(retryCount, elapsedTimeMs), TimeUnit.MILLISECONDS);
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
        return false;
    }

    protected abstract long  getSleepTimeMs(int retryCount, long elapsedTimeMs);
}      

重試​

​n​

​​次(有次數限制),重試之前先進行睡眠,睡眠時間由​

​getSleepTimeMs​

​方法得到(抽象方法)。

​SessionFailedRetryPolicy​

​​類(實作了​

​RetryPolicy​

​接口):

package org.apache.curator;

import org.apache.zookeeper.KeeperException;

/**
 * Session過期導緻操作失敗時的重連政策
 */
public class SessionFailedRetryPolicy implements RetryPolicy
{

    private final RetryPolicy delegatePolicy;

    public SessionFailedRetryPolicy(RetryPolicy delegatePolicy)
    {
        this.delegatePolicy = delegatePolicy;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        return delegatePolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
    }

    @Override
    public boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException.SessionExpiredException )
        {
            return false;
        }
        else
        {
            return delegatePolicy.allowRetry(exception);
        }
    }
}      

這裡隻是增加了對​

​SessionExpiredException​

​​這種異常的判斷,當遇到​

​Session​

​​過期異常時,不再進行重連,即傳回​

​false​

​​。而其他的所有業務全部委托給​

​delegatePolicy​

​​執行個體。因為​

​RetryPolicy​

​​接口的​

​allowRetry(Throwable exception)​

​方法有預設實作:

default boolean allowRetry(Throwable exception)
    {
        if ( exception instanceof KeeperException)
        {
            final int rc = ((KeeperException) exception).code().intValue();
            return (rc == KeeperException.Code.CONNECTIONLOSS.intValue()) ||
                    (rc == KeeperException.Code.OPERATIONTIMEOUT.intValue()) ||
                    (rc == KeeperException.Code.SESSIONMOVED.intValue()) ||
                    (rc == KeeperException.Code.SESSIONEXPIRED.intValue());
        }
        return false;
    }      

當遇到​

​Session​

​​過期異常時,允許進行重連(​

​rc == KeeperException.Code.SESSIONEXPIRED.intValue()​

​​)。​

​SessionFailedRetryPolicy​

​這種重連政策用的不多,這裡就不詳細介紹了。

​RetryForever​

​​類(實作了​

​RetryPolicy​

​接口):

package org.apache.curator.retry;

import org.apache.curator.RetryPolicy;
import org.apache.curator.RetrySleeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * 始終允許重試
 */
public class RetryForever implements RetryPolicy
{
    private static final Logger log = LoggerFactory.getLogger(RetryForever.class);

    // 重試間隔時間,機關毫秒
    private final int retryIntervalMs;

    public RetryForever(int retryIntervalMs)
    {
        checkArgument(retryIntervalMs > 0);
        this.retryIntervalMs = retryIntervalMs;
    }

    @Override
    public boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
    {
        try
        {
            sleeper.sleepFor(retryIntervalMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            log.warn("Error occurred while sleeping", e);
            return false;
        }
        return true;
    }
}      

重試沒有次數限制,一般也不常用。

​RetryNTimes​

​​類(繼承​

​SleepingRetry​

​抽象類):

package org.apache.curator.retry;

public class RetryNTimes extends SleepingRetry
{
    private final int sleepMsBetweenRetries;

    public RetryNTimes(int n, int sleepMsBetweenRetries)
    {
        super(n);
        this.sleepMsBetweenRetries = sleepMsBetweenRetries;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return sleepMsBetweenRetries;
    }
}      

繼承了​

​SleepingRetry​

​​抽象類,沒有重寫​

​allowRetry​

​​方法,是以也是重試​

​n​

​​次,實作了​

​getSleepTimeMs​

​​方法,該方法傳回重試之間的睡眠時間​

​sleepMsBetweenRetries​

​。

​RetryOneTime​

​​類(繼承了​

​RetryNTimes​

​類):

public class RetryOneTime extends RetryNTimes
{
    public RetryOneTime(int sleepMsBetweenRetry)
    {
        super(1, sleepMsBetweenRetry);
    }
}      

隻重試一次的重試政策。

​ExponentialBackoffRetry​

​​類(繼承了​

​SleepingRetry​

​抽象類):

package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;

public class ExponentialBackoffRetry extends SleepingRetry
{
    private static final Logger log = LoggerFactory.getLogger(ExponentialBackoffRetry.class);

    private static final int MAX_RETRIES_LIMIT = 29;
    private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;

    private final Random random = new Random();
    private final int baseSleepTimeMs;
    private final int maxSleepMs;

    /**
     * baseSleepTimeMs – 重試之間等待的初始時間
     * maxRetries – 最大重試次數
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
    {
        this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
    }

    /**
     * baseSleepTimeMs – 重試之間等待的初始時間
     * maxRetries – 最大重試次數
     * maxSleepMs – 每次重試時休眠的最長時間(以毫秒為機關)
     */
    public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
    {
        super(validateMaxRetries(maxRetries));
        this.baseSleepTimeMs = baseSleepTimeMs;
        this.maxSleepMs = maxSleepMs;
    }

    @VisibleForTesting
    public int getBaseSleepTimeMs()
    {
        return baseSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
        if ( sleepMs > maxSleepMs )
        {
            log.warn(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
            sleepMs = maxSleepMs;
        }
        return sleepMs;
    }

    private static int validateMaxRetries(int maxRetries)
    {
        if ( maxRetries > MAX_RETRIES_LIMIT )
        {
            log.warn(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
            maxRetries = MAX_RETRIES_LIMIT;
        }
        return maxRetries;
    }
}      

重試​

​maxRetries​

​​次,​

​maxRetries​

​​如果大于最大重試次數限制(​

​MAX_RETRIES_LIMIT​

​​),即大于​

​29​

​​,​

​maxRetries​

​​就會被​

​MAX_RETRIES_LIMIT​

​​覆寫,否則不改變​

​maxRetries​

​​的大小。有意思的是​

​getSleepTimeMs​

​​方法擷取的重試之間的睡眠時間并不是不變的,而是随機得到的,并且随着重試次數的增加,睡眠時間的随機範圍不斷擴大(右邊界不斷擴大),如果随機得到的睡眠時間超過​

​maxSleepMs​

​​(如果沒有被指定,預設為​

​DEFAULT_MAX_SLEEP_MS​

​​, 即 ​

​Integer.MAX_VALUE​

​​),會被​

​maxSleepMs​

​​覆寫,而随機得到的睡眠時間小于​

​baseSleepTimeMs​

​​,也會被​

​baseSleepTimeMs​

​覆寫。

package org.apache.curator.retry;

import com.google.common.annotations.VisibleForTesting;

public class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry
{
    private final int maxSleepTimeMs;

    /**
     * baseSleepTimeMs – 重試之間等待的初始時間
     * maxSleepTimeMs – 重試之間等待的最長時間
     * maxRetries – 重試的最大次數
     */
    public BoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int maxRetries)
    {
        super(baseSleepTimeMs, maxRetries);
        this.maxSleepTimeMs = maxSleepTimeMs;
    }

    @VisibleForTesting
    public int getMaxSleepTimeMs()
    {
        return maxSleepTimeMs;
    }

    @Override
    protected long getSleepTimeMs(int retryCount, long elapsedTimeMs)
    {
        return Math.min(maxSleepTimeMs, super.getSleepTimeMs(retryCount, elapsedTimeMs));
    }
}      

​maxSleepTimeMs​

​​屬性和​

​maxSleepMs​

​​屬性作用是一樣的,都是為了限制睡眠時間的最大取值。隻是​

​maxSleepTimeMs​

​​屬性不會提供預設值,是以必須要指定它的大小。個人感覺​

​maxSleepTimeMs​

​屬性可以不需要。

連接配接

package com.kaven.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.*;

/**
 * @Author: ITKaven
 * @Date: 2021/11/20 10:30
 * @Leetcode: https://leetcode-cn.com/u/kavenit
 * @Notes:
 */

public class Application{

    private static final String SERVER_PROXY = "192.168.1.184:9000";
    private static final int TIMEOUT = 40000;

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(SERVER_PROXY)
                .namespace("curator")
                .retryPolicy(retryPolicy)
                .connectionTimeoutMs(TIMEOUT)
                .sessionTimeoutMs(TIMEOUT)
                .build();

        curator.start();
        if (curator.getState().equals(CuratorFrameworkState.STARTED)) {
            System.out.println("連接配接成功!");
        }
    }
}      

輸出:

連接配接成功!      
  • ​connectString​

    ​​:​

    ​ZooKeeper​

    ​服務端的位址。
  • ​namespace​

    ​​:命名空間,類似​

    ​chroot​

    ​的功能,之後在該用戶端上的操作,都是基于該命名空間。
  • ​retryPolicy​

    ​:重試政策。
  • ​connectionTimeoutMs​

    ​:連接配接逾時時間。
  • ​sessionTimeoutMs​

    ​​:​

    ​Session​

    ​逾時時間。