天天看點

源碼分析Dubbo服務提供者啟動流程-下篇

本文繼續上文Dubbo服務提供者啟動流程,在上篇文章中詳細梳理了基于dubbo spring檔案的配置方式,Dubbo是如何加載配置檔案,服務提供者dubbo:service标簽服務暴露全流程,本節重點關注RegistryProtocol#export中調用doLocalExport方法,其實主要是根據各自協定,服務提供者建立網絡伺服器,在特定端口建立監聽,監聽來自消息消費端服務的請求。

RegistryProtocol#doLocalExport:

private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));   // @1
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);    // @2
                    bounds.put(key, exporter);
                }
            }
        }
        return exporter;
    }           

代碼@1:如果服務提供者以dubbo協定暴露服務,getProviderUrl(originInvoker)傳回的URL将以dubbo://開頭。

代碼@2:根據Dubbo内置的SPI機制,将調用DubboProtocol#export方法。

1、源碼分析DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();     // @1
        // export service.
        String key = serviceKey(url);      // @2
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);    //@3  start
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);                                                  
        if (isStubSupportEvent && !isCallbackservice) {                                                                                                                        
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);                                                                      
            }
        }   // @3 end

        openServer(url);   // @4
        optimizeSerialization(url);  // @5
        return exporter;                
    }           

代碼@1:擷取服務提供者URL,以協定名稱,這裡是dubbo://開頭。

代碼@2:從服務提供者URL中擷取服務名,key: interface:port,例如:com.alibaba.dubbo.demo.DemoService:20880。

代碼@3:是否将轉發事件導出成stub。

代碼@4:根據url打開服務,下面将詳細分析其實作。

代碼@5:根據url優化器序列化方式。

2、源碼分析DubboProtocol#openServer

private void openServer(URL url) {
        // find server.
        String key = url.getAddress();    // @1
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);           // @2
            if (server == null) {
                serverMap.put(key, createServer(url));                    //@3
            } else {
                // server supports reset, use together with override
                server.reset(url);                                                       //@4
            }
        }
    }           

代碼@1:根據url擷取網絡位址:ip:port,例如:192.168.56.1:20880,服務提供者IP與暴露服務端口号。

代碼@2:根據key從伺服器緩存中擷取,如果存在,則執行代碼@4,如果不存在,則執行代碼@3.

代碼@3:根據URL建立一伺服器,Dubbo服務提供者伺服器實作類為ExchangeServer。

代碼@4:如果伺服器已經存在,用目前URL重置伺服器,這個不難了解,因為一個Dubbo服務中,會存在多個dubbo:service标簽,這些标簽都會在服務台提供者的同一個IP位址、端口号上暴露服務。

2.1 源碼分析DubboProtocol#createServer

private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());    // @1
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));     // @2
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);  // @3

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))    // @4
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);       // @5
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);    // @6
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);     //@7
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }           

代碼@1:為服務提供者url增加channel.readonly.sent屬性,預設為true,表示在發送請求時,是否等待将位元組寫入socket後再傳回,預設為true。

代碼@2:為服務提供者url增加heartbeat屬性,表示心跳間隔時間,預設為60*1000,表示60s。

代碼@3:為服務提供者url增加server屬性,可選值為netty,mina等等,預設為netty。

代碼@4:根據SPI機制,判斷server屬性是否支援。

代碼@5:為服務提供者url增加codec屬性,預設值為dubbo,協定編碼方式。

代碼@6:根據服務提供者URI,服務提供者指令請求處理器requestHandler建構ExchangeServer執行個體。requestHandler的實作具體在以後詳細分析Dubbo服務調用時再詳細分析。

代碼@7:驗證用戶端類型是否可用。

2.1.1 源碼分析Exchangers.bind 根據URL、ExchangeHandler建構伺服器

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }           

上述代碼不難看出,首先根據url擷取Exchanger執行個體,然後調用bind方法建構ExchangeServer,Exchanger接口如下

源碼分析Dubbo服務提供者啟動流程-下篇
  • ExchangeServer bind(URL url, ExchangeHandler handler) : 服務提供者調用。
  • ExchangeClient connect(URL url, ExchangeHandler handler):服務消費者調用。

dubbo提供的實作類為:HeaderExchanger,其bind方法如下:

HeaderExchanger#bind

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}           

從這裡可以看出,端口的綁定由Transporters的bind方法實作。

2.1.2 源碼分析Transporters.bind方法

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url, handler);
    }

public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}           

從這裡得知,Dubbo網絡傳輸的接口有Transporter接口實作,其繼承類圖所示:

源碼分析Dubbo服務提供者啟動流程-下篇

本文以netty版本來檢視一下Transporter實作。

NettyTransporter源碼如下:

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    @Override
    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }
}           

NettyServer建立網絡連接配接的實作方法為:

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);      // @1
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);     // @2
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }           

熟悉本方法需要具備Netty的知識,有關源碼:

閱讀Netty系列文章

,這裡不對每一行代碼進行解讀,對于與網絡相關的參數,将在後續文章中詳細講解,本方法@1、@2引起了我的注意,首先建立NettyServer必須傳入一個服務提供者URL,但從DubboProtocol#createServer中可以看出,Server是基于網絡套接字(ip:port)緩存的,一個JVM應用中,必然會存在多個dubbo:server标簽,就會有多個URL,這裡為什麼可以這樣做呢?從DubboProtocol#createServer中可以看出,在解析第二個dubbo:service标簽時并不會調用createServer,而是會調用Server#reset方法,是不是這個方法有什麼魔法,在reset方法時能将URL也注冊到Server上,那接下來分析NettyServer#reset方法是如何實作的。

2.2 源碼分析DdubboProtocol#reset

reset方法最終将用Server的reset方法,同樣還是以netty版本的NettyServer為例,檢視reset方法的實作原理。NettyServer#reset--->父類(AbstractServer)

AbstractServer#reset

public void reset(URL url) {
        if (url == null) {
            return;
        }
        try {                                                                                                       // @1 start
            if (url.hasParameter(Constants.ACCEPTS_KEY)) {
                int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
                if (a > 0) {
                    this.accepts = a;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
                int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
                if (t > 0) {
                    this.idleTimeout = t;
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
        try {
            if (url.hasParameter(Constants.THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(Constants.THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {
                    if (threads < core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }              // @1 end
        super.setUrl(getUrl().addParameters(url.getParameters()));    // @2
    }           

代碼@1:首先是調整線程池的相關線程數量,這個好了解。、

代碼@2:然後設定調用setUrl覆寫原先NettyServer的private volatile URL url的屬性,那為什麼不會影響原先注冊的dubbo:server呢?

原來NettyHandler上加了注解:@Sharable,由該注解去實作線程安全。

Dubbo服務提供者啟動流程将分析到這裡了,本文并未對網絡細節進行詳細分析,旨在梳理出啟動流程,有關Dubbo服務網絡實作原理将在後續章節中詳細分析,敬請期待。

原文釋出時間為:2019-02-15

本文作者:丁威,《RocketMQ技術内幕》作者。

本文來自

中間件興趣圈

,了解相關資訊可以關注