天天看點

dubbo源碼學習(五)dubbo暴露服務的過程

初學dubbo的源碼,隻做嘗試性的去學習,做為自己學習的一個記錄,各位看官如果覺得寫的有錯誤或了解的不對,請在留言區告訴我,互相學習。本人能力有限,有大神進入 時請指點。

dubbo采用的nio異步的通信,通信協定預設為 netty,當然也可以選擇 mina,grizzy。在服務端(provider)在啟動時主要是開啟netty監聽,在zookeeper上注冊服務節點,處理消費者請求,傳回處理後的消息給消費者,消費者使用服務時主要是訂閱服務的節點,監聽zookeeper節點目錄,服務端的變化時zookeeper會推送給消費者,消費者重新緩存服務位址等。服務者、消費者、zookeeper三者之間都是長連接配接。

下面看dubbo源碼來看服務暴露的過程,服務暴露的入口為:com.alibaba.dubbo.config.ServiceConfig#export 方法,代碼如下:

//是否延時暴露
        if (delay != null && delay > 0) {
            Thread thread = new Thread(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(delay);
                    } catch (Throwable e) {
                    }
                    doExport();
                }
            });
            thread.setDaemon(true);
            thread.setName("DelayExportServiceThread");
            thread.start();
        } else {
            //不延時暴露,則直接暴露
            doExport();
        }
           

上在代碼無論是延時暴露或直接暴露調用的方法是:doExport(),doExport會對解析完的配置再做一次檢查,核心代碼大家可以檢視dubbo的源碼,下面列出一小部分

/*
            檢查預設設定,如果xml中沒有配置<dubbo:provider
            主要是從系統環境變量中尋找是否有相應的provider的配置
         */
        checkDefault();
        //下面設定的内容如果沒有配置<dubbo:provider時基本上都是Null
        if (provider != null) {
            if (application == null) {
                application = provider.getApplication();
            }
            if (module == null) {
                module = provider.getModule();
            }
            if (registries == null) {
                registries = provider.getRegistries();
            }
            if (monitor == null) {
                monitor = provider.getMonitor();
            }
            if (protocols == null) {
                protocols = provider.getProtocols();
            }
        }
        if (module != null) {
            //registries一般都會配置
            if (registries == null) {
                registries = module.getRegistries();
            }
            if (monitor == null) {
                monitor = module.getMonitor();
            }
        }
        if (application != null) {
            //application一般也會配置
            if (registries == null) {
                registries = application.getRegistries();
            }
            if (monitor == null) {
                monitor = application.getMonitor();
            }
        }
        //是否泛化調用
        if (ref instanceof GenericService) {
            interfaceClass = GenericService.class;
            if (StringUtils.isEmpty(generic)) {
                generic = Boolean.TRUE.toString();
            }
        } else {
            try {
                interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
                        .getContextClassLoader());
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            /*
                檢查即将暴露的接口的方法配置,檢查方法是否在接口中存在
                一般不會配置是以一般情況下methods為null
                <dubbo:service  > <dubbo:method /> </dubbo:serivce>
             */
            checkInterfaceAndMethods(interfaceClass, methods);
            /*
                檢查接口的引用不為空,并且必須實作的是要暴露的接口
             */
            checkRef();
            generic = Boolean.FALSE.toString();
        }
           

所有的檢查通過之後,會調用 :com.alibaba.dubbo.config.ServiceConfig#doExportUrls 方法,因為dubbo支援多通信協定時,都進行暴露,是以在代碼中可以看到

/*
            将注冊協定轉化成url
            registry://45.119.68.23:2181/com.alibaba.dubbo.registry.RegistryService?
            application=test-dubbo&dubbo=2.5.3&pid=7648&registry=zookeeper×tamp=1462349748801
         */
        List<URL> registryURLs = loadRegistries(true);
        //配置多通信協定時,都進行暴露
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
           

doExportUrlsFor1Protocol中主要将所有的配置轉化成map,然後将map轉化成dubbo的統一URL,最終暴露的dubbo服務也就是這個統一的url,這個url也會注冊到zookeeper的節點上,部分代碼如下:

/*
	将不為null的配置對象中的屬性設定到 map 中
	即将 xml 配置檔案中的配置設定的值全轉化成為map
	{side=provider, application=alijk-dubbo, accepts=1000,
		dubbo=2.5.3, threads=100, pid=7236, interface=cn.eoncloud.account.sdk.export.AccountService,
		threadpool=fixed, version=1.0.0, timeout=500, anyhost=true, timestamp=1462347843960}
 */
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
/*
	将配置資訊轉化成 url ,主要根據之前map裡的資料組裝成url
	調用 URL#buildString方法
	dubbo://10.6.13.137:9998/cn.eoncloud.account.sdk.export.AccountService
	?accepts=1000&anyhost=true&application=test-dubbo&dubbo=2.5.3
	&interface=cn.eoncloud.account.sdk.export.AccountService
	&methods=getAccountName,getAllTest&pid=7236&revision=1.0.0&side=provider
	&threadpool=fixed&threads=100&timeout=500×tamp=1462347843960&version=1.0.0
 */
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
		.hasExtension(url.getProtocol())) {
	url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
			.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//com.alibaba.dubbo.registry.integration.RegistryProtocol#export 即将進行暴露
Exporter<?> exporter = protocol.export(invoker);
           

上面的代碼核心暴露的一行代碼為:protocol.export(invoker); 這個protocol的值為:RegistryProtocol,也就是暴露會跳到:RegistryProtocol.exprot中去處理,RegistryProtocol.exprot主要做兩件事情:1、開啟netty服務端  。2、建立zookeeper服務節點。下面來看RegistryProtocol.export 方法,代碼如下:

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker doLocalExport調用dubboProtocol.export開啟netty服務監聽
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //調用zodoRegister的doRegister 建立zookeeper的服務節點
        registry.register(registedProviderUrl);
        // 訂閱override資料
        // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導緻訂閱資訊覆寫。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        //訂閱
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        //保證每次export都傳回一個新的exporter執行個體
        return new Exporter<T>() {
            public Invoker<T> getInvoker() {
                return exporter.getInvoker();
            }
            public void unexport() {
            	try {
            		exporter.unexport();
            	} catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	registry.unregister(registedProviderUrl);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
                try {
                	overrideListeners.remove(overrideSubscribeUrl);
                	registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
                } catch (Throwable t) {
                	logger.warn(t.getMessage(), t);
                }
            }
        };
    }
           

