天天看點

dubbo - 優雅停機

開篇

這篇文章主要的目的是想分析下dubbo優雅停機的過程,整個文章參考網上很多現成的文章,本着尊重原創的精神會在文章中備注參考資訊。

針對閱讀dubbo源碼,我的感覺是當你一開始鑽到細節當中就很容易一葉障目了,是以建議一開始着重梳理整個架構的邏輯而不要陷入細節當中。

優雅停機的原理

說明:

  • dubbo的優雅停機是建立在JVM的addShutdownHook回調的機制上的,通過注冊回調調用停機的邏輯ProtocolConfig.destroyAll()
  • ProtocolConfig.destroyAll()執行邏輯是:1、關閉注冊中心;2、關閉釋出協定服務。
  • 關閉注冊中心:AbstractRegistryFactory.destroyAll()。
  • 關閉釋出的協定服務:protocol.destroy()。
public abstract class AbstractConfig implements Serializable {
    static {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                if (logger.isInfoEnabled()) {
                    logger.info("Run shutdown hook now.");
                }
                ProtocolConfig.destroyAll();
            }
        }, "DubboShutdownHook"));
    }
}


public class ProtocolConfig extends AbstractConfig {

    public static void destroyAll() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        // 關閉注冊中心
        AbstractRegistryFactory.destroyAll();

        // 關閉所有已釋出的協定如dubbo服務
        ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
        for (String protocolName : loader.getLoadedExtensions()) {
            try {
                Protocol protocol = loader.getLoadedExtension(protocolName);
                if (protocol != null) {
                    protocol.destroy();
                }
            } catch (Throwable t) {
                logger.warn(t.getMessage(), t);
            }
        }
    }
}           
  • 圖檔來自 Dubbo優雅停機
  • B服務作為Provider需要進行優雅停機。
  • B服務首先斷開和注冊中心的連接配接。
  • B服務關閉提供服務的Server端的監聽,保證不接受請求。
  • B服務關閉引用的C和D服務,保證不再調用下遊服務。

優雅停機過程-注冊中心關閉

  • 注冊中心關閉通過LOCK來保證不重入,此例中以ZookeeperRegistry為例。
  • ZookeeperRegistry的關閉順序:1、關閉注冊中心;2、斷開和zookeeper的連接配接。
  • 關閉注冊中心按照調用鍊路走到FailbackRegistry,關閉注冊中心并停掉重試操作。
  • 關閉注冊中心按照調用鍊路走到AbstractRegistry,按照先移除作為provider的URL,再移除作為consumer的訂閱的consumer資訊。
  • 具體的資訊看下面的源碼,已經按照繼承關系組織好了。
public abstract class AbstractRegistryFactory implements RegistryFactory {
    public static void destroyAll() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Close all registries " + getRegistries());
        }
        // Lock up the registry shutdown process
        LOCK.lock();
        try {
            for (Registry registry : getRegistries()) {
                try {
                    registry.destroy();
                } catch (Throwable e) {
                    LOGGER.error(e.getMessage(), e);
                }
            }
            REGISTRIES.clear();
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }
}


