開篇
這篇文章主要的目的是想分析下dubbo優雅停機的過程,整個文章參考網上很多現成的文章,本着尊重原創的精神會在文章中備注參考資訊。
針對閱讀dubbo源碼,我的感覺是當你一開始鑽到細節當中就很容易一葉障目了,是以建議一開始着重梳理整個架構的邏輯而不要陷入細節當中。
優雅停機的原理
說明:
- dubbo的優雅停機是建立在JVM的addShutdownHook回調的機制上的,通過注冊回調調用停機的邏輯ProtocolConfig.destroyAll()
- ProtocolConfig.destroyAll()執行邏輯是:1、關閉注冊中心;2、關閉釋出協定服務。
- 關閉注冊中心:AbstractRegistryFactory.destroyAll()。
- 關閉釋出的協定服務:protocol.destroy()。
public abstract class AbstractConfig implements Serializable {
static {
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (logger.isInfoEnabled()) {
logger.info("Run shutdown hook now.");
}
ProtocolConfig.destroyAll();
}
}, "DubboShutdownHook"));
}
}
public class ProtocolConfig extends AbstractConfig {
public static void destroyAll() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
// 關閉注冊中心
AbstractRegistryFactory.destroyAll();
// 關閉所有已釋出的協定如dubbo服務
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
for (String protocolName : loader.getLoadedExtensions()) {
try {
Protocol protocol = loader.getLoadedExtension(protocolName);
if (protocol != null) {
protocol.destroy();
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
- 圖檔來自 Dubbo優雅停機
- B服務作為Provider需要進行優雅停機。
- B服務首先斷開和注冊中心的連接配接。
- B服務關閉提供服務的Server端的監聽,保證不接受請求。
- B服務關閉引用的C和D服務,保證不再調用下遊服務。
優雅停機過程-注冊中心關閉
- 注冊中心關閉通過LOCK來保證不重入,此例中以ZookeeperRegistry為例。
- ZookeeperRegistry的關閉順序:1、關閉注冊中心;2、斷開和zookeeper的連接配接。
- 關閉注冊中心按照調用鍊路走到FailbackRegistry,關閉注冊中心并停掉重試操作。
- 關閉注冊中心按照調用鍊路走到AbstractRegistry,按照先移除作為provider的URL,再移除作為consumer的訂閱的consumer資訊。
- 具體的資訊看下面的源碼,已經按照繼承關系組織好了。
public abstract class AbstractRegistryFactory implements RegistryFactory {
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
// Lock up the registry shutdown process
LOCK.lock();
try {
for (Registry registry : getRegistries()) {
try {
registry.destroy();
} catch (Throwable e) {
LOGGER.error(e.getMessage(), e);
}
}
REGISTRIES.clear();
} finally {
// Release the lock
LOCK.unlock();
}
}
}
public class ZookeeperRegistry extends FailbackRegistry {
public void destroy() {
// 調用父類FailbackRegistry關閉注冊中心
super.destroy();
try {
// 關閉zkClient用戶端保證臨時provider節點下線
zkClient.close();
} catch (Exception e) {
logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
}
public abstract class FailbackRegistry extends AbstractRegistry {
public void destroy() {
if (!canDestroy()){
return;
}
super.destroy();
try {
// 首先要明白FailbackRegistry的核心就在于失敗重試,是以這一層的關閉隻要關閉retryFuture就可以
retryFuture.cancel(true);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
public abstract class AbstractRegistry implements Registry {
public void destroy() {
if (!destroyed.compareAndSet(false, true)) {
return;
}
if (logger.isInfoEnabled()) {
logger.info("Destroy registry:" + getUrl());
}
// 作為provider,取消所有的服務注冊
Set<URL> destroyRegistered = new HashSet<URL>(getRegistered());
if (!destroyRegistered.isEmpty()) {
for (URL url : new HashSet<URL>(getRegistered())) {
if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
try {
// 從已注冊的清單中移除該URL
unregister(url);
if (logger.isInfoEnabled()) {
logger.info("Destroy unregister url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unregister url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
// 作為consumer,取消所有的訂閱關系
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!destroySubscribed.isEmpty()) {
for (Map.Entry<URL, Set<NotifyListener>> entry : destroySubscribed.entrySet()) {
URL url = entry.getKey();
for (NotifyListener listener : entry.getValue()) {
try {
unsubscribe(url, listener);
if (logger.isInfoEnabled()) {
logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable t) {
logger.warn("Failed to unsubscribe url " + url + " to registry " + getUrl() + " on destroy, cause: " + t.getMessage(), t);
}
}
}
}
}
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unregister: " + url);
}
registered.remove(url);
}
public void unsubscribe(URL url, NotifyListener listener) {
if (url == null) {
throw new IllegalArgumentException("unsubscribe url == null");
}
if (listener == null) {
throw new IllegalArgumentException("unsubscribe listener == null");
}
if (logger.isInfoEnabled()) {
logger.info("Unsubscribe: " + url);
}
Set<NotifyListener> listeners = subscribed.get(url);
if (listeners != null) {
listeners.remove(listener);
}
}
}
優雅停機過程-協定關閉
- 協定關閉按照以下順序進行:1、關閉provider端的監聽;2、關閉作為consumer的reference的服務;3、調用父類針對exporter對象進行清理。
- 關閉provider端的監聽:關閉provider端的監聽(server.close)。
- 關閉consumer的服務:關閉dubbo服務引用的服務(client.close)。
- 調用父類清理exporter:清理exporter服務(super.destroy)。
public class DubboProtocol extends AbstractProtocol {
public void destroy() {
// 關停所有的Server,作為provider将不再接收新的請求
for (String key : new ArrayList<String>(serverMap.keySet())) {
ExchangeServer server = serverMap.remove(key);
if (server != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo server: " + server.getLocalAddress());
}
server.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 關停所有的Client,作為consumer将不再發送新的請求
for (String key : new ArrayList<String>(referenceClientMap.keySet())) {
ExchangeClient client = referenceClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 幽靈用戶端的處理邏輯,不清楚幽靈用戶端是啥?
for (String key : new ArrayList<String>(ghostClientMap.keySet())) {
ExchangeClient client = ghostClientMap.remove(key);
if (client != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
client.close(getServerShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
stubServiceMethodsMap.clear();
// 調用父類繼續進行清理,針對exporter對象進行清理
super.destroy();
}
}
provider監聽的close過程
- provider監聽的close過程:關閉心跳檢測操作,關閉底層netty服務的監聽channel管道。
- 關閉心跳檢測操作:doClose()。
- 關閉底層netty監聽:server.close(timeout)。
public class HeaderExchangeServer implements ExchangeServer {
public void close(final int timeout) {
startClose();
if (timeout > 0) {
final long max = (long) timeout;
final long start = System.currentTimeMillis();
if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {
sendChannelReadOnlyEvent();
}
// 如果還有進行中的任務并且沒有到達等待時間的上限,則繼續等待
while (HeaderExchangeServer.this.isRunning()
&& System.currentTimeMillis() - start < max) {
try {
// 休息10毫秒再檢查
Thread.sleep(10);
} catch (InterruptedException e) {
logger.warn(e.getMessage(), e);
}
}
}
// 關閉心跳,停止應答
doClose();
// 關閉通信通道
server.close(timeout);
}
private void doClose() {
// 修改标記位,該标記為設定為true後,provider不再對上遊請求做應答
if (!closed.compareAndSet(false, true)) {
return;
}
// 取消心跳的Futrue
stopHeartbeatTimer();
try {
// 關閉心跳的線程池
scheduled.shutdown();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public void close() {
if (logger.isInfoEnabled()) {
logger.info("Close " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
ExecutorUtil.shutdownNow(executor, 100);
try {
// 設定關閉的标記位
super.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
// 執行真正的關閉動作
doClose();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
protected abstract void doClose() throws Throwable;
}
public class NettyServer extends AbstractServer implements Server {
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
client的清理過程
- client的關閉過程本質上是關閉引用服務的channel對象。
- client的關閉順序按照:設定關閉标記位,關閉心跳檢測,關閉通道。
public class HeaderExchangeClient implements ExchangeClient {
public void close(int timeout) {
startClose();
doClose();
channel.close(timeout);
}
public void startClose() {
channel.startClose();
}
private void doClose() {
stopHeartbeatTimer();
}
}
exporter清理過程
- exporter的清理主要包括invoker和exporter兩個對象的清理。
- invoker和exporter兩個對象的具體作用暫時還未理清楚,待定。
- exporter的清理最終還是走到了invoker的清理過程當中。
public abstract class AbstractProtocol implements Protocol {
public void destroy() {
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
// 移除invokers
invokers.remove(invoker);
try {
if (logger.isInfoEnabled()) {
logger.info("Destroy reference: " + invoker.getUrl());
}
// 銷毀invokers
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
for (String key : new ArrayList<String>(exporterMap.keySet())) {
// 移除exporter
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Unexport service: " + exporter.getInvoker().getUrl());
}
// 銷毀exporter
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
}
public class DubboInvoker<T> extends AbstractInvoker<T> {
public void destroy() {
if (super.isDestroyed()) {
return;
} else {
destroyLock.lock();
try {
if (super.isDestroyed()) {
return;
}
super.destroy();
if (invokers != null) {
invokers.remove(this);
}
for (ExchangeClient client : clients) {
try {
client.close(getShutdownTimeout());
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
} finally {
destroyLock.unlock();
}
}
}
}
public abstract class AbstractExporter<T> implements Exporter<T> {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final Invoker<T> invoker;
private volatile boolean unexported = false;
public AbstractExporter(Invoker<T> invoker) {
if (invoker == null)
throw new IllegalStateException("service invoker == null");
if (invoker.getInterface() == null)
throw new IllegalStateException("service type == null");
if (invoker.getUrl() == null)
throw new IllegalStateException("service url == null");
this.invoker = invoker;
}
public Invoker<T> getInvoker() {
return invoker;
}
public void unexport() {
if (unexported) {
return;
}
unexported = true;
getInvoker().destroy();
}
public String toString() {
return getInvoker().toString();
}
}