上面的代碼裡有一段特别重要,關鍵性的代碼在doLocalExport中:

final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//此處protol為dubboProtocol
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
           

從上面的代碼中可以看到會調用dubboProtocol的export對服務進行暴露,這個export最終目的就是開啟netty的監聽,下面來看dubbo是如何一步一步開啟netty的

private void openServer(URL url) {
        // find server. ip:port
        String key = url.getAddress();
        //client 也可以暴露一個隻有server可以調用的服務。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
        	ExchangeServer server = serverMap.get(key);
        	if (server == null) {
                //建立 Server
        		serverMap.put(key, createServer(url));
        	} else {
        		//server支援reset,配合override功能使用
        		server.reset(url);
        	}
        }
    }
    
    private ExchangeServer createServer(URL url) {
        //預設開啟server關閉時發送readonly事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        //預設開啟heartbeat
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        //預設使用netty
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        //預設使用dubbo協定編碼
        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
        ExchangeServer server;
        try {
            //HeaderExchangeServer 在此處已經開啟了Netty Server 進行監聽
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
           

在上面的代碼中:Exchangers.bind(url, requestHandler)  預設為:HeaderExchanger.bind()

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //Transporters預設為NettyTransporter
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
           

代碼運作到這裡可以看到傳輸方式了,dubbo預設采用的通信方式為 NettyTransporter ,再來看NettyTransporter.bind方法

public static final String NAME = "netty";
    
    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
           

已經能看到NettyServer了,dubbo在暴露服務最終開啟的netty服務監聽,監聽消費者發送的請求,通過反射調用方法得到結果通過 tcp/ip 網絡傳輸傳回給消費者。再進入到NettyServer中我們就能看到非常傳統的開啟Netty服務的代碼了

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        //最後一個參數為 NIO 最大工作線程數
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        //netty server 啟動器
        bootstrap = new ServerBootstrap(channelFactory);
        
        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();
        // https://issues.jboss.org/browse/NETTY-365
        // https://issues.jboss.org/browse/NETTY-379
        // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();
                /*int idleTimeout = getIdleTimeout();
                if (idleTimeout > 10000) {
                    pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                }*/
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // 建立一個綁定到指定位址的新通道,也就是綁定IP、端口供用戶端連接配接
        channel = bootstrap.bind(getBindAddress());
    }
           

上面的代碼執行完成後,netty的服務端就已經開啟了,可以接收用戶端的連接配接了,但用戶端連接配接上來要怎麼處理呢?消息接收、發送怎麼處理呢?所有的處理都在上面代碼的 NettyHandler類中,Nettyhandler繼承了Netty包中的的SimpleChannelHandler

NettyHandler extends SimpleChannelHandler 
           

重寫了 channelConnected、channelDisconnected、messageReceived等方法,而我們比較關注的可能是messagereceived方法,在收到消息時如何處理,但今天暫時先不看dubbo如果處理消息,隻看暴露,消息處理如何實作異步通信下一節再講。

/**
     * 收到消息時觸發
     * @param ctx
     * @param e
     * @throws Exception
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
        try {
            handler.received(channel, e.getMessage());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
        }
    }
           

從前面知道,開啟netty服務是在RegistryProtocol.export 的 doLocalExport 中,在開啟了netty服務後,就是在zookeeper上注冊服務節點了,消費者在消費服務時會根據消費的接口名找到對應的zookeeper節點目錄,對目錄進行監聽,接收推送

//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//調用zodoRegister的doRegister 建立zookeeper的服務節點
registry.register(registedProviderUrl);
// 訂閱override資料
// FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導緻訂閱資訊覆寫。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//訂閱
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
           

dubbo服務在zookeeper上的節點注冊是:com.alibaba.dubbo.registry.support.FailbackRegistry#register

@Override
    public void register(URL url) {
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 向伺服器端發送注冊請求
            doRegister(url);
           

因為doRegister是一個抽象的方法,檢視他的實作可以看到如下圖:

dubbo源碼學習(五)dubbo暴露服務的過程

從上圖可以看到doRegister實作有 dubbo、redis、zookeeper,這也是在我們配置時經常看到的 注冊協定的配置 ,最為常用的就是 zookeeper了,是以再看ZookeeperRegistry的代碼,看他的doRegistry幹什麼了如下

protected void doRegister(URL url) {
        try {
        	zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
           

其實從上面已經可以看到 在zookeeper上面建立 節點了,預設不分組的情況下,服務結構如下:/dubbo/XXXXservice/consumers、providers

dubbo源碼學習(五)dubbo暴露服務的過程

至此,dubbo的暴露基本上已經完成,開啟了netty服務,注冊了zookeeper的節點,就等着消費者連接配接上來使用了。下一節将介紹dubbo的消息發送和接收,NIO異步通訊的實作。

繼續閱讀