public class ZookeeperRegistry extends FailbackRegistry {
    public void destroy() {
        // 調用父類FailbackRegistry關閉注冊中心
        super.destroy();
        try {
            // 關閉zkClient用戶端保證臨時provider節點下線
            zkClient.close();
        } catch (Exception e) {
            logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
}


public abstract class FailbackRegistry extends AbstractRegistry {
    public void destroy() {
        if (!canDestroy()){
            return;
        }
        super.destroy();
        try {
            // 首先要明白FailbackRegistry的核心就在于失敗重試,是以這一層的關閉隻要關閉retryFuture就可以
            retryFuture.cancel(true);
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}


public abstract class AbstractRegistry implements Registry {
    public void destroy() {
        if (!destroyed.compareAndSet(false, true)) {
            return;
        }

        if (logger.isInfoEnabled()) {
            logger.info("Destroy registry:" + getUrl());
        }
        // 作為provider,取消所有的服務注冊
        Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
        if (!destroyRegistered.isEmpty()) {
            for (URL url : new HashSet<URL>(getRegistered())) {
                if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                    try {
                        // 從已注冊的清單中移除該URL
                        unregister(url);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unregister url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
        // 作為consumer,取消所有的訂閱關系
        Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
        if (!destroySubscribed.isEmpty()) {
            for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
                URL url = entry.getKey();
                for (NotifyListener listener : entry.getValue()) {
                    try {
                        unsubscribe(url, listener);
                        if (logger.isInfoEnabled()) {
                            logger.info("Destroy unsubscribe url " + url);
                        }
                    } catch (Throwable t) {
                        logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
                    }
                }
            }
        }
    }


    public void unregister(URL url) {
        if (url == null) {
            throw new IllegalArgumentException("unregister url == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unregister: " + url);
        }
        registered.remove(url);
    }


    public void unsubscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("unsubscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("unsubscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Unsubscribe: " + url);
        }
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
    }
}           

優雅停機過程-協定關閉

  • 協定關閉按照以下順序進行:1、關閉provider端的監聽;2、關閉作為consumer的reference的服務;3、調用父類針對exporter對象進行清理。
  • 關閉provider端的監聽:關閉provider端的監聽(server.close)。
  • 關閉consumer的服務:關閉dubbo服務引用的服務(client.close)。
  • 調用父類清理exporter:清理exporter服務(super.destroy)。
public class DubboProtocol extends AbstractProtocol {

    public void destroy() {

        // 關停所有的Server,作為provider将不再接收新的請求
        for (String key : new ArrayList<String>(serverMap.keySet())) {
            ExchangeServer server = serverMap.remove(key);
            if (server != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo server: " + server.getLocalAddress());
                    }
                    server.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        // 關停所有的Client,作為consumer将不再發送新的請求
        for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
            ExchangeClient client = referenceClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    client.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }

        // 幽靈用戶端的處理邏輯,不清楚幽靈用戶端是啥?
        for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
            ExchangeClient client = ghostClientMap.remove(key);
            if (client != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
                    }
                    client.close(getServerShutdownTimeout());
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        stubServiceMethodsMap.clear();

        // 調用父類繼續進行清理,針對exporter對象進行清理
        super.destroy();
    }
}           

provider監聽的close過程

  • provider監聽的close過程:關閉心跳檢測操作,關閉底層netty服務的監聽channel管道。
  • 關閉心跳檢測操作:doClose()。
  • 關閉底層netty監聽:server.close(timeout)。
public class HeaderExchangeServer implements ExchangeServer {
    public void close(final int timeout) {
        startClose();
        if (timeout > 0) {
            final long max = (long) timeout;
            final long start = System.currentTimeMillis();
            if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
                sendChannelReadOnlyEvent();
            }

            // 如果還有進行中的任務并且沒有到達等待時間的上限,則繼續等待
            while (HeaderExchangeServer.this.isRunning()
                    && System.currentTimeMillis() - start < max) {
                try {
                    // 休息10毫秒再檢查
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        // 關閉心跳,停止應答
        doClose();

        // 關閉通信通道
        server.close(timeout);
    }

    private void doClose() {
        // 修改标記位,該标記為設定為true後,provider不再對上遊請求做應答
        if (!closed.compareAndSet(false, true)) {
            return;
        }
         // 取消心跳的Futrue
        stopHeartbeatTimer();
        try {
             // 關閉心跳的線程池
            scheduled.shutdown();
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
}


public abstract class AbstractServer extends AbstractEndpoint implements Server {
    public void close() {
        if (logger.isInfoEnabled()) {
            logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
        ExecutorUtil.shutdownNow(executor, 100);
        try {
            // 設定關閉的标記位
            super.close();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            // 執行真正的關閉動作
            doClose();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    protected abstract void doClose() throws Throwable;
}



public class NettyServer extends AbstractServer implements Server {
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }
}           

client的清理過程

  • client的關閉過程本質上是關閉引用服務的channel對象。
  • client的關閉順序按照:設定關閉标記位,關閉心跳檢測,關閉通道。
public class HeaderExchangeClient implements ExchangeClient {

    public void close(int timeout) {
        startClose();
        doClose();
        channel.close(timeout);
    }

    public void startClose() {
        channel.startClose();
    }

    private void doClose() {
        stopHeartbeatTimer();
    }
}           

exporter清理過程

  • exporter的清理主要包括invoker和exporter兩個對象的清理。
  • invoker和exporter兩個對象的具體作用暫時還未理清楚,待定。
  • exporter的清理最終還是走到了invoker的清理過程當中。
public abstract class AbstractProtocol implements Protocol {

    public void destroy() {
        for (Invoker<?> invoker : invokers) {
            if (invoker != null) {
                // 移除invokers
                invokers.remove(invoker);
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Destroy reference: " + invoker.getUrl());
                    }
                    // 銷毀invokers
                    invoker.destroy();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
        for (String key : new ArrayList<String>(exporterMap.keySet())) {
            // 移除exporter
            Exporter<?> exporter = exporterMap.remove(key);
            if (exporter != null) {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Unexport service: " + exporter.getInvoker().getUrl());
                    }
                    // 銷毀exporter
                    exporter.unexport();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        }
    }
}



public class DubboInvoker<T> extends AbstractInvoker<T> {

    public void destroy() {
        if (super.isDestroyed()) {
            return;
        } else {
            destroyLock.lock();
            try {
                if (super.isDestroyed()) {
                    return;
                }
                super.destroy();
                if (invokers != null) {
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        client.close(getShutdownTimeout());
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }

            } finally {
                destroyLock.unlock();
            }
        }
    }
}


public abstract class AbstractExporter<T> implements Exporter<T> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final Invoker<T> invoker;

    private volatile boolean unexported = false;

    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null)
            throw new IllegalStateException("service invoker == null");
        if (invoker.getInterface() == null)
            throw new IllegalStateException("service type == null");
        if (invoker.getUrl() == null)
            throw new IllegalStateException("service url == null");
        this.invoker = invoker;
    }

    public Invoker<T> getInvoker() {
        return invoker;
    }

    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }

    public String toString() {
        return getInvoker().toString();
    }